diff --git a/app/src/main/java/io/heckel/ntfy/db/Database.kt b/app/src/main/java/io/heckel/ntfy/db/Database.kt index 30b65885..fa3b7187 100644 --- a/app/src/main/java/io/heckel/ntfy/db/Database.kt +++ b/app/src/main/java/io/heckel/ntfy/db/Database.kt @@ -562,6 +562,17 @@ interface SubscriptionDao { @Query("UPDATE subscription SET lastNotificationId = :lastNotificationId WHERE id = :subscriptionId") fun updateLastNotificationId(subscriptionId: Long, lastNotificationId: String) + @Query(""" + SELECT s.lastNotificationId + FROM Subscription AS s + LEFT JOIN Notification AS n ON s.id = n.subscriptionId AND n.deleted != 1 + WHERE s.id IN (:subscriptionIds) AND s.lastNotificationId IS NOT NULL + GROUP BY s.id + ORDER BY MAX(n.timestamp) DESC + LIMIT 1 + """) + fun getLastNotificationId(subscriptionIds: Collection): String? + @Query("DELETE FROM subscription WHERE id = :subscriptionId") fun remove(subscriptionId: Long) } diff --git a/app/src/main/java/io/heckel/ntfy/db/Repository.kt b/app/src/main/java/io/heckel/ntfy/db/Repository.kt index 51504bca..43c8c858 100644 --- a/app/src/main/java/io/heckel/ntfy/db/Repository.kt +++ b/app/src/main/java/io/heckel/ntfy/db/Repository.kt @@ -160,6 +160,19 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database) notificationDao.markAsDeletedBySequenceId(subscriptionId, sequenceId) } + fun updateLastNotificationId(subscriptionId: Long, notificationId: String) { + subscriptionDao.updateLastNotificationId(subscriptionId, notificationId) + } + + /** + * Returns the lastNotificationId to use as "since" parameter for subscribing. + * Selects the lastNotificationId from the subscription with the most recent notification. + * Returns null if no subscriptions have a lastNotificationId. + */ + fun getLastNotificationId(subscriptionIds: Collection): String? { + return subscriptionDao.getLastNotificationId(subscriptionIds) + } + fun markAllAsDeleted(subscriptionId: Long) { notificationDao.markAllAsDeleted(subscriptionId) } diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index ed1ea14f..7492e5bc 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -118,7 +118,7 @@ class ApiService(private val context: Context) { val subscriptionId = subscription.id val baseUrl = subscription.baseUrl val topic = subscription.topic - val sinceVal = subscription.lastNotificationId ?: "all" + val sinceVal = subscription.lastNotificationId ?: SINCE_ALL val url = topicUrlJsonPoll(baseUrl, topic, sinceVal) Log.d(TAG, "Polling topic $url") @@ -146,7 +146,7 @@ class ApiService(private val context: Context) { since: String?, user: User? ): Pair { - val sinceVal = since ?: "all" + val sinceVal = since ?: SINCE_ALL val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") val customHeaders = repository.getCustomHeaders(baseUrl) @@ -206,5 +206,9 @@ class ApiService(private val context: Context) { const val EVENT_MESSAGE_CLEAR = "message_clear" const val EVENT_KEEPALIVE = "keepalive" const val EVENT_POLL_REQUEST = "poll_request" + + // Valid values for the "since" parameter + const val SINCE_ALL = "all" // Retrieve all cached messages + const val SINCE_NONE = "none" // Don't retrieve any old messages } } diff --git a/app/src/main/java/io/heckel/ntfy/msg/NotificationService.kt b/app/src/main/java/io/heckel/ntfy/msg/NotificationService.kt index eda4b100..86b3cdd3 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/NotificationService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/NotificationService.kt @@ -90,6 +90,8 @@ class NotificationService(val context: Context) { .setSmallIcon(R.drawable.ic_notification) .setColor(Colors.notificationIcon(context)) .setContentTitle(title) + .setWhen(notification.timestamp * 1000) // Set timestamp (convert seconds to millis) + .setShowWhen(true) .setOnlyAlertOnce(true) // Do not vibrate or play sound if already showing (updates!) .setAutoCancel(true) // Cancel when notification is clicked setStyleAndText(builder, subscription, notification) // Preview picture or big text style diff --git a/app/src/main/java/io/heckel/ntfy/service/Connection.kt b/app/src/main/java/io/heckel/ntfy/service/Connection.kt index aad27496..8319b184 100644 --- a/app/src/main/java/io/heckel/ntfy/service/Connection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/Connection.kt @@ -7,7 +7,6 @@ import java.net.ProtocolException interface Connection { fun start() fun close() - fun since(): String? } /** diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index ea1ea8fc..a2bea726 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -11,7 +11,8 @@ import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicUrl import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -19,11 +20,9 @@ import okhttp3.Call class JsonConnection( private val connectionId: ConnectionId, - private val scope: CoroutineScope, private val repository: Repository, private val api: ApiService, private val user: User?, - private val sinceId: String?, private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val serviceActive: () -> Boolean @@ -33,19 +32,19 @@ class JsonConnection( private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) private val parser = NotificationParser() + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - private var since: String? = sinceId private var errorCount = 0 private lateinit var call: Call - private lateinit var job: Job override fun start() { - job = scope.launch(Dispatchers.IO) { + scope.launch { Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds") while (isActive && serviceActive()) { Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") - + val since = repository.getLastNotificationId(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE + try { val (newCall, source) = api.subscribe(baseUrl, topicsStr, since, user) call = newCall @@ -59,7 +58,6 @@ class JsonConnection( val line = source.readUtf8Line() ?: break val notificationWithTopic = parser.parseWithTopic(line, subscriptionId = 0, baseUrl = baseUrl) if (notificationWithTopic != null) { - since = notificationWithTopic.notification.id val topic = notificationWithTopic.topic val subscriptionId = topicsToSubscriptionIds[topic] ?: continue val subscription = repository.getSubscription(subscriptionId) ?: continue @@ -84,17 +82,13 @@ class JsonConnection( delay(retrySeconds * 1000L) } } - Log.d(TAG, "[$url] Connection job SHUT DOWN") + Log.d(TAG, "[$url] Connection fully SHUT DOWN") } } - override fun since(): String? { - return since - } - override fun close() { Log.d(TAG, "[$url] Cancelling connection") - if (this::job.isInitialized) job.cancel() + scope.cancel() if (this::call.isInitialized) call.cancel() } diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 1c6833a4..c6ac23e7 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -31,7 +31,6 @@ import io.heckel.ntfy.ui.MainActivity import io.heckel.ntfy.util.HttpUtil import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicUrl -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch @@ -206,7 +205,7 @@ class SubscriberService : Service() { return@launch } try { - reallyRefreshConnections(this) + reallyRefreshConnections() } finally { refreshMutex.unlock() } @@ -217,7 +216,7 @@ class SubscriberService : Service() { * Start/stop connections based on the desired state * It is guaranteed that only one of function is run at a time (see mutex above). */ - private suspend fun reallyRefreshConnections(scope: CoroutineScope) { + private suspend fun reallyRefreshConnections() { // Group instant subscriptions by base URL, there is only one connection per base URL val instantSubscriptions = repository.getSubscriptions().filter { s -> s.instant } val activeConnectionIds = connections.keys().toList().toSet() @@ -252,9 +251,6 @@ class SubscriberService : Service() { val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds) val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds) val match = activeConnectionIds == desiredConnectionIds - val sinceByBaseUrl = connections - .map { e -> e.key.baseUrl to e.value.since() } // Use since=, avoid retrieving old messages (see comment below) - .toMap() Log.d(TAG, "Refreshing subscriptions") Log.d(TAG, "- Desired connections: $desiredConnectionIds") @@ -270,19 +266,15 @@ class SubscriberService : Service() { // Open new connections newConnectionIds.forEach { connectionId -> - // IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to - // retrieve old messages. This is important, so we don't download attachments from old messages. - - val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none" val serviceActive = { isServiceStarted } val user = repository.getUser(connectionId.baseUrl) val customHeaders = repository.getCustomHeaders(connectionId.baseUrl) val connection = if (connectionId.connectionProtocol == Repository.CONNECTION_PROTOCOL_WS) { val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager val httpClient = HttpUtil.wsClient(this, connectionId.baseUrl) - WsConnection(connectionId, repository, httpClient, user, customHeaders, since, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager) + WsConnection(connectionId, repository, httpClient, user, customHeaders, ::onConnectionDetailsChanged, ::onNotificationReceived, alarmManager) } else { - JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive) + JsonConnection(connectionId, repository, api, user, ::onConnectionDetailsChanged, ::onNotificationReceived, serviceActive) } connections[connectionId] = connection connection.start() @@ -378,12 +370,14 @@ class SubscriberService : Service() { if (notification.sequenceId.isNotEmpty()) { repository.markAsReadBySequenceId(subscription.id, notification.sequenceId) } + repository.updateLastNotificationId(subscription.id, notification.id) dispatcher.dispatch(subscription, notification) } ApiService.EVENT_MESSAGE_DELETE -> { if (notification.sequenceId.isNotEmpty()) { repository.markAsDeletedBySequenceId(subscription.id, notification.sequenceId) } + repository.updateLastNotificationId(subscription.id, notification.id) dispatcher.dispatch(subscription, notification) } ApiService.EVENT_MESSAGE -> { diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 1e66cc2a..56f0b635 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -8,19 +8,23 @@ import io.heckel.ntfy.db.Notification import io.heckel.ntfy.db.Repository import io.heckel.ntfy.db.Subscription import io.heckel.ntfy.db.User +import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.HttpUtil import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicShortUrl import io.heckel.ntfy.util.topicUrlWs +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch import okhttp3.OkHttpClient import okhttp3.Response import okhttp3.WebSocket import okhttp3.WebSocketListener -import java.net.ProtocolException import java.util.Calendar import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicReference /** * Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with @@ -38,12 +42,12 @@ class WsConnection( private val httpClient: OkHttpClient, private val user: User?, private val customHeaders: List, - private val sinceId: String?, private val connectionDetailsListener: (String, ConnectionState, Throwable?, Long) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val alarmManager: AlarmManager ) : Connection { private val parser = NotificationParser() + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) private var errorCount = 0 private var webSocket: WebSocket? = null private var state: State? = null @@ -52,7 +56,6 @@ class WsConnection( private val globalId = GLOBAL_ID.incrementAndGet() private val listenerId = AtomicLong(0) - private val since = AtomicReference(sinceId) private val baseUrl = connectionId.baseUrl private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") @@ -73,9 +76,8 @@ class WsConnection( } state = State.Connecting val nextListenerId = listenerId.incrementAndGet() - val sinceId = since.get() - val sinceVal = sinceId ?: "all" - val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) + val since = repository.getLastNotificationId(topicsToSubscriptionIds.values) ?: ApiService.SINCE_NONE + val urlWithSince = topicUrlWs(baseUrl, topicsStr, since) val request = HttpUtil.requestBuilder(urlWithSince, user, customHeaders).build() Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...") webSocket = httpClient.newWebSocket(request, Listener(nextListenerId)) @@ -84,6 +86,7 @@ class WsConnection( @Synchronized override fun close() { closed = true + scope.cancel() if (webSocket == null) { Log.d(TAG,"$shortUrl (gid=$globalId): Not closing existing connection, because there is no active web socket") return @@ -94,11 +97,6 @@ class WsConnection( webSocket = null } - @Synchronized - override fun since(): String? { - return since.get() - } - @Synchronized fun scheduleReconnect(seconds: Int) { if (closed || state == State.Connecting || state == State.Connected) { @@ -109,13 +107,16 @@ class WsConnection( Log.d(TAG,"$shortUrl (gid=$globalId): Scheduling a restart in $seconds seconds (via alarm manager)") val reconnectTime = Calendar.getInstance() reconnectTime.add(Calendar.SECOND, seconds) + // The AlarmManager callback runs on the main thread, but start() accesses the database, + // so we dispatch to a background thread using the connection's own scope. + val startOnBackgroundThread = { scope.launch { start() } } if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) { if (alarmManager.canScheduleExactAlarms()) { alarmManager.setExact( AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, - { start() }, + { startOnBackgroundThread() }, null ) } else { @@ -126,7 +127,7 @@ class WsConnection( AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, - { start() }, + { startOnBackgroundThread() }, null ) } @@ -158,7 +159,6 @@ class WsConnection( val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) notificationListener(subscription, notificationWithSubscriptionId) - since.set(notification.id) } } diff --git a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt index 252b0ee1..7d92512a 100644 --- a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt +++ b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt @@ -96,9 +96,10 @@ class FirebaseService : FirebaseMessagingService() { private fun handleMessageDelete(remoteMessage: RemoteMessage) { val data = remoteMessage.data + val id = data["id"] ?: return val topic = data["topic"] ?: return val sequenceId = data["sequence_id"] ?: return - Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId") + Log.d(TAG, "Received message_delete: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId") CoroutineScope(job).launch { val baseUrl = getString(R.string.app_base_url) @@ -107,6 +108,9 @@ class FirebaseService : FirebaseMessagingService() { // Mark all notifications with this sequenceId as deleted repository.markAsDeletedBySequenceId(subscription.id, sequenceId) + // Update lastNotificationId to track message stream position + repository.updateLastNotificationId(subscription.id, id) + // Cancel the Android notification (scoped by baseUrl/topic/sequenceId) val notificationId = deriveNotificationId(baseUrl, topic, sequenceId) val notifier = NotificationService(this@FirebaseService) @@ -116,9 +120,10 @@ class FirebaseService : FirebaseMessagingService() { private fun handleMessageClear(remoteMessage: RemoteMessage) { val data = remoteMessage.data + val id = data["id"] ?: return val topic = data["topic"] ?: return val sequenceId = data["sequence_id"] ?: return - Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, topic=$topic, sequenceId=$sequenceId") + Log.d(TAG, "Received message_clear: from=${remoteMessage.from}, id=$id, topic=$topic, sequenceId=$sequenceId") CoroutineScope(job).launch { val baseUrl = getString(R.string.app_base_url) @@ -127,6 +132,9 @@ class FirebaseService : FirebaseMessagingService() { // Mark all notifications with this sequenceId as read repository.markAsReadBySequenceId(subscription.id, sequenceId) + // Update lastNotificationId to track message stream position + repository.updateLastNotificationId(subscription.id, id) + // Cancel the Android notification (scoped by baseUrl/topic/sequenceId) val notificationId = deriveNotificationId(baseUrl, topic, sequenceId) val notifier = NotificationService(this@FirebaseService) diff --git a/fastlane/metadata/android/en-US/changelog/NEXT.txt b/fastlane/metadata/android/en-US/changelog/NEXT.txt index 8d80a23a..66638c38 100644 --- a/fastlane/metadata/android/en-US/changelog/NEXT.txt +++ b/fastlane/metadata/android/en-US/changelog/NEXT.txt @@ -5,3 +5,5 @@ Features: Bug fixes + maintenance: * Fix crash when URL scheme is missing in default server (#1582, ntfy-android#158, thanks to @hard-zero1 for reporting) +* Fix notification timestamp to use original send time instead of receive time (#1112, thanks to @voruti for reporting) +* Fix notifications being missed after service restart (#1591, thanks to @Epifeny for reporting) \ No newline at end of file