Merge pull request #159 from binwiederhier/1591-since-none

Fix notifications being missed after service restart
This commit is contained in:
Philipp C. Heckel 2026-02-04 08:12:14 -05:00 committed by GitHub
commit cd15fbd7f4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 73 additions and 46 deletions

View file

@ -562,6 +562,17 @@ interface SubscriptionDao {
@Query("UPDATE subscription SET lastNotificationId = :lastNotificationId WHERE id = :subscriptionId")
fun updateLastNotificationId(subscriptionId: Long, lastNotificationId: String)
@Query("""
SELECT s.lastNotificationId
FROM Subscription AS s
LEFT JOIN Notification AS n ON s.id = n.subscriptionId AND n.deleted != 1
WHERE s.id IN (:subscriptionIds) AND s.lastNotificationId IS NOT NULL
GROUP BY s.id
ORDER BY MAX(n.timestamp) DESC
LIMIT 1
""")
fun getLastNotificationId(subscriptionIds: Collection<Long>): String?
@Query("DELETE FROM subscription WHERE id = :subscriptionId")
fun remove(subscriptionId: Long)
}

View file

@ -160,6 +160,19 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
notificationDao.markAsDeletedBySequenceId(subscriptionId, sequenceId)
}
fun updateLastNotificationId(subscriptionId: Long, notificationId: String) {
subscriptionDao.updateLastNotificationId(subscriptionId, notificationId)
}
/**
* Returns the lastNotificationId to use as "since" parameter for subscribing.
* Selects the lastNotificationId from the subscription with the most recent notification.
* Returns null if no subscriptions have a lastNotificationId.
*/
fun getLastNotificationId(subscriptionIds: Collection<Long>): String? {
return subscriptionDao.getLastNotificationId(subscriptionIds)
}
fun markAllAsDeleted(subscriptionId: Long) {
notificationDao.markAllAsDeleted(subscriptionId)
}

View file

@ -118,7 +118,7 @@ class ApiService(private val context: Context) {
val subscriptionId = subscription.id
val baseUrl = subscription.baseUrl
val topic = subscription.topic
val sinceVal = subscription.lastNotificationId ?: "all"
val sinceVal = subscription.lastNotificationId ?: SINCE_ALL
val url = topicUrlJsonPoll(baseUrl, topic, sinceVal)
Log.d(TAG, "Polling topic $url")
@ -146,7 +146,7 @@ class ApiService(private val context: Context) {
since: String?,
user: User?
): Pair<Call, BufferedSource> {
val sinceVal = since ?: "all"
val sinceVal = since ?: SINCE_ALL
val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url")
val customHeaders = repository.getCustomHeaders(baseUrl)
@ -206,5 +206,9 @@ class ApiService(private val context: Context) {
const val EVENT_MESSAGE_CLEAR = "message_clear"
const val EVENT_KEEPALIVE = "keepalive"
const val EVENT_POLL_REQUEST = "poll_request"
// Valid values for the "since" parameter
const val SINCE_ALL = "all" // Retrieve all cached messages
const val SINCE_NONE = "none" // Don't retrieve any old messages
}
}

View file

@ -90,6 +90,8 @@ class NotificationService(val context: Context) {
.setSmallIcon(R.drawable.ic_notification)
.setColor(Colors.notificationIcon(context))
.setContentTitle(title)
.setWhen(notification.timestamp * 1000) // Set timestamp (convert seconds to millis)
.setShowWhen(true)
.setOnlyAlertOnce(true) // Do not vibrate or play sound if already showing (updates!)
.setAutoCancel(true) // Cancel when notification is clicked
setStyleAndText(builder, subscription, notification) // Preview picture or big text style

View file

@ -7,7 +7,6 @@ import java.net.ProtocolException
interface Connection {
fun start()
fun close()
fun since(): String?
}
/**

View file

@ -11,7 +11,8 @@ import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicUrl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
@ -19,11 +20,9 @@ import okhttp3.Call
class JsonConnection(
private val connectionId: ConnectionId,
private val scope: CoroutineScope,
private val repository: Repository,
private val api: ApiService,
private val user: User?,
private val sinceId: String?,
private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean
@ -33,19 +32,19 @@ class JsonConnection(
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val url = topicUrl(baseUrl, topicsStr)
private val parser = NotificationParser()
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private var since: String? = sinceId
private var errorCount = 0
private lateinit var call: Call
private lateinit var job: Job
override fun start() {
job = scope.launch(Dispatchers.IO) {
scope.launch {
Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds")
while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val since = repository.getLastNotificationId(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE
try {
val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user)
call = newCall
@ -59,7 +58,6 @@ class JsonConnection(
val line = source.readUtf8Line() ?: break
val notificationWithTopic = parser.parseWithTopic(line, subscriptionId = 0, baseUrl = baseUrl)
if (notificationWithTopic != null) {
since = notificationWithTopic.notification.id
val topic = notificationWithTopic.topic
val subscriptionId = topicsToSubscriptionIds[topic] ?: continue
val subscription = repository.getSubscription(subscriptionId) ?: continue
@ -84,17 +82,13 @@ class JsonConnection(
delay(retrySeconds * 1000L)
}
}
Log.d(TAG, "[$url] Connection job SHUT DOWN")
Log.d(TAG, "[$url] Connection fully SHUT DOWN")
}
}
override fun since(): String? {
return since
}
override fun close() {
Log.d(TAG, "[$url] Cancelling connection")
if (this::job.isInitialized) job.cancel()
scope.cancel()
if (this::call.isInitialized) call.cancel()
}

View file

@ -31,7 +31,6 @@ import io.heckel.ntfy.ui.MainActivity
import io.heckel.ntfy.util.HttpUtil
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicUrl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
@ -206,7 +205,7 @@ class SubscriberService : Service() {
return@launch
}
try {
reallyRefreshConnections(this)
reallyRefreshConnections()
} finally {
refreshMutex.unlock()
}
@ -217,7 +216,7 @@ class SubscriberService : Service() {
* Start/stop connections based on the desired state
* It is guaranteed that only one of function is run at a time (see mutex above).
*/
private suspend fun reallyRefreshConnections(scope: CoroutineScope) {
private suspend fun reallyRefreshConnections() {
// Group instant subscriptions by base URL, there is only one connection per base URL
val instantSubscriptions = repository.getSubscriptions().filter { s -> s.instant }
val activeConnectionIds = connections.keys().toList().toSet()
@ -252,9 +251,6 @@ class SubscriberService : Service() {
val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds)
val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds)
val match = activeConnectionIds == desiredConnectionIds
val sinceByBaseUrl = connections
.map { e -> e.key.baseUrl to e.value.since() } // Use since=<id>, avoid retrieving old messages (see comment below)
.toMap()
Log.d(TAG, "Refreshing subscriptions")
Log.d(TAG, "- Desired connections: $desiredConnectionIds")
@ -270,19 +266,15 @@ class SubscriberService : Service() {
// Open new connections
newConnectionIds.forEach { connectionId ->
// IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to
// retrieve old messages. This is important, so we don't download attachments from old messages.
val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none"
val serviceActive = { isServiceStarted }
val user = repository.getUser(connectionId.baseUrl)
val customHeaders = repository.getCustomHeaders(connectionId.baseUrl)
val connection = if (connectionId.connectionProtocol == Repository.CONNECTION_PROTOCOL_WS) {
val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager
val httpClient = HttpUtil.wsClient(this, connectionId.baseUrl)
WsConnection(connectionId, repository, httpClient, user, customHeaders, since, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager)
WsConnection(connectionId, repository, httpClient, user, customHeaders, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager)
} else {
JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive)
JsonConnection(connectionId, repository, api, user, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive)
}
connections[connectionId] = connection
connection.start()
@ -378,12 +370,14 @@ class SubscriberService : Service() {
if (notification.sequenceId.isNotEmpty()) {
repository.markAsReadBySequenceId(subscription.id, notification.sequenceId)
}
repository.updateLastNotificationId(subscription.id, notification.id)
dispatcher.dispatch(subscription, notification)
}
ApiService.EVENT_MESSAGE_DELETE -> {
if (notification.sequenceId.isNotEmpty()) {
repository.markAsDeletedBySequenceId(subscription.id, notification.sequenceId)
}
repository.updateLastNotificationId(subscription.id, notification.id)
dispatcher.dispatch(subscription, notification)
}
ApiService.EVENT_MESSAGE -> {

View file

@ -8,19 +8,23 @@ import io.heckel.ntfy.db.Notification
import io.heckel.ntfy.db.Repository
import io.heckel.ntfy.db.Subscription
import io.heckel.ntfy.db.User
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.HttpUtil
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicShortUrl
import io.heckel.ntfy.util.topicUrlWs
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.net.ProtocolException
import java.util.Calendar
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
/**
* Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with
@ -38,12 +42,12 @@ class WsConnection(
private val httpClient: OkHttpClient,
private val user: User?,
private val customHeaders: List<CustomHeader>,
private val sinceId: String?,
private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
) : Connection {
private val parser = NotificationParser()
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private var errorCount = 0
private var webSocket: WebSocket? = null
private var state: State? = null
@ -52,7 +56,6 @@ class WsConnection(
private val globalId = GLOBAL_ID.incrementAndGet()
private val listenerId = AtomicLong(0)
private val since = AtomicReference<String?>(sinceId)
private val baseUrl = connectionId.baseUrl
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
@ -73,9 +76,8 @@ class WsConnection(
}
state = State.Connecting
val nextListenerId = listenerId.incrementAndGet()
val sinceId = since.get()
val sinceVal = sinceId ?: "all"
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
val since = repository.getLastNotificationId(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE
val urlWithSince = topicUrlWs(baseUrl, topicsStr, since)
val request = HttpUtil.requestBuilder(urlWithSince, user, customHeaders).build()
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
webSocket = httpClient.newWebSocket(request, Listener(nextListenerId))
@ -84,6 +86,7 @@ class WsConnection(
@Synchronized
override fun close() {
closed = true
scope.cancel()
if (webSocket == null) {
Log.d(TAG,"$shortUrl (gid=$globalId): Not closing existing connection, because there is no active web socket")
return
@ -94,11 +97,6 @@ class WsConnection(
webSocket = null
}
@Synchronized
override fun since(): String? {
return since.get()
}
@Synchronized
fun scheduleReconnect(seconds: Int) {
if (closed || state == State.Connecting || state == State.Connected) {
@ -109,13 +107,16 @@ class WsConnection(
Log.d(TAG,"$shortUrl (gid=$globalId): Scheduling a restart in $seconds seconds (via alarm manager)")
val reconnectTime = Calendar.getInstance()
reconnectTime.add(Calendar.SECOND, seconds)
// The AlarmManager callback runs on the main thread, but start() accesses the database,
// so we dispatch to a background thread using the connection's own scope.
val startOnBackgroundThread = { scope.launch { start() } }
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
if (alarmManager.canScheduleExactAlarms()) {
alarmManager.setExact(
AlarmManager.RTC_WAKEUP,
reconnectTime.timeInMillis,
RECONNECT_TAG,
{ start() },
{ startOnBackgroundThread() },
null
)
} else {
@ -126,7 +127,7 @@ class WsConnection(
AlarmManager.RTC_WAKEUP,
reconnectTime.timeInMillis,
RECONNECT_TAG,
{ start() },
{ startOnBackgroundThread() },
null
)
}
@ -158,7 +159,6 @@ class WsConnection(
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
since.set(notification.id)
}
}

View file

@ -96,9 +96,10 @@ class FirebaseService : FirebaseMessagingService() {
private fun handleMessageDelete(remoteMessage: RemoteMessage) {
val data = remoteMessage.data
val id = data["id"] ?: return
val topic = data["topic"] ?: return
val sequenceId = data["sequence_id"] ?: return
Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId")
Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId")
CoroutineScope(job).launch {
val baseUrl = getString(R.string.app_base_url)
@ -107,6 +108,9 @@ class FirebaseService : FirebaseMessagingService() {
// Mark all notifications with this sequenceId as deleted
repository.markAsDeletedBySequenceId(subscription.id, sequenceId)
// Update lastNotificationId to track message stream position
repository.updateLastNotificationId(subscription.id, id)
// Cancel the Android notification (scoped by baseUrl/topic/sequenceId)
val notificationId = deriveNotificationId(baseUrl, topic, sequenceId)
val notifier = NotificationService(this@FirebaseService)
@ -116,9 +120,10 @@ class FirebaseService : FirebaseMessagingService() {
private fun handleMessageClear(remoteMessage: RemoteMessage) {
val data = remoteMessage.data
val id = data["id"] ?: return
val topic = data["topic"] ?: return
val sequenceId = data["sequence_id"] ?: return
Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId")
Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId")
CoroutineScope(job).launch {
val baseUrl = getString(R.string.app_base_url)
@ -127,6 +132,9 @@ class FirebaseService : FirebaseMessagingService() {
// Mark all notifications with this sequenceId as read
repository.markAsReadBySequenceId(subscription.id, sequenceId)
// Update lastNotificationId to track message stream position
repository.updateLastNotificationId(subscription.id, id)
// Cancel the Android notification (scoped by baseUrl/topic/sequenceId)
val notificationId = deriveNotificationId(baseUrl, topic, sequenceId)
val notifier = NotificationService(this@FirebaseService)

View file

@ -5,3 +5,5 @@ Features:
Bug fixes + maintenance:
* Fix crash when URL scheme is missing in default server (#1582, ntfy-android#158, thanks to @hard-zero1 for reporting)
* Fix notification timestamp to use original send time instead of receive time (#1112, thanks to @voruti for reporting)
* Fix notifications being missed after service restart (#1591, thanks to @Epifeny for reporting)