diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index ccea3e29..2f32a215 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -14,10 +14,9 @@ import io.heckel.ntfy.util.topicUrlAuth import io.heckel.ntfy.util.topicUrlJson import io.heckel.ntfy.util.topicUrlJsonPoll import okhttp3.Call -import okhttp3.Callback import okhttp3.RequestBody import okhttp3.RequestBody.Companion.toRequestBody -import okhttp3.Response +import okio.BufferedSource import java.io.IOException import java.net.URLEncoder import kotlin.random.Random @@ -140,43 +139,20 @@ class ApiService(private val context: Context) { baseUrl: String, topics: String, since: String?, - user: User?, - notify: (topic: String, Notification) -> Unit, - fail: (Exception) -> Unit - ): Call { + user: User? + ): Pair { val sinceVal = since ?: "all" val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") val customHeaders = repository.getCustomHeaders(baseUrl) val request = HttpUtil.requestBuilder(url, user, customHeaders).build() val call = HttpUtil.subscriberClient(context, baseUrl).newCall(request) - call.enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - try { - if (!response.isSuccessful) { - throw Exception("Unexpected response ${response.code} when subscribing to topic $url") - } - val source = response.body.source() - while (!source.exhausted()) { - val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") - val notification = - parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream - if (notification != null) { - notify(notification.topic, notification.notification) - } - } - } catch (e: Exception) { - Log.e(TAG, "Connection to $url failed (1): ${e.message}", e) - fail(e) - } - } - - override fun onFailure(call: Call, e: IOException) { - Log.e(TAG, "Connection to $url failed (2): ${e.message}", e) - fail(e) - } - }) - return call + val response = call.execute() + if (!response.isSuccessful) { + response.close() + throw IOException("Unexpected response ${response.code} when subscribing to $url") + } + return Pair(call, response.body.source()) } suspend fun checkAuth(baseUrl: String, topic: String, user: User?): Boolean { diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 60d33e9e..5e8c0ac8 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -1,12 +1,22 @@ package io.heckel.ntfy.service -import io.heckel.ntfy.db.* -import io.heckel.ntfy.util.Log +import io.heckel.ntfy.db.ConnectionState +import io.heckel.ntfy.db.Notification +import io.heckel.ntfy.db.Repository +import io.heckel.ntfy.db.Subscription +import io.heckel.ntfy.db.User import io.heckel.ntfy.msg.ApiService +import io.heckel.ntfy.msg.NotificationParser +import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicUrl -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import okhttp3.Call -import java.util.concurrent.atomic.AtomicBoolean +import kotlin.random.Random class JsonConnection( private val connectionId: ConnectionId, @@ -24,6 +34,7 @@ class JsonConnection( private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) + private val parser = NotificationParser() private var since: String? = sinceId private lateinit var call: Call @@ -33,50 +44,47 @@ class JsonConnection( job = scope.launch(Dispatchers.IO) { Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds") - // Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped var retryMillis = 0L while (isActive && serviceActive()) { Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") val startTime = System.currentTimeMillis() - val notify = notify@ { topic: String, notification: Notification -> - since = notification.id - val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify - val subscription = repository.getSubscription(subscriptionId) ?: return@notify - val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) - notificationListener(subscription, notificationWithSubscriptionId) - } - val failed = AtomicBoolean(false) - var lastError: Exception? = null - val fail = { e: Exception -> - failed.set(true) - lastError = e - } - - // Call /json subscribe endpoint and loop until the call fails, is canceled, - // or the job or service are cancelled/stopped + try { - call = api.subscribe(baseUrl, topicsStr, since, user, notify, fail) - while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) { - connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTED, null, 0L) - Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}") - delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled + val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user) + call = newCall + connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTED, null, 0L) + + // Blocking read loop: reads JSON lines until connection closes or is cancelled + while (isActive && serviceActive() && !source.exhausted()) { + val line = source.readUtf8Line() ?: break + val notificationWithTopic = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) + if (notificationWithTopic != null) { + since = notificationWithTopic.notification.id + val topic = notificationWithTopic.topic + val subscriptionId = topicsToSubscriptionIds[topic] ?: continue + val subscription = repository.getSubscription(subscriptionId) ?: continue + val notification = notificationWithTopic.notification.copy(subscriptionId = subscription.id) + notificationListener(subscription, notification) + } } + + // Clean disconnect - reset backoff + retryMillis = 0L + Log.d(TAG, "[$url] Connection closed cleanly") } catch (e: Exception) { + if (!isActive) { + Log.d(TAG, "[$url] Connection cancelled") + break + } Log.e(TAG, "[$url] Connection failed: ${e.message}", e) - lastError = e - } - - // If we're not cancelled yet, wait little before retrying (incremental back-off) - if (isActive && serviceActive() && lastError != null) { retryMillis = nextRetryMillis(retryMillis, startTime) val nextRetryTime = System.currentTimeMillis() + retryMillis - connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTING, lastError, nextRetryTime) - Log.w(TAG, "[$url] Connection failed, retrying connection in ${retryMillis / 1000}s ...") + connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTING, e, nextRetryTime) + Log.w(TAG, "[$url] Retrying connection in ${retryMillis / 1000}s ...") delay(retryMillis) } } Log.d(TAG, "[$url] Connection job SHUT DOWN") - // FIXME: Do NOT update state here as this can lead to races; this leaks the subscription state map } } @@ -102,9 +110,8 @@ class JsonConnection( companion object { private const val TAG = "NtfyJsonConnection" - private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L private const val RETRY_STEP_MILLIS = 5_000L private const val RETRY_MAX_MILLIS = 60_000L - private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS + private const val RETRY_RESET_AFTER_MILLIS = 60_000L } }