Remove in-memory since value

This commit is contained in:
Philipp Heckel 2026-02-04 04:28:51 -08:00
parent 34cac10f41
commit 0c0867d2da
7 changed files with 38 additions and 47 deletions

View file

@ -160,6 +160,19 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
notificationDao.markAsDeletedBySequenceId(subscriptionId, sequenceId)
}
fun updateLastNotificationId(subscriptionId: Long, notificationId: String) {
subscriptionDao.updateLastNotificationId(subscriptionId, notificationId)
}
/**
* Returns the first non-null lastNotificationId from the given subscriptions to be used
* in the "since" parameter.
*/
fun getLastNotificationIdForSubscriptions(subscriptionIds: Collection<Long>): String? {
return subscriptionIds
.firstNotNullOfOrNull { subscriptionId -> getSubscription(subscriptionId)?.lastNotificationId }
}
fun markAllAsDeleted(subscriptionId: Long) {
notificationDao.markAllAsDeleted(subscriptionId)
}

View file

@ -118,7 +118,7 @@ class ApiService(private val context: Context) {
val subscriptionId = subscription.id
val baseUrl = subscription.baseUrl
val topic = subscription.topic
val sinceVal = subscription.lastNotificationId ?: "all"
val sinceVal = subscription.lastNotificationId ?: SINCE_ALL
val url = topicUrlJsonPoll(baseUrl, topic, sinceVal)
Log.d(TAG, "Polling topic $url")
@ -146,7 +146,7 @@ class ApiService(private val context: Context) {
since: String?,
user: User?
): Pair<Call, BufferedSource> {
val sinceVal = since ?: "all"
val sinceVal = since ?: SINCE_ALL
val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url")
val customHeaders = repository.getCustomHeaders(baseUrl)
@ -206,5 +206,9 @@ class ApiService(private val context: Context) {
const val EVENT_MESSAGE_CLEAR = "message_clear"
const val EVENT_KEEPALIVE = "keepalive"
const val EVENT_POLL_REQUEST = "poll_request"
// Valid values for the "since" parameter
const val SINCE_ALL = "all" // Retrieve all cached messages
const val SINCE_NONE = "none" // Don't retrieve any old messages
}
}

View file

@ -7,7 +7,6 @@ import java.net.ProtocolException
interface Connection {
fun start()
fun close()
fun since(): String?
}
/**

View file

@ -23,7 +23,6 @@ class JsonConnection(
private val repository: Repository,
private val api: ApiService,
private val user: User?,
private val sinceId: String?,
private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean
@ -34,7 +33,6 @@ class JsonConnection(
private val url = topicUrl(baseUrl, topicsStr)
private val parser = NotificationParser()
private var since: String? = sinceId
private var errorCount = 0
private lateinit var call: Call
private lateinit var job: Job
@ -45,7 +43,8 @@ class JsonConnection(
while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val since = repository.getLastNotificationIdForSubscriptions(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE
try {
val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user)
call = newCall
@ -59,7 +58,6 @@ class JsonConnection(
val line = source.readUtf8Line() ?: break
val notificationWithTopic = parser.parseWithTopic(line, subscriptionId = 0, baseUrl = baseUrl)
if (notificationWithTopic != null) {
since = notificationWithTopic.notification.id
val topic = notificationWithTopic.topic
val subscriptionId = topicsToSubscriptionIds[topic] ?: continue
val subscription = repository.getSubscription(subscriptionId) ?: continue
@ -88,10 +86,6 @@ class JsonConnection(
}
}
override fun since(): String? {
return since
}
override fun close() {
Log.d(TAG, "[$url] Cancelling connection")
if (this::job.isInitialized) job.cancel()

View file

@ -253,21 +253,6 @@ class SubscriberService : Service() {
val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds)
val match = activeConnectionIds == desiredConnectionIds
// Build since map from existing in-memory connections first
val sinceByBaseUrl = connections
.map { e -> e.key.baseUrl to e.value.since() }
.toMap()
.toMutableMap()
// For base URLs without an existing connection, use the lastNotificationId from subscriptions.
// This ensures we don't miss notifications after a service restart.
instantSubscriptions.groupBy { it.baseUrl }.forEach { (baseUrl, subs) ->
if (!sinceByBaseUrl.containsKey(baseUrl)) {
val lastNotificationId = subs.mapNotNull { it.lastNotificationId }.firstOrNull()
sinceByBaseUrl[baseUrl] = lastNotificationId
}
}
Log.d(TAG, "Refreshing subscriptions")
Log.d(TAG, "- Desired connections: $desiredConnectionIds")
Log.d(TAG, "- Active connections: $activeConnectionIds")
@ -282,19 +267,15 @@ class SubscriberService : Service() {
// Open new connections
newConnectionIds.forEach { connectionId ->
// IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to
// retrieve old messages. This is important, so we don't download attachments from old messages.
val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none"
val serviceActive = { isServiceStarted }
val user = repository.getUser(connectionId.baseUrl)
val customHeaders = repository.getCustomHeaders(connectionId.baseUrl)
val connection = if (connectionId.connectionProtocol == Repository.CONNECTION_PROTOCOL_WS) {
val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager
val httpClient = HttpUtil.wsClient(this, connectionId.baseUrl)
WsConnection(connectionId, repository, httpClient, user, customHeaders, since, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager)
WsConnection(connectionId, repository, httpClient, user, customHeaders, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager)
} else {
JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive)
JsonConnection(connectionId, scope, repository, api, user, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive)
}
connections[connectionId] = connection
connection.start()
@ -390,12 +371,14 @@ class SubscriberService : Service() {
if (notification.sequenceId.isNotEmpty()) {
repository.markAsReadBySequenceId(subscription.id, notification.sequenceId)
}
repository.updateLastNotificationId(subscription.id, notification.id)
dispatcher.dispatch(subscription, notification)
}
ApiService.EVENT_MESSAGE_DELETE -> {
if (notification.sequenceId.isNotEmpty()) {
repository.markAsDeletedBySequenceId(subscription.id, notification.sequenceId)
}
repository.updateLastNotificationId(subscription.id, notification.id)
dispatcher.dispatch(subscription, notification)
}
ApiService.EVENT_MESSAGE -> {

View file

@ -8,6 +8,7 @@ import io.heckel.ntfy.db.Notification
import io.heckel.ntfy.db.Repository
import io.heckel.ntfy.db.Subscription
import io.heckel.ntfy.db.User
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.HttpUtil
import io.heckel.ntfy.util.Log
@ -17,10 +18,8 @@ import okhttp3.OkHttpClient
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.net.ProtocolException
import java.util.Calendar
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
/**
* Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with
@ -38,7 +37,6 @@ class WsConnection(
private val httpClient: OkHttpClient,
private val user: User?,
private val customHeaders: List<CustomHeader>,
private val sinceId: String?,
private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
@ -52,7 +50,6 @@ class WsConnection(
private val globalId = GLOBAL_ID.incrementAndGet()
private val listenerId = AtomicLong(0)
private val since = AtomicReference<String?>(sinceId)
private val baseUrl = connectionId.baseUrl
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
@ -73,9 +70,8 @@ class WsConnection(
}
state = State.Connecting
val nextListenerId = listenerId.incrementAndGet()
val sinceId = since.get()
val sinceVal = sinceId ?: "all"
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
val since = repository.getLastNotificationIdForSubscriptions(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE
val urlWithSince = topicUrlWs(baseUrl, topicsStr, since)
val request = HttpUtil.requestBuilder(urlWithSince, user, customHeaders).build()
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
webSocket = httpClient.newWebSocket(request, Listener(nextListenerId))
@ -94,11 +90,6 @@ class WsConnection(
webSocket = null
}
@Synchronized
override fun since(): String? {
return since.get()
}
@Synchronized
fun scheduleReconnect(seconds: Int) {
if (closed || state == State.Connecting || state == State.Connected) {
@ -158,7 +149,6 @@ class WsConnection(
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
since.set(notification.id)
}
}

View file

@ -96,9 +96,10 @@ class FirebaseService : FirebaseMessagingService() {
private fun handleMessageDelete(remoteMessage: RemoteMessage) {
val data = remoteMessage.data
val id = data["id"] ?: return
val topic = data["topic"] ?: return
val sequenceId = data["sequence_id"] ?: return
Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId")
Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId")
CoroutineScope(job).launch {
val baseUrl = getString(R.string.app_base_url)
@ -107,6 +108,9 @@ class FirebaseService : FirebaseMessagingService() {
// Mark all notifications with this sequenceId as deleted
repository.markAsDeletedBySequenceId(subscription.id, sequenceId)
// Update lastNotificationId to track message stream position
repository.updateLastNotificationId(subscription.id, id)
// Cancel the Android notification (scoped by baseUrl/topic/sequenceId)
val notificationId = deriveNotificationId(baseUrl, topic, sequenceId)
val notifier = NotificationService(this@FirebaseService)
@ -116,9 +120,10 @@ class FirebaseService : FirebaseMessagingService() {
private fun handleMessageClear(remoteMessage: RemoteMessage) {
val data = remoteMessage.data
val id = data["id"] ?: return
val topic = data["topic"] ?: return
val sequenceId = data["sequence_id"] ?: return
Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId")
Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId")
CoroutineScope(job).launch {
val baseUrl = getString(R.string.app_base_url)
@ -127,6 +132,9 @@ class FirebaseService : FirebaseMessagingService() {
// Mark all notifications with this sequenceId as read
repository.markAsReadBySequenceId(subscription.id, sequenceId)
// Update lastNotificationId to track message stream position
repository.updateLastNotificationId(subscription.id, id)
// Cancel the Android notification (scoped by baseUrl/topic/sequenceId)
val notificationId = deriveNotificationId(baseUrl, topic, sequenceId)
val notifier = NotificationService(this@FirebaseService)