Poller
This commit is contained in:
parent
6b2aa25a68
commit
85a5bb1b16
10 changed files with 138 additions and 45 deletions
|
|
@ -487,6 +487,9 @@ interface NotificationDao {
|
|||
@Query("UPDATE notification SET deleted = 1 WHERE subscriptionId = :subscriptionId AND sid = :sid")
|
||||
fun markAsDeletedBySid(subscriptionId: Long, sid: String)
|
||||
|
||||
@Query("DELETE FROM notification WHERE subscriptionId = :subscriptionId AND sid = :sid")
|
||||
fun deleteBySid(subscriptionId: Long, sid: String)
|
||||
|
||||
@Query("UPDATE notification SET deleted = 1 WHERE subscriptionId = :subscriptionId")
|
||||
fun markAllAsDeleted(subscriptionId: Long)
|
||||
|
||||
|
|
|
|||
|
|
@ -152,10 +152,13 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
|
|||
if (maybeExistingNotification != null) {
|
||||
return false
|
||||
}
|
||||
// If this is a delete notification, mark existing notifications with this sid as deleted
|
||||
// Delete old notifications with the same SID (this is an update to an existing sequence)
|
||||
if (notification.sid.isNotEmpty()) {
|
||||
notificationDao.deleteBySid(notification.subscriptionId, notification.sid)
|
||||
}
|
||||
// If this is a delete notification, don't add it to the database
|
||||
if (notification.deleted) {
|
||||
notificationDao.markAsDeletedBySid(notification.subscriptionId, notification.sid)
|
||||
// Still add the delete notification to the database for proper sequence tracking
|
||||
return false
|
||||
}
|
||||
subscriptionDao.updateLastNotificationId(notification.subscriptionId, notification.id)
|
||||
notificationDao.add(notification)
|
||||
|
|
@ -178,6 +181,10 @@ class Repository(private val sharedPrefs: SharedPreferences, database: Database)
|
|||
notificationDao.markAsDeletedBySid(subscriptionId, sid)
|
||||
}
|
||||
|
||||
fun deleteBySid(subscriptionId: Long, sid: String) {
|
||||
notificationDao.deleteBySid(subscriptionId, sid)
|
||||
}
|
||||
|
||||
fun markAllAsDeleted(subscriptionId: Long) {
|
||||
notificationDao.markAllAsDeleted(subscriptionId)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ class ApiService(context: Context) {
|
|||
val body = response.body.string().trim()
|
||||
if (body.isEmpty()) return emptyList()
|
||||
val notifications = body.lines().mapNotNull { line ->
|
||||
parser.parse(line, subscriptionId = subscriptionId) // No notification when we poll
|
||||
parser.parse(line, subscriptionId = subscriptionId)
|
||||
}
|
||||
|
||||
Log.d(TAG, "Notifications: $notifications")
|
||||
|
|
@ -169,7 +169,7 @@ class ApiService(context: Context) {
|
|||
val source = response.body.source()
|
||||
while (!source.exhausted()) {
|
||||
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
|
||||
val notification = parser.parseWithTopic(line, notify = true, subscriptionId = 0) // subscriptionId to be set downstream
|
||||
val notification = parser.parseWithTopic(line, subscriptionId = 0) // subscriptionId to be set downstream
|
||||
if (notification != null) {
|
||||
notify(notification.topic, notification.notification)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,12 +14,12 @@ import java.lang.reflect.Type
|
|||
class NotificationParser {
|
||||
private val gson = Gson()
|
||||
|
||||
fun parse(s: String, subscriptionId: Long = 0, notify: Boolean = false): Notification? {
|
||||
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notify = notify)
|
||||
fun parse(s: String, subscriptionId: Long = 0,): Notification? {
|
||||
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId)
|
||||
return notificationWithTopic?.notification
|
||||
}
|
||||
|
||||
fun parseWithTopic(s: String, subscriptionId: Long = 0, notify: Boolean = false): NotificationWithTopic? {
|
||||
fun parseWithTopic(s: String, subscriptionId: Long = 0): NotificationWithTopic? {
|
||||
val message = gson.fromJson(s, Message::class.java)
|
||||
if (message.event != ApiService.EVENT_MESSAGE) {
|
||||
return null
|
||||
|
|
@ -51,7 +51,6 @@ class NotificationParser {
|
|||
}
|
||||
val icon: Icon? = if (message.icon != null && message.icon != "") Icon(url = message.icon) else null
|
||||
val sid = message.sid ?: message.id // Default to id if sid not provided
|
||||
val notificationId = if (notify) deriveNotificationId(sid) else 0
|
||||
val notification = Notification(
|
||||
id = message.id,
|
||||
subscriptionId = subscriptionId,
|
||||
|
|
@ -67,7 +66,7 @@ class NotificationParser {
|
|||
icon = icon,
|
||||
actions = actions,
|
||||
attachment = attachment,
|
||||
notificationId = notificationId,
|
||||
notificationId = deriveNotificationId(sid),
|
||||
deleted = message.deleted ?: false
|
||||
)
|
||||
return NotificationWithTopic(message.topic, notification)
|
||||
|
|
|
|||
86
app/src/main/java/io/heckel/ntfy/msg/Poller.kt
Normal file
86
app/src/main/java/io/heckel/ntfy/msg/Poller.kt
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
package io.heckel.ntfy.msg
|
||||
|
||||
import io.heckel.ntfy.db.Notification
|
||||
import io.heckel.ntfy.db.Repository
|
||||
import io.heckel.ntfy.db.Subscription
|
||||
import io.heckel.ntfy.db.User
|
||||
import io.heckel.ntfy.util.Log
|
||||
import io.heckel.ntfy.util.deriveNotificationId
|
||||
|
||||
/**
|
||||
* Polls the server for notifications and updates the repository.
|
||||
* Groups notifications by SID and only keeps the latest for each sequence.
|
||||
* Deletes sequences where the latest notification is marked as deleted.
|
||||
*/
|
||||
class Poller(
|
||||
private val api: ApiService,
|
||||
private val repository: Repository
|
||||
) {
|
||||
/**
|
||||
* Polls for notifications and updates the repository.
|
||||
* Returns the list of new notifications that were added.
|
||||
*
|
||||
* @param subscription The subscription to poll
|
||||
* @param user The user for authentication (may be null)
|
||||
* @param since The message ID to poll since (null for all cached messages)
|
||||
* @param notify Whether to derive notification IDs for popup notifications
|
||||
*/
|
||||
suspend fun poll(
|
||||
subscription: Subscription,
|
||||
user: User?,
|
||||
since: String? = null,
|
||||
notify: Boolean = false
|
||||
): List<Notification> {
|
||||
val notifications = api.poll(
|
||||
subscriptionId = subscription.id,
|
||||
baseUrl = subscription.baseUrl,
|
||||
topic = subscription.topic,
|
||||
user = user,
|
||||
since = since
|
||||
)
|
||||
return processNotifications(subscription.id, notifications, notify)
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a list of notifications: groups by SID, deletes deleted sequences,
|
||||
* and adds only non-deleted latest notifications.
|
||||
* Returns the list of notifications that were added.
|
||||
*/
|
||||
private suspend fun processNotifications(
|
||||
subscriptionId: Long,
|
||||
notifications: List<Notification>,
|
||||
notify: Boolean
|
||||
): List<Notification> {
|
||||
// Group by SID and only keep the latest notification for each sequence
|
||||
val latestBySid = notifications
|
||||
.groupBy { it.sid.ifEmpty { it.id } }
|
||||
.mapValues { (_, notifs) -> notifs.maxByOrNull { it.timestamp } }
|
||||
.values
|
||||
.filterNotNull()
|
||||
|
||||
// Delete sequences where the latest notification is marked as deleted
|
||||
latestBySid.filter { it.deleted }.forEach { notification ->
|
||||
val sid = notification.sid.ifEmpty { notification.id }
|
||||
Log.d(TAG, "Deleting notifications with sid $sid")
|
||||
repository.deleteBySid(subscriptionId, sid)
|
||||
}
|
||||
|
||||
// Add only non-deleted latest notifications
|
||||
val notificationsToAdd = latestBySid
|
||||
.filter { !it.deleted }
|
||||
.map { if (notify) it.copy(notificationId = deriveNotificationId(it.sid)) else it }
|
||||
|
||||
val addedNotifications = mutableListOf<Notification>()
|
||||
notificationsToAdd.forEach { notification ->
|
||||
if (repository.addNotification(notification)) {
|
||||
addedNotifications.add(notification)
|
||||
}
|
||||
}
|
||||
|
||||
return addedNotifications
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val TAG = "NtfyPoller"
|
||||
}
|
||||
}
|
||||
|
|
@ -147,7 +147,7 @@ class WsConnection(
|
|||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
synchronize("onMessage") {
|
||||
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text")
|
||||
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notify = true)
|
||||
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0)
|
||||
if (notificationWithTopic == null) {
|
||||
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
|
||||
return@synchronize
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import io.heckel.ntfy.db.Subscription
|
|||
import io.heckel.ntfy.firebase.FirebaseMessenger
|
||||
import io.heckel.ntfy.msg.ApiService
|
||||
import io.heckel.ntfy.msg.NotificationService
|
||||
import io.heckel.ntfy.msg.Poller
|
||||
import io.heckel.ntfy.service.SubscriberServiceManager
|
||||
import io.heckel.ntfy.util.Log
|
||||
import io.heckel.ntfy.util.copyToClipboard
|
||||
|
|
@ -67,6 +68,7 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet
|
|||
}
|
||||
private val repository by lazy { (application as Application).repository }
|
||||
private val api by lazy { ApiService(this) }
|
||||
private val poller by lazy { Poller(api, repository) }
|
||||
private val messenger = FirebaseMessenger()
|
||||
private var notifier: NotificationService? = null // Context-dependent
|
||||
private var appBaseUrl: String? = null // Context-dependent
|
||||
|
|
@ -231,8 +233,7 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet
|
|||
// Fetch cached messages
|
||||
try {
|
||||
val user = repository.getUser(subscription.baseUrl) // May be null
|
||||
val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user)
|
||||
notifications.forEach { notification -> repository.addNotification(notification) }
|
||||
poller.poll(subscription, user)
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Unable to fetch notifications: ${e.message}", e)
|
||||
}
|
||||
|
|
@ -707,14 +708,16 @@ class DetailActivity : AppCompatActivity(), NotificationFragment.NotificationSet
|
|||
try {
|
||||
val subscription = repository.getSubscription(subscriptionId) ?: return@launch
|
||||
val user = repository.getUser(subscription.baseUrl) // May be null
|
||||
val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user, subscription.lastNotificationId)
|
||||
val newNotifications = repository.onlyNewNotifications(subscriptionId, notifications)
|
||||
val toastMessage = if (newNotifications.isEmpty()) {
|
||||
val addedNotifications = poller.poll(
|
||||
subscription = subscription,
|
||||
user = user,
|
||||
since = subscription.lastNotificationId
|
||||
)
|
||||
val toastMessage = if (addedNotifications.isEmpty()) {
|
||||
getString(R.string.refresh_message_no_results)
|
||||
} else {
|
||||
getString(R.string.refresh_message_result, newNotifications.size)
|
||||
getString(R.string.refresh_message_result, addedNotifications.size)
|
||||
}
|
||||
newNotifications.forEach { notification -> repository.addNotification(notification) }
|
||||
runOnUiThread {
|
||||
Toast.makeText(this@DetailActivity, toastMessage, Toast.LENGTH_LONG).show()
|
||||
mainListContainer.isRefreshing = false
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ import io.heckel.ntfy.msg.ApiService
|
|||
import io.heckel.ntfy.msg.DownloadManager
|
||||
import io.heckel.ntfy.msg.DownloadType
|
||||
import io.heckel.ntfy.msg.NotificationDispatcher
|
||||
import io.heckel.ntfy.msg.Poller
|
||||
import io.heckel.ntfy.service.SubscriberService
|
||||
import io.heckel.ntfy.service.SubscriberServiceManager
|
||||
import io.heckel.ntfy.util.Log
|
||||
|
|
@ -67,7 +68,6 @@ import io.heckel.ntfy.util.randomSubscriptionId
|
|||
import io.heckel.ntfy.util.shortUrl
|
||||
import io.heckel.ntfy.util.topicShortUrl
|
||||
import io.heckel.ntfy.work.DeleteWorker
|
||||
import io.heckel.ntfy.util.deriveNotificationId
|
||||
import io.heckel.ntfy.work.PollWorker
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.delay
|
||||
|
|
@ -84,6 +84,7 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific
|
|||
}
|
||||
private val repository by lazy { (application as Application).repository }
|
||||
private val api by lazy { ApiService(this) }
|
||||
private val poller by lazy { Poller(api, repository) }
|
||||
private val messenger = FirebaseMessenger()
|
||||
|
||||
// UI elements
|
||||
|
|
@ -663,9 +664,8 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific
|
|||
lifecycleScope.launch(Dispatchers.IO) {
|
||||
try {
|
||||
val user = repository.getUser(subscription.baseUrl) // May be null
|
||||
val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user)
|
||||
notifications.forEach { notification ->
|
||||
repository.addNotification(notification)
|
||||
val addedNotifications = poller.poll(subscription, user)
|
||||
addedNotifications.forEach { notification ->
|
||||
if (notification.icon != null) {
|
||||
DownloadManager.enqueue(this@MainActivity, notification.id, userAction = false, DownloadType.ICON)
|
||||
}
|
||||
|
|
@ -705,14 +705,15 @@ class MainActivity : AppCompatActivity(), AddFragment.SubscribeListener, Notific
|
|||
Log.d(TAG, "subscription: $subscription")
|
||||
try {
|
||||
val user = repository.getUser(subscription.baseUrl) // May be null
|
||||
val notifications = api.poll(subscription.id, subscription.baseUrl, subscription.topic, user, subscription.lastNotificationId)
|
||||
val newNotifications = repository.onlyNewNotifications(subscription.id, notifications)
|
||||
newNotifications.forEach { notification ->
|
||||
newNotificationsCount++
|
||||
val notificationWithId = notification.copy(notificationId = deriveNotificationId(notification.sid))
|
||||
if (repository.addNotification(notificationWithId)) {
|
||||
dispatcher?.dispatch(subscription, notificationWithId)
|
||||
}
|
||||
val addedNotifications = poller.poll(
|
||||
subscription = subscription,
|
||||
user = user,
|
||||
since = subscription.lastNotificationId,
|
||||
notify = true
|
||||
)
|
||||
newNotificationsCount += addedNotifications.size
|
||||
addedNotifications.forEach { notification ->
|
||||
dispatcher?.dispatch(subscription, notification)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
val topic = displayName(appBaseUrl, subscription)
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ import io.heckel.ntfy.BuildConfig
|
|||
import io.heckel.ntfy.db.Repository
|
||||
import io.heckel.ntfy.msg.ApiService
|
||||
import io.heckel.ntfy.msg.NotificationDispatcher
|
||||
import io.heckel.ntfy.msg.Poller
|
||||
import io.heckel.ntfy.util.Log
|
||||
import io.heckel.ntfy.util.deriveNotificationId
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
|
||||
|
|
@ -27,6 +27,7 @@ class PollWorker(ctx: Context, params: WorkerParameters) : CoroutineWorker(ctx,
|
|||
val repository = Repository.getInstance(applicationContext)
|
||||
val dispatcher = NotificationDispatcher(applicationContext, repository)
|
||||
val api = ApiService(applicationContext)
|
||||
val poller = Poller(api, repository)
|
||||
|
||||
val baseUrl = inputData.getString(INPUT_DATA_BASE_URL)
|
||||
val topic = inputData.getString(INPUT_DATA_TOPIC)
|
||||
|
|
@ -40,20 +41,14 @@ class PollWorker(ctx: Context, params: WorkerParameters) : CoroutineWorker(ctx,
|
|||
subscriptions.forEach{ subscription ->
|
||||
try {
|
||||
val user = repository.getUser(subscription.baseUrl)
|
||||
val notifications = api.poll(
|
||||
subscriptionId = subscription.id,
|
||||
baseUrl = subscription.baseUrl,
|
||||
topic = subscription.topic,
|
||||
val addedNotifications = poller.poll(
|
||||
subscription = subscription,
|
||||
user = user,
|
||||
since = subscription.lastNotificationId
|
||||
since = subscription.lastNotificationId,
|
||||
notify = true
|
||||
)
|
||||
val newNotifications = repository
|
||||
.onlyNewNotifications(subscription.id, notifications)
|
||||
.map { it.copy(notificationId = deriveNotificationId(it.sid)) }
|
||||
newNotifications.forEach { notification ->
|
||||
if (repository.addNotification(notification)) {
|
||||
dispatcher.dispatch(subscription, notification)
|
||||
}
|
||||
addedNotifications.forEach { notification ->
|
||||
dispatcher.dispatch(subscription, notification)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed checking messages: ${e.message}", e)
|
||||
|
|
|
|||
|
|
@ -130,7 +130,6 @@ class FirebaseService : FirebaseMessagingService() {
|
|||
} else null
|
||||
val icon: Icon? = if (iconUrl != null && iconUrl != "") Icon(url = iconUrl) else null
|
||||
val actualSid = sid ?: id
|
||||
val notificationId = if (deleted) 0 else deriveNotificationId(actualSid)
|
||||
val notification = Notification(
|
||||
id = id,
|
||||
subscriptionId = subscription.id,
|
||||
|
|
@ -146,7 +145,7 @@ class FirebaseService : FirebaseMessagingService() {
|
|||
icon = icon,
|
||||
actions = parser.parseActions(actions),
|
||||
attachment = attachment,
|
||||
notificationId = notificationId,
|
||||
notificationId = deriveNotificationId(actualSid),
|
||||
deleted = deleted
|
||||
)
|
||||
if (repository.addNotification(notification)) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue