Simplify JsonConnection

This commit is contained in:
Philipp Heckel 2026-01-11 21:51:18 -05:00
parent faab8449bb
commit 3a9daf40d7
2 changed files with 52 additions and 69 deletions

View file

@ -14,10 +14,9 @@ import io.heckel.ntfy.util.topicUrlAuth
import io.heckel.ntfy.util.topicUrlJson
import io.heckel.ntfy.util.topicUrlJsonPoll
import okhttp3.Call
import okhttp3.Callback
import okhttp3.RequestBody
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response
import okio.BufferedSource
import java.io.IOException
import java.net.URLEncoder
import kotlin.random.Random
@ -140,43 +139,20 @@ class ApiService(private val context: Context) {
baseUrl: String,
topics: String,
since: String?,
user: User?,
notify: (topic: String, Notification) -> Unit,
fail: (Exception) -> Unit
): Call {
user: User?
): Pair<Call, BufferedSource> {
val sinceVal = since ?: "all"
val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url")
val customHeaders = repository.getCustomHeaders(baseUrl)
val request = HttpUtil.requestBuilder(url, user, customHeaders).build()
val call = HttpUtil.subscriberClient(context, baseUrl).newCall(request)
call.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
try {
if (!response.isSuccessful) {
throw Exception("Unexpected response ${response.code} when subscribing to topic $url")
}
val source = response.body.source()
while (!source.exhausted()) {
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
val notification =
parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream
if (notification != null) {
notify(notification.topic, notification.notification)
}
}
} catch (e: Exception) {
Log.e(TAG, "Connection to $url failed (1): ${e.message}", e)
fail(e)
}
}
override fun onFailure(call: Call, e: IOException) {
Log.e(TAG, "Connection to $url failed (2): ${e.message}", e)
fail(e)
}
})
return call
val response = call.execute()
if (!response.isSuccessful) {
response.close()
throw IOException("Unexpected response ${response.code} when subscribing to $url")
}
return Pair(call, response.body.source())
}
suspend fun checkAuth(baseUrl: String, topic: String, user: User?): Boolean {

View file

@ -1,12 +1,22 @@
package io.heckel.ntfy.service
import io.heckel.ntfy.db.*
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.db.ConnectionState
import io.heckel.ntfy.db.Notification
import io.heckel.ntfy.db.Repository
import io.heckel.ntfy.db.Subscription
import io.heckel.ntfy.db.User
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicUrl
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import okhttp3.Call
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.random.Random
class JsonConnection(
private val connectionId: ConnectionId,
@ -24,6 +34,7 @@ class JsonConnection(
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val url = topicUrl(baseUrl, topicsStr)
private val parser = NotificationParser()
private var since: String? = sinceId
private lateinit var call: Call
@ -33,50 +44,47 @@ class JsonConnection(
job = scope.launch(Dispatchers.IO) {
Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds")
// Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped
var retryMillis = 0L
while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val startTime = System.currentTimeMillis()
val notify = notify@ { topic: String, notification: Notification ->
since = notification.id
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify
val subscription = repository.getSubscription(subscriptionId) ?: return@notify
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
}
val failed = AtomicBoolean(false)
var lastError: Exception? = null
val fail = { e: Exception ->
failed.set(true)
lastError = e
}
// Call /json subscribe endpoint and loop until the call fails, is canceled,
// or the job or service are cancelled/stopped
try {
call = api.subscribe(baseUrl, topicsStr, since, user, notify, fail)
while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) {
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTED, null, 0L)
Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}")
delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled
val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user)
call = newCall
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTED, null, 0L)
// Blocking read loop: reads JSON lines until connection closes or is cancelled
while (isActive && serviceActive() && !source.exhausted()) {
val line = source.readUtf8Line() ?: break
val notificationWithTopic = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0)
if (notificationWithTopic != null) {
since = notificationWithTopic.notification.id
val topic = notificationWithTopic.topic
val subscriptionId = topicsToSubscriptionIds[topic] ?: continue
val subscription = repository.getSubscription(subscriptionId) ?: continue
val notification = notificationWithTopic.notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notification)
}
}
// Clean disconnect - reset backoff
retryMillis = 0L
Log.d(TAG, "[$url] Connection closed cleanly")
} catch (e: Exception) {
if (!isActive) {
Log.d(TAG, "[$url] Connection cancelled")
break
}
Log.e(TAG, "[$url] Connection failed: ${e.message}", e)
lastError = e
}
// If we're not cancelled yet, wait little before retrying (incremental back-off)
if (isActive && serviceActive() && lastError != null) {
retryMillis = nextRetryMillis(retryMillis, startTime)
val nextRetryTime = System.currentTimeMillis() + retryMillis
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTING, lastError, nextRetryTime)
Log.w(TAG, "[$url] Connection failed, retrying connection in ${retryMillis / 1000}s ...")
connectionDetailsListener(subscriptionIds, ConnectionState.CONNECTING, e, nextRetryTime)
Log.w(TAG, "[$url] Retrying connection in ${retryMillis / 1000}s ...")
delay(retryMillis)
}
}
Log.d(TAG, "[$url] Connection job SHUT DOWN")
// FIXME: Do NOT update state here as this can lead to races; this leaks the subscription state map
}
}
@ -102,9 +110,8 @@ class JsonConnection(
companion object {
private const val TAG = "NtfyJsonConnection"
private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L
private const val RETRY_STEP_MILLIS = 5_000L
private const val RETRY_MAX_MILLIS = 60_000L
private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS
private const val RETRY_RESET_AFTER_MILLIS = 60_000L
}
}