diff --git a/app/src/main/java/io/heckel/ntfy/db/Database.kt b/app/src/main/java/io/heckel/ntfy/db/Database.kt index bdefa523..d8856574 100644 --- a/app/src/main/java/io/heckel/ntfy/db/Database.kt +++ b/app/src/main/java/io/heckel/ntfy/db/Database.kt @@ -487,6 +487,9 @@ interface NotificationDao { @Query("UPDATE notification SET deleted = 1 WHERE subscriptionId = :subscriptionId AND sid = :sid") fun markAsDeletedBySid(subscriptionId: Long, sid: String) + @Query("DELETE FROM notification WHERE subscriptionId = :subscriptionId AND sid = :sid") + fun deleteBySid(subscriptionId: Long, sid: String) + @Query("UPDATE notification SET deleted = 1 WHERE subscriptionId = :subscriptionId") fun markAllAsDeleted(subscriptionId: Long) diff --git a/app/src/main/java/io/heckel/ntfy/db/Repository.kt b/app/src/main/java/io/heckel/ntfy/db/Repository.kt index 45f40abe..a3d4b7b4 100644 --- a/app/src/main/java/io/heckel/ntfy/db/Repository.kt +++ b/app/src/main/java/io/heckel/ntfy/db/Repository.kt @@ -152,10 +152,13 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database) if (maybeExistingNotification != null) { return false } - // If this is a delete notification, mark existing notifications with this sid as deleted + // Delete old notifications with the same SID (this is an update to an existing sequence) + if (notification.sid.isNotEmpty()) { + notificationDao.deleteBySid(notification.subscriptionId, notification.sid) + } + // If this is a delete notification, don't add it to the database if (notification.deleted) { - notificationDao.markAsDeletedBySid(notification.subscriptionId, notification.sid) - // Still add the delete notification to the database for proper sequence tracking + return false } subscriptionDao.updateLastNotificationId(notification.subscriptionId, notification.id) notificationDao.add(notification) @@ -178,6 +181,10 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database) notificationDao.markAsDeletedBySid(subscriptionId, sid) } + fun deleteBySid(subscriptionId: Long, sid: String) { + notificationDao.deleteBySid(subscriptionId, sid) + } + fun markAllAsDeleted(subscriptionId: Long) { notificationDao.markAllAsDeleted(subscriptionId) } diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index c7aaa6a7..a03e358f 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -138,7 +138,7 @@ class ApiService(context: Context) { val body = response.body.string().trim() if (body.isEmpty()) return emptyList() val notifications = body.lines().mapNotNull { line -> - parser.parse(line, subscriptionId = subscriptionId) // No notification when we poll + parser.parse(line, subscriptionId = subscriptionId) } Log.d(TAG, "Notifications: $notifications") @@ -169,7 +169,7 @@ class ApiService(context: Context) { val source = response.body.source() while (!source.exhausted()) { val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") - val notification = parser.parseWithTopic(line, notify = true, subscriptionId = 0) // subscriptionId to be set downstream + val notification = parser.parseWithTopic(line, subscriptionId = 0) // subscriptionId to be set downstream if (notification != null) { notify(notification.topic, notification.notification) } diff --git a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt index fa4eac98..951a89f4 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt @@ -14,12 +14,12 @@ import java.lang.reflect.Type class NotificationParser { private val gson = Gson() - fun parse(s: String, subscriptionId: Long = 0, notify: Boolean = false): Notification? { - val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notify = notify) + fun parse(s: String, subscriptionId: Long = 0,): Notification? { + val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId) return notificationWithTopic?.notification } - fun parseWithTopic(s: String, subscriptionId: Long = 0, notify: Boolean = false): NotificationWithTopic? { + fun parseWithTopic(s: String, subscriptionId: Long = 0): NotificationWithTopic? { val message = gson.fromJson(s, Message::class.java) if (message.event != ApiService.EVENT_MESSAGE) { return null @@ -51,7 +51,6 @@ class NotificationParser { } val icon: Icon? = if (message.icon != null && message.icon != "") Icon(url = message.icon) else null val sid = message.sid ?: message.id // Default to id if sid not provided - val notificationId = if (notify) deriveNotificationId(sid) else 0 val notification = Notification( id = message.id, subscriptionId = subscriptionId, @@ -67,7 +66,7 @@ class NotificationParser { icon = icon, actions = actions, attachment = attachment, - notificationId = notificationId, + notificationId = deriveNotificationId(sid), deleted = message.deleted ?: false ) return NotificationWithTopic(message.topic, notification) diff --git a/app/src/main/java/io/heckel/ntfy/msg/Poller.kt b/app/src/main/java/io/heckel/ntfy/msg/Poller.kt new file mode 100644 index 00000000..17abe4bf --- /dev/null +++ b/app/src/main/java/io/heckel/ntfy/msg/Poller.kt @@ -0,0 +1,86 @@ +package io.heckel.ntfy.msg + +import io.heckel.ntfy.db.Notification +import io.heckel.ntfy.db.Repository +import io.heckel.ntfy.db.Subscription +import io.heckel.ntfy.db.User +import io.heckel.ntfy.util.Log +import io.heckel.ntfy.util.deriveNotificationId + +/** + * Polls the server for notifications and updates the repository. + * Groups notifications by SID and only keeps the latest for each sequence. + * Deletes sequences where the latest notification is marked as deleted. + */ +class Poller( + private val api: ApiService, + private val repository: Repository +) { + /** + * Polls for notifications and updates the repository. + * Returns the list of new notifications that were added. + * + * @param subscription The subscription to poll + * @param user The user for authentication (may be null) + * @param since The message ID to poll since (null for all cached messages) + * @param notify Whether to derive notification IDs for popup notifications + */ + suspend fun poll( + subscription: Subscription, + user: User?, + since: String? = null, + notify: Boolean = false + ): List { + val notifications = api.poll( + subscriptionId = subscription.id, + baseUrl = subscription.baseUrl, + topic = subscription.topic, + user = user, + since = since + ) + return processNotifications(subscription.id, notifications, notify) + } + + /** + * Processes a list of notifications: groups by SID, deletes deleted sequences, + * and adds only non-deleted latest notifications. + * Returns the list of notifications that were added. + */ + private suspend fun processNotifications( + subscriptionId: Long, + notifications: List, + notify: Boolean + ): List { + // Group by SID and only keep the latest notification for each sequence + val latestBySid = notifications + .groupBy { it.sid.ifEmpty { it.id } } + .mapValues { (_, notifs) -> notifs.maxByOrNull { it.timestamp } } + .values + .filterNotNull() + + // Delete sequences where the latest notification is marked as deleted + latestBySid.filter { it.deleted }.forEach { notification -> + val sid = notification.sid.ifEmpty { notification.id } + Log.d(TAG, "Deleting notifications with sid $sid") + repository.deleteBySid(subscriptionId, sid) + } + + // Add only non-deleted latest notifications + val notificationsToAdd = latestBySid + .filter { !it.deleted } + .map { if (notify) it.copy(notificationId = deriveNotificationId(it.sid)) else it } + + val addedNotifications = mutableListOf() + notificationsToAdd.forEach { notification -> + if (repository.addNotification(notification)) { + addedNotifications.add(notification) + } + } + + return addedNotifications + } + + companion object { + private const val TAG = "NtfyPoller" + } +} diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index b119f5dd..d67f8ef3 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -147,7 +147,7 @@ class WsConnection( override fun onMessage(webSocket: WebSocket, text: String) { synchronize("onMessage") { Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text") - val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notify = true) + val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0) if (notificationWithTopic == null) { Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.") return@synchronize diff --git a/app/src/main/java/io/heckel/ntfy/ui/DetailActivity.kt b/app/src/main/java/io/heckel/ntfy/ui/DetailActivity.kt index 4c62cd62..bc38d915 100644 --- a/app/src/main/java/io/heckel/ntfy/ui/DetailActivity.kt +++ b/app/src/main/java/io/heckel/ntfy/ui/DetailActivity.kt @@ -36,6 +36,7 @@ import io.heckel.ntfy.db.Subscription import io.heckel.ntfy.firebase.FirebaseMessenger import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.NotificationService +import io.heckel.ntfy.msg.Poller import io.heckel.ntfy.service.SubscriberServiceManager import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.copyToClipboard @@ -67,6 +68,7 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet } private val repository by lazy { (application as Application).repository } private val api by lazy { ApiService(this) } + private val poller by lazy { Poller(api, repository) } private val messenger = FirebaseMessenger() private var notifier: NotificationService? = null // Context-dependent private var appBaseUrl: String? = null // Context-dependent @@ -231,8 +233,7 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet // Fetch cached messages try { val user = repository.getUser(subscription.baseUrl) // May be null - val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user) - notifications.forEach { notification -> repository.addNotification(notification) } + poller.poll(subscription, user) } catch (e: Exception) { Log.e(TAG, "Unable to fetch notifications: ${e.message}", e) } @@ -707,14 +708,16 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet try { val subscription = repository.getSubscription(subscriptionId) ?: return@launch val user = repository.getUser(subscription.baseUrl) // May be null - val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user, subscription.lastNotificationId) - val newNotifications = repository.onlyNewNotifications(subscriptionId, notifications) - val toastMessage = if (newNotifications.isEmpty()) { + val addedNotifications = poller.poll( + subscription = subscription, + user = user, + since = subscription.lastNotificationId + ) + val toastMessage = if (addedNotifications.isEmpty()) { getString(R.string.refresh_message_no_results) } else { - getString(R.string.refresh_message_result, newNotifications.size) + getString(R.string.refresh_message_result, addedNotifications.size) } - newNotifications.forEach { notification -> repository.addNotification(notification) } runOnUiThread { Toast.makeText(this@DetailActivity, toastMessage, Toast.LENGTH_LONG).show() mainListContainer.isRefreshing = false diff --git a/app/src/main/java/io/heckel/ntfy/ui/MainActivity.kt b/app/src/main/java/io/heckel/ntfy/ui/MainActivity.kt index 2d2a7228..ff11fe62 100644 --- a/app/src/main/java/io/heckel/ntfy/ui/MainActivity.kt +++ b/app/src/main/java/io/heckel/ntfy/ui/MainActivity.kt @@ -54,6 +54,7 @@ import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.DownloadManager import io.heckel.ntfy.msg.DownloadType import io.heckel.ntfy.msg.NotificationDispatcher +import io.heckel.ntfy.msg.Poller import io.heckel.ntfy.service.SubscriberService import io.heckel.ntfy.service.SubscriberServiceManager import io.heckel.ntfy.util.Log @@ -67,7 +68,6 @@ import io.heckel.ntfy.util.randomSubscriptionId import io.heckel.ntfy.util.shortUrl import io.heckel.ntfy.util.topicShortUrl import io.heckel.ntfy.work.DeleteWorker -import io.heckel.ntfy.util.deriveNotificationId import io.heckel.ntfy.work.PollWorker import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay @@ -84,6 +84,7 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific } private val repository by lazy { (application as Application).repository } private val api by lazy { ApiService(this) } + private val poller by lazy { Poller(api, repository) } private val messenger = FirebaseMessenger() // UI elements @@ -663,9 +664,8 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific lifecycleScope.launch(Dispatchers.IO) { try { val user = repository.getUser(subscription.baseUrl) // May be null - val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user) - notifications.forEach { notification -> - repository.addNotification(notification) + val addedNotifications = poller.poll(subscription, user) + addedNotifications.forEach { notification -> if (notification.icon != null) { DownloadManager.enqueue(this@MainActivity, notification.id, userAction = false, DownloadType.ICON) } @@ -705,14 +705,15 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific Log.d(TAG, "subscription: $subscription") try { val user = repository.getUser(subscription.baseUrl) // May be null - val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user, subscription.lastNotificationId) - val newNotifications = repository.onlyNewNotifications(subscription.id, notifications) - newNotifications.forEach { notification -> - newNotificationsCount++ - val notificationWithId = notification.copy(notificationId = deriveNotificationId(notification.sid)) - if (repository.addNotification(notificationWithId)) { - dispatcher?.dispatch(subscription, notificationWithId) - } + val addedNotifications = poller.poll( + subscription = subscription, + user = user, + since = subscription.lastNotificationId, + notify = true + ) + newNotificationsCount += addedNotifications.size + addedNotifications.forEach { notification -> + dispatcher?.dispatch(subscription, notification) } } catch (e: Exception) { val topic = displayName(appBaseUrl, subscription) diff --git a/app/src/main/java/io/heckel/ntfy/work/PollWorker.kt b/app/src/main/java/io/heckel/ntfy/work/PollWorker.kt index e227745a..f122b015 100644 --- a/app/src/main/java/io/heckel/ntfy/work/PollWorker.kt +++ b/app/src/main/java/io/heckel/ntfy/work/PollWorker.kt @@ -7,8 +7,8 @@ import io.heckel.ntfy.BuildConfig import io.heckel.ntfy.db.Repository import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.NotificationDispatcher +import io.heckel.ntfy.msg.Poller import io.heckel.ntfy.util.Log -import io.heckel.ntfy.util.deriveNotificationId import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext @@ -27,6 +27,7 @@ class PollWorker(ctx: Context, params: WorkerParameters) : CoroutineWorker(ctx, val repository = Repository.getInstance(applicationContext) val dispatcher = NotificationDispatcher(applicationContext, repository) val api = ApiService(applicationContext) + val poller = Poller(api, repository) val baseUrl = inputData.getString(INPUT_DATA_BASE_URL) val topic = inputData.getString(INPUT_DATA_TOPIC) @@ -40,20 +41,14 @@ class PollWorker(ctx: Context, params: WorkerParameters) : CoroutineWorker(ctx, subscriptions.forEach{ subscription -> try { val user = repository.getUser(subscription.baseUrl) - val notifications = api.poll( - subscriptionId = subscription.id, - baseUrl = subscription.baseUrl, - topic = subscription.topic, + val addedNotifications = poller.poll( + subscription = subscription, user = user, - since = subscription.lastNotificationId + since = subscription.lastNotificationId, + notify = true ) - val newNotifications = repository - .onlyNewNotifications(subscription.id, notifications) - .map { it.copy(notificationId = deriveNotificationId(it.sid)) } - newNotifications.forEach { notification -> - if (repository.addNotification(notification)) { - dispatcher.dispatch(subscription, notification) - } + addedNotifications.forEach { notification -> + dispatcher.dispatch(subscription, notification) } } catch (e: Exception) { Log.e(TAG, "Failed checking messages: ${e.message}", e) diff --git a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt index 18688913..4a00199d 100644 --- a/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt +++ b/app/src/play/java/io/heckel/ntfy/firebase/FirebaseService.kt @@ -130,7 +130,6 @@ class FirebaseService : FirebaseMessagingService() { } else null val icon: Icon? = if (iconUrl != null && iconUrl != "") Icon(url = iconUrl) else null val actualSid = sid ?: id - val notificationId = if (deleted) 0 else deriveNotificationId(actualSid) val notification = Notification( id = id, subscriptionId = subscription.id, @@ -146,7 +145,7 @@ class FirebaseService : FirebaseMessagingService() { icon = icon, actions = parser.parseActions(actions), attachment = attachment, - notificationId = notificationId, + notificationId = deriveNotificationId(actualSid), deleted = deleted ) if (repository.addNotification(notification)) {