From 0c0867d2da0c9d92712d70d6d4b187ac5d56a064 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 4 Feb 2026 04:28:51 -0800 Subject: [PATCH] Remove in-memory since value --- .../main/java/io/heckel/ntfy/db/Repository.kt | 13 ++++++++++ .../java/io/heckel/ntfy/msg/ApiService.kt | 8 ++++-- .../java/io/heckel/ntfy/service/Connection.kt | 1 - .../io/heckel/ntfy/service/JsonConnection.kt | 10 ++------ .../heckel/ntfy/service/SubscriberService.kt | 25 +++---------------- .../io/heckel/ntfy/service/WsConnection.kt | 16 +++--------- .../heckel/ntfy/firebase/FirebaseService.kt | 12 +++++++-- 7 files changed, 38 insertions(+), 47 deletions(-) diff --git a/app/src/main/java/io/heckel/ntfy/db/Repository.kt b/app/src/main/java/io/heckel/ntfy/db/Repository.kt index 51504bca..d9973b87 100644 --- a/app/src/main/java/io/heckel/ntfy/db/Repository.kt +++ b/app/src/main/java/io/heckel/ntfy/db/Repository.kt @@ -160,6 +160,19 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database) notificationDao.markAsDeletedBySequenceId(subscriptionId, sequenceId) } + fun updateLastNotificationId(subscriptionId: Long, notificationId: String) { + subscriptionDao.updateLastNotificationId(subscriptionId, notificationId) + } + + /** + * Returns the first non-null lastNotificationId from the given subscriptions to be used + * in the "since" parameter. + */ + fun getLastNotificationIdForSubscriptions(subscriptionIds: Collection): String? { + return subscriptionIds + .firstNotNullOfOrNull { subscriptionId -> getSubscription(subscriptionId)?.lastNotificationId } + } + fun markAllAsDeleted(subscriptionId: Long) { notificationDao.markAllAsDeleted(subscriptionId) } diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index ed1ea14f..7492e5bc 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -118,7 +118,7 @@ class ApiService(private val context: Context) { val subscriptionId = subscription.id val baseUrl = subscription.baseUrl val topic = subscription.topic - val sinceVal = subscription.lastNotificationId ?: "all" + val sinceVal = subscription.lastNotificationId ?: SINCE_ALL val url = topicUrlJsonPoll(baseUrl, topic, sinceVal) Log.d(TAG, "Polling topic $url") @@ -146,7 +146,7 @@ class ApiService(private val context: Context) { since: String?, user: User? ): Pair { - val sinceVal = since ?: "all" + val sinceVal = since ?: SINCE_ALL val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") val customHeaders = repository.getCustomHeaders(baseUrl) @@ -206,5 +206,9 @@ class ApiService(private val context: Context) { const val EVENT_MESSAGE_CLEAR = "message_clear" const val EVENT_KEEPALIVE = "keepalive" const val EVENT_POLL_REQUEST = "poll_request" + + // Valid values for the "since" parameter + const val SINCE_ALL = "all" // Retrieve all cached messages + const val SINCE_NONE = "none" // Don't retrieve any old messages } } diff --git a/app/src/main/java/io/heckel/ntfy/service/Connection.kt b/app/src/main/java/io/heckel/ntfy/service/Connection.kt index aad27496..8319b184 100644 --- a/app/src/main/java/io/heckel/ntfy/service/Connection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/Connection.kt @@ -7,7 +7,6 @@ import java.net.ProtocolException interface Connection { fun start() fun close() - fun since(): String? } /** diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index ea1ea8fc..dfda978f 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -23,7 +23,6 @@ class JsonConnection( private val repository: Repository, private val api: ApiService, private val user: User?, - private val sinceId: String?, private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val serviceActive: () -> Boolean @@ -34,7 +33,6 @@ class JsonConnection( private val url = topicUrl(baseUrl, topicsStr) private val parser = NotificationParser() - private var since: String? = sinceId private var errorCount = 0 private lateinit var call: Call private lateinit var job: Job @@ -45,7 +43,8 @@ class JsonConnection( while (isActive && serviceActive()) { Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") - + val since = repository.getLastNotificationIdForSubscriptions(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE + try { val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user) call = newCall @@ -59,7 +58,6 @@ class JsonConnection( val line = source.readUtf8Line() ?: break val notificationWithTopic = parser.parseWithTopic(line, subscriptionId = 0, baseUrl = baseUrl) if (notificationWithTopic != null) { - since = notificationWithTopic.notification.id val topic = notificationWithTopic.topic val subscriptionId = topicsToSubscriptionIds[topic] ?: continue val subscription = repository.getSubscription(subscriptionId) ?: continue @@ -88,10 +86,6 @@ class JsonConnection( } } - override fun since(): String? { - return since - } - override fun close() { Log.d(TAG, "[$url] Cancelling connection") if (this::job.isInitialized) job.cancel() diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index b2ae09e7..19fb160e 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -253,21 +253,6 @@ class SubscriberService : Service() { val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds) val match = activeConnectionIds == desiredConnectionIds - // Build since map from existing in-memory connections first - val sinceByBaseUrl = connections - .map { e -> e.key.baseUrl to e.value.since() } - .toMap() - .toMutableMap() - - // For base URLs without an existing connection, use the lastNotificationId from subscriptions. - // This ensures we don't miss notifications after a service restart. - instantSubscriptions.groupBy { it.baseUrl }.forEach { (baseUrl, subs) -> - if (!sinceByBaseUrl.containsKey(baseUrl)) { - val lastNotificationId = subs.mapNotNull { it.lastNotificationId }.firstOrNull() - sinceByBaseUrl[baseUrl] = lastNotificationId - } - } - Log.d(TAG, "Refreshing subscriptions") Log.d(TAG, "- Desired connections: $desiredConnectionIds") Log.d(TAG, "- Active connections: $activeConnectionIds") @@ -282,19 +267,15 @@ class SubscriberService : Service() { // Open new connections newConnectionIds.forEach { connectionId -> - // IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to - // retrieve old messages. This is important, so we don't download attachments from old messages. - - val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none" val serviceActive = { isServiceStarted } val user = repository.getUser(connectionId.baseUrl) val customHeaders = repository.getCustomHeaders(connectionId.baseUrl) val connection = if (connectionId.connectionProtocol == Repository.CONNECTION_PROTOCOL_WS) { val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager val httpClient = HttpUtil.wsClient(this, connectionId.baseUrl) - WsConnection(connectionId, repository, httpClient, user, customHeaders, since, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager) + WsConnection(connectionId, repository, httpClient, user, customHeaders, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager) } else { - JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive) + JsonConnection(connectionId, scope, repository, api, user, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive) } connections[connectionId] = connection connection.start() @@ -390,12 +371,14 @@ class SubscriberService : Service() { if (notification.sequenceId.isNotEmpty()) { repository.markAsReadBySequenceId(subscription.id, notification.sequenceId) } + repository.updateLastNotificationId(subscription.id, notification.id) dispatcher.dispatch(subscription, notification) } ApiService.EVENT_MESSAGE_DELETE -> { if (notification.sequenceId.isNotEmpty()) { repository.markAsDeletedBySequenceId(subscription.id, notification.sequenceId) } + repository.updateLastNotificationId(subscription.id, notification.id) dispatcher.dispatch(subscription, notification) } ApiService.EVENT_MESSAGE -> { diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 1e66cc2a..e36ce115 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -8,6 +8,7 @@ import io.heckel.ntfy.db.Notification import io.heckel.ntfy.db.Repository import io.heckel.ntfy.db.Subscription import io.heckel.ntfy.db.User +import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.HttpUtil import io.heckel.ntfy.util.Log @@ -17,10 +18,8 @@ import okhttp3.OkHttpClient import okhttp3.Response import okhttp3.WebSocket import okhttp3.WebSocketListener -import java.net.ProtocolException import java.util.Calendar import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicReference /** * Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with @@ -38,7 +37,6 @@ class WsConnection( private val httpClient: OkHttpClient, private val user: User?, private val customHeaders: List, - private val sinceId: String?, private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val alarmManager: AlarmManager @@ -52,7 +50,6 @@ class WsConnection( private val globalId = GLOBAL_ID.incrementAndGet() private val listenerId = AtomicLong(0) - private val since = AtomicReference(sinceId) private val baseUrl = connectionId.baseUrl private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") @@ -73,9 +70,8 @@ class WsConnection( } state = State.Connecting val nextListenerId = listenerId.incrementAndGet() - val sinceId = since.get() - val sinceVal = sinceId ?: "all" - val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) + val since = repository.getLastNotificationIdForSubscriptions(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE + val urlWithSince = topicUrlWs(baseUrl, topicsStr, since) val request = HttpUtil.requestBuilder(urlWithSince, user, customHeaders).build() Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...") webSocket = httpClient.newWebSocket(request, Listener(nextListenerId)) @@ -94,11 +90,6 @@ class WsConnection( webSocket = null } - @Synchronized - override fun since(): String? { - return since.get() - } - @Synchronized fun scheduleReconnect(seconds: Int) { if (closed || state == State.Connecting || state == State.Connected) { @@ -158,7 +149,6 @@ class WsConnection( val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) notificationListener(subscription, notificationWithSubscriptionId) - since.set(notification.id) } } diff --git a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt index 252b0ee1..7d92512a 100644 --- a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt +++ b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt @@ -96,9 +96,10 @@ class FirebaseService : FirebaseMessagingService() { private fun handleMessageDelete(remoteMessage: RemoteMessage) { val data = remoteMessage.data + val id = data["id"] ?: return val topic = data["topic"] ?: return val sequenceId = data["sequence_id"] ?: return - Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId") + Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId") CoroutineScope(job).launch { val baseUrl = getString(R.string.app_base_url) @@ -107,6 +108,9 @@ class FirebaseService : FirebaseMessagingService() { // Mark all notifications with this sequenceId as deleted repository.markAsDeletedBySequenceId(subscription.id, sequenceId) + // Update lastNotificationId to track message stream position + repository.updateLastNotificationId(subscription.id, id) + // Cancel the Android notification (scoped by baseUrl/topic/sequenceId) val notificationId = deriveNotificationId(baseUrl, topic, sequenceId) val notifier = NotificationService(this@FirebaseService) @@ -116,9 +120,10 @@ class FirebaseService : FirebaseMessagingService() { private fun handleMessageClear(remoteMessage: RemoteMessage) { val data = remoteMessage.data + val id = data["id"] ?: return val topic = data["topic"] ?: return val sequenceId = data["sequence_id"] ?: return - Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId") + Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId") CoroutineScope(job).launch { val baseUrl = getString(R.string.app_base_url) @@ -127,6 +132,9 @@ class FirebaseService : FirebaseMessagingService() { // Mark all notifications with this sequenceId as read repository.markAsReadBySequenceId(subscription.id, sequenceId) + // Update lastNotificationId to track message stream position + repository.updateLastNotificationId(subscription.id, id) + // Cancel the Android notification (scoped by baseUrl/topic/sequenceId) val notificationId = deriveNotificationId(baseUrl, topic, sequenceId) val notifier = NotificationService(this@FirebaseService)