Renaming; make JsonConnection work for 401/403 errors
This commit is contained in:
parent
cb8efaac2b
commit
c596af5f8e
7 changed files with 28 additions and 34 deletions
|
|
@ -27,7 +27,7 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
|
|||
|
||||
private val connectionDetails = ConcurrentHashMap<String, ConnectionDetails>()
|
||||
private val connectionDetailsLiveData = MutableLiveData<Map<String, ConnectionDetails>>(connectionDetails)
|
||||
private val reconnectVersions = ConcurrentHashMap<String, Long>()
|
||||
private val connectionForceReconnectVersions = ConcurrentHashMap<String, Long>()
|
||||
|
||||
// TODO Move these into an ApplicationState singleton
|
||||
val detailViewSubscriptionId = AtomicLong(0L) // Omg, what a hack ...
|
||||
|
|
@ -573,13 +573,13 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
|
|||
return connectionDetails[baseUrl]
|
||||
}
|
||||
|
||||
fun getReconnectVersion(baseUrl: String): Long {
|
||||
return reconnectVersions[baseUrl] ?: 0L
|
||||
fun getConnectionForceReconnectVersion(baseUrl: String): Long {
|
||||
return connectionForceReconnectVersions[baseUrl] ?: 0L
|
||||
}
|
||||
|
||||
fun incrementReconnectVersion(baseUrl: String) {
|
||||
reconnectVersions.compute(baseUrl) { _, current -> (current ?: 0L) + 1 }
|
||||
Log.d(TAG, "Reconnect version incremented for $baseUrl: ${reconnectVersions[baseUrl]}")
|
||||
fun incrementConnectionForceReconnectVersion(baseUrl: String) {
|
||||
connectionForceReconnectVersions.compute(baseUrl) { _, current -> (current ?: 0L) + 1 }
|
||||
Log.d(TAG, "Connection force reconnect version incremented for $baseUrl: ${connectionForceReconnectVersions[baseUrl]}")
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import com.google.gson.Gson
|
|||
import io.heckel.ntfy.db.Notification
|
||||
import io.heckel.ntfy.db.Repository
|
||||
import io.heckel.ntfy.db.User
|
||||
import io.heckel.ntfy.service.NotAuthorizedException
|
||||
import io.heckel.ntfy.util.ALL_PRIORITIES
|
||||
import io.heckel.ntfy.util.HttpUtil
|
||||
import io.heckel.ntfy.util.Log
|
||||
|
|
@ -149,8 +150,13 @@ class ApiService(private val context: Context) {
|
|||
val call = HttpUtil.subscriberClient(context, baseUrl).newCall(request)
|
||||
val response = call.execute()
|
||||
if (!response.isSuccessful) {
|
||||
val code = response.code
|
||||
val message = response.message
|
||||
response.close()
|
||||
throw IOException("Unexpected response ${response.code} when subscribing to $url")
|
||||
if (code == 401 || code == 403) {
|
||||
throw NotAuthorizedException(code, message)
|
||||
}
|
||||
throw IOException("Unexpected response $code when subscribing to $url")
|
||||
}
|
||||
return Pair(call, response.body.source())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ data class ConnectionId(
|
|||
val headersHash: Int, // Hash of sorted headers or 0 if none
|
||||
val trustedCertsHash: Int, // Hash of trusted certificates or 0 if none
|
||||
val clientCertHash: Int, // Hash of client certificate or 0 if none
|
||||
val reconnectVersion: Long // Incremented to force reconnection for this baseUrl
|
||||
val connectionForceReconnectVersion: Long // Incremented to force reconnection for this baseUrl
|
||||
)
|
||||
|
||||
fun isResponseCode(response: okhttp3.Response?, vararg codes: Int): Boolean {
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ class JsonConnection(
|
|||
private val parser = NotificationParser()
|
||||
|
||||
private var since: String? = sinceId
|
||||
private var errorCount = 0
|
||||
private lateinit var call: Call
|
||||
private lateinit var job: Job
|
||||
|
||||
|
|
@ -44,14 +45,15 @@ class JsonConnection(
|
|||
job = scope.launch(Dispatchers.IO) {
|
||||
Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds")
|
||||
|
||||
var retryMillis = 0L
|
||||
while (isActive && serviceActive()) {
|
||||
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
|
||||
val startTime = System.currentTimeMillis()
|
||||
|
||||
try {
|
||||
val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user)
|
||||
call = newCall
|
||||
if (errorCount > 0) {
|
||||
errorCount = 0
|
||||
}
|
||||
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTED, null, 0L)
|
||||
|
||||
// Blocking read loop: reads JSON lines until connection closes or is cancelled
|
||||
|
|
@ -68,8 +70,6 @@ class JsonConnection(
|
|||
}
|
||||
}
|
||||
|
||||
// Clean disconnect - reset backoff
|
||||
retryMillis = 0L
|
||||
Log.d(TAG, "[$url] Connection closed cleanly")
|
||||
} catch (e: Exception) {
|
||||
if (!isActive) {
|
||||
|
|
@ -77,13 +77,13 @@ class JsonConnection(
|
|||
break
|
||||
}
|
||||
Log.d(TAG, "[$url] Connection broken, reconnecting ...")
|
||||
retryMillis = nextRetryMillis(retryMillis, startTime)
|
||||
val nextRetryTime = System.currentTimeMillis() + retryMillis
|
||||
errorCount++
|
||||
val retrySeconds = RETRY_SECONDS.getOrNull(errorCount-1) ?: RETRY_SECONDS.last()
|
||||
val nextRetryTime = System.currentTimeMillis() + (retrySeconds * 1000L)
|
||||
val error = if (isConnectionBrokenException(e)) null else e
|
||||
// FIXME add NotAuthorizedException
|
||||
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTING, error, nextRetryTime)
|
||||
Log.w(TAG, "[$url] Retrying connection in ${retryMillis / 1000}s ...")
|
||||
delay(retryMillis)
|
||||
Log.w(TAG, "[$url] Retrying connection in ${retrySeconds}s ...")
|
||||
delay(retrySeconds * 1000L)
|
||||
}
|
||||
}
|
||||
Log.d(TAG, "[$url] Connection job SHUT DOWN")
|
||||
|
|
@ -100,20 +100,8 @@ class JsonConnection(
|
|||
if (this::call.isInitialized) call.cancel()
|
||||
}
|
||||
|
||||
private fun nextRetryMillis(retryMillis: Long, startTime: Long): Long {
|
||||
val connectionDurationMillis = System.currentTimeMillis() - startTime
|
||||
if (connectionDurationMillis > RETRY_RESET_AFTER_MILLIS) {
|
||||
return RETRY_STEP_MILLIS
|
||||
} else if (retryMillis + RETRY_STEP_MILLIS >= RETRY_MAX_MILLIS) {
|
||||
return RETRY_MAX_MILLIS
|
||||
}
|
||||
return retryMillis + RETRY_STEP_MILLIS
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val TAG = "NtfyJsonConnection"
|
||||
private const val RETRY_STEP_MILLIS = 5_000L
|
||||
private const val RETRY_MAX_MILLIS = 60_000L
|
||||
private const val RETRY_RESET_AFTER_MILLIS = 60_000L
|
||||
private val RETRY_SECONDS = listOf(5, 10, 15, 20, 30, 45, 60, 120)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -220,7 +220,7 @@ class SubscriberService : Service() {
|
|||
.hashCode()
|
||||
val trustedCertsHash = repository.getTrustedCertificate(baseUrl)?.hashCode() ?: 0
|
||||
val clientCertHash = repository.getClientCertificate(baseUrl)?.hashCode() ?: 0
|
||||
val reconnectVersion = repository.getReconnectVersion(baseUrl)
|
||||
val connectionForceReconnectVersion = repository.getConnectionForceReconnectVersion(baseUrl)
|
||||
ConnectionId(
|
||||
baseUrl = baseUrl,
|
||||
topicsToSubscriptionIds = subs.associate { s -> s.topic to s.id },
|
||||
|
|
@ -229,7 +229,7 @@ class SubscriberService : Service() {
|
|||
headersHash = headersHash,
|
||||
trustedCertsHash = trustedCertsHash,
|
||||
clientCertHash = clientCertHash,
|
||||
reconnectVersion = reconnectVersion
|
||||
connectionForceReconnectVersion = connectionForceReconnectVersion
|
||||
)
|
||||
}
|
||||
.toSet()
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ class WsConnection(
|
|||
}
|
||||
state = State.Disconnected
|
||||
errorCount++
|
||||
val retrySeconds = RETRY_SECONDS.getOrNull(errorCount) ?: RETRY_SECONDS.last()
|
||||
val retrySeconds = RETRY_SECONDS.getOrNull(errorCount-1) ?: RETRY_SECONDS.last()
|
||||
val nextRetryTime = System.currentTimeMillis() + (retrySeconds * 1000L)
|
||||
|
||||
// Special cases:
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ class ConnectionErrorFragment : DialogFragment() {
|
|||
when (menuItem.itemId) {
|
||||
R.id.connection_error_dialog_action_retry -> {
|
||||
selectedBaseUrl?.let { baseUrl ->
|
||||
repository.incrementReconnectVersion(baseUrl)
|
||||
repository.incrementConnectionForceReconnectVersion(baseUrl)
|
||||
}
|
||||
SubscriberServiceManager.refresh(requireContext())
|
||||
true
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue