feat(android): native push via ops /api/push/:topic

Adds OpsPushWorker, a WorkManager-based native push client that
connects to the centralcloud-ops push endpoints (SSE stream +
messages catch-up) instead of relying solely on FCM/ntfy.

## New module

- OpsPushWorker — polls /api/push/:topic/messages for catch-up,
  then opens an SSE stream on /api/push/:topic/stream. Messages
  are normalized into the existing ntfy Message shape so all
  existing notification plumbing (channels, DND bypass, actions)
  works unchanged.

## Changes to existing modules

- Application.kt: calls SmsRelayInit.start() on cold boot
- RemoteConfigFetcher: parses native_push block from
  /api/android/config response and schedules/cancels the worker
- SmsRelayInit: restarts the worker on cold boot if enabled
- SmsRelayPreferences: adds native_push_* persisted config keys

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Mikael Hugo 2026-05-15 02:32:22 +02:00
parent be4ffb4ede
commit 2592f61942
5 changed files with 360 additions and 5 deletions

View file

@ -6,6 +6,7 @@ import android.net.Network
import com.google.android.material.color.DynamicColors
import com.centralcloud.oncall.db.Repository
import com.centralcloud.oncall.service.SubscriberServiceManager
import com.centralcloud.oncall.sms.SmsRelayInit
import com.centralcloud.oncall.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
@ -29,6 +30,7 @@ class Application : Application() {
if (repository.getDynamicColorsEnabled()) {
DynamicColors.applyToActivitiesIfAvailable(this)
}
SmsRelayInit.start(this)
registerNetworkCallback()
}

View file

@ -0,0 +1,294 @@
package com.centralcloud.oncall.sms
import android.content.Context
import androidx.work.Constraints
import androidx.work.CoroutineWorker
import androidx.work.ExistingPeriodicWorkPolicy
import androidx.work.ExistingWorkPolicy
import androidx.work.NetworkType
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.PeriodicWorkRequestBuilder
import androidx.work.WorkManager
import androidx.work.WorkerParameters
import com.centralcloud.oncall.db.Repository
import com.centralcloud.oncall.db.Subscription
import com.centralcloud.oncall.msg.ApiService
import com.centralcloud.oncall.msg.NotificationDispatcher
import com.centralcloud.oncall.msg.NotificationParser
import com.centralcloud.oncall.util.HttpUtil
import com.centralcloud.oncall.util.Log
import com.centralcloud.oncall.util.randomSubscriptionId
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okhttp3.HttpUrl.Companion.toHttpUrl
import org.json.JSONArray
import org.json.JSONObject
import java.time.Instant
import java.util.concurrent.TimeUnit
/**
* Ops-native push client.
*
* The app still supports ntfy-compatible subscriptions through SubscriberService.
* This worker adds the centralcloud-ops protocol from /api/android/config:
*
* GET /api/push/:topic/messages JSON catch-up
* GET /api/push/:topic/stream Server-Sent Events
*
* Incoming Ops messages are normalized into the existing ntfy Message shape so
* history, action buttons, notification channels, and DND-bypass behavior stay
* on the app's established path.
*/
class OpsPushWorker(appContext: Context, params: WorkerParameters) :
CoroutineWorker(appContext, params) {
override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
Log.init(applicationContext)
val prefs = SmsRelayPreferences(applicationContext)
if (!configured(prefs)) {
Log.d(TAG, "skip: native push not configured")
return@withContext Result.success()
}
val repository = Repository.getInstance(applicationContext)
val dispatcher = NotificationDispatcher(applicationContext, repository)
dispatcher.init()
val subscription = ensureSubscription(repository, prefs)
try {
pollCatchup(prefs, repository, dispatcher, subscription)
stream(prefs, repository, dispatcher, subscription)
Result.success()
} catch (t: Throwable) {
Log.w(TAG, "native push failed", t)
Result.retry()
}
}
private fun configured(prefs: SmsRelayPreferences): Boolean {
return prefs.nativePushEnabled &&
prefs.deviceId.isNotBlank() &&
prefs.nativePushTopic.isNotBlank() &&
(prefs.nativePushStreamUrl.isNotBlank() || prefs.nativePushMessagesUrl.isNotBlank())
}
private suspend fun ensureSubscription(
repository: Repository,
prefs: SmsRelayPreferences
): Subscription {
val configuredUrl = prefs.nativePushStreamUrl.ifBlank { prefs.nativePushMessagesUrl }
val baseUrl = prefs.baseUrl.ifBlank { configuredUrl.substringBefore("/api/") }
repository.getSubscription(baseUrl, prefs.nativePushTopic)?.let { return it }
val subscription = Subscription(
id = randomSubscriptionId(),
baseUrl = baseUrl,
topic = prefs.nativePushTopic,
instant = false,
dedicatedChannels = true,
mutedUntil = 0,
minPriority = Repository.MIN_PRIORITY_USE_GLOBAL,
autoDelete = Repository.AUTO_DELETE_USE_GLOBAL,
insistent = Repository.INSISTENT_MAX_PRIORITY_USE_GLOBAL,
lastNotificationId = null,
icon = null,
upAppId = null,
upConnectorToken = null,
displayName = "Centralcloud Ops",
totalCount = 0,
newCount = 0,
lastActive = System.currentTimeMillis() / 1000
)
repository.addSubscription(subscription)
return subscription
}
private suspend fun pollCatchup(
prefs: SmsRelayPreferences,
repository: Repository,
dispatcher: NotificationDispatcher,
subscription: Subscription
) {
val messagesUrl = prefs.nativePushMessagesUrl
if (messagesUrl.isBlank()) return
val url = withDeviceId(messagesUrl, prefs.deviceId, prefs.nativePushLastSeenAt)
val client = HttpUtil.defaultClient(applicationContext, subscription.baseUrl)
val request = HttpUtil.requestBuilder(url)
.addHeader("X-Device-Id", prefs.deviceId)
.get()
.build()
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IllegalStateException("Ops push poll failed: ${response.code}")
}
val body = response.body?.string().orEmpty()
val messages = JSONObject(body).optJSONArray("messages") ?: JSONArray()
for (i in 0 until messages.length()) {
deliver(messages.getJSONObject(i), prefs, repository, dispatcher, subscription)
}
}
}
private suspend fun stream(
prefs: SmsRelayPreferences,
repository: Repository,
dispatcher: NotificationDispatcher,
subscription: Subscription
) {
val streamUrl = prefs.nativePushStreamUrl
if (streamUrl.isBlank()) return
val client = HttpUtil.subscriberClient(applicationContext, subscription.baseUrl)
val request = HttpUtil.requestBuilder(withDeviceId(streamUrl, prefs.deviceId, null))
.addHeader("Accept", "text/event-stream")
.addHeader("X-Device-Id", prefs.deviceId)
.get()
.build()
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IllegalStateException("Ops push stream failed: ${response.code}")
}
val source = response.body.source()
var event = "message"
val data = StringBuilder()
while (!source.exhausted()) {
val line = source.readUtf8Line() ?: break
when {
line.isEmpty() -> {
if (event == "message" && data.isNotEmpty()) {
deliver(JSONObject(data.toString()), prefs, repository, dispatcher, subscription)
} else if (event == "close") {
return
}
event = "message"
data.clear()
}
line.startsWith(":") -> Unit
line.startsWith("event:") -> event = line.removePrefix("event:").trim()
line.startsWith("data:") -> data.append(line.removePrefix("data:").trim())
}
}
}
}
private suspend fun deliver(
raw: JSONObject,
prefs: SmsRelayPreferences,
repository: Repository,
dispatcher: NotificationDispatcher,
subscription: Subscription
) {
val normalized = normalize(raw, subscription.topic)
val notification = NotificationParser()
.parse(normalized.toString(), subscriptionId = subscription.id, baseUrl = subscription.baseUrl)
?: return
if (repository.addNotification(notification)) {
dispatcher.dispatch(subscription, notification)
}
normalized.optString("ops_inserted_at").takeIf { it.isNotBlank() }?.let {
prefs.nativePushLastSeenAt = it
}
}
private fun normalize(raw: JSONObject, topic: String): JSONObject {
val insertedAt = raw.optString("inserted_at").ifBlank { raw.optString("time") }
val epochSeconds = parseEpochSeconds(insertedAt)
return JSONObject()
.put("event", ApiService.EVENT_MESSAGE)
.put("id", raw.optString("id", "${topic}-${epochSeconds}"))
.put("time", epochSeconds)
.put("topic", raw.optString("topic", topic))
.put("title", raw.optString("title", ""))
.put("message", raw.optString("body", raw.optString("message", "")))
.put("priority", priority(raw.opt("priority")))
.put("tags", raw.optJSONArray("tags") ?: JSONArray())
.apply {
val actions = raw.opt("actions")
if (actions is JSONArray) put("actions", actions)
if (insertedAt.isNotBlank()) put("ops_inserted_at", insertedAt)
}
}
private fun priority(value: Any?): Int {
return when (value) {
is Number -> value.toInt()
"urgent", "max" -> 5
"high" -> 4
"low" -> 2
"min" -> 1
else -> 3
}
}
private fun parseEpochSeconds(value: String): Long {
if (value.isBlank()) return System.currentTimeMillis() / 1000
value.toLongOrNull()?.let { return it }
return runCatching { Instant.parse(value).epochSecond }
.getOrDefault(System.currentTimeMillis() / 1000)
}
private fun withDeviceId(url: String, deviceId: String, since: String?): String {
val builder = url.toHttpUrl().newBuilder()
.setQueryParameter("device_id", deviceId)
if (!since.isNullOrBlank()) {
builder.setQueryParameter("since", since)
}
return builder.build().toString()
}
companion object {
private const val TAG = "OpsPushWorker"
private const val UNIQUE_PERIODIC = "centralcloud-ops-native-push"
private const val UNIQUE_ONCE = "centralcloud-ops-native-push-once"
fun schedule(context: Context) {
val request = PeriodicWorkRequestBuilder<OpsPushWorker>(15, TimeUnit.MINUTES)
.setConstraints(
Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
)
.addTag(UNIQUE_PERIODIC)
.build()
WorkManager.getInstance(context.applicationContext)
.enqueueUniquePeriodicWork(
UNIQUE_PERIODIC,
ExistingPeriodicWorkPolicy.KEEP,
request
)
}
fun enqueueOnce(context: Context) {
val request = OneTimeWorkRequestBuilder<OpsPushWorker>()
.setConstraints(
Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
)
.addTag(UNIQUE_ONCE)
.build()
WorkManager.getInstance(context.applicationContext)
.enqueueUniqueWork(UNIQUE_ONCE, ExistingWorkPolicy.REPLACE, request)
}
fun cancel(context: Context) {
WorkManager.getInstance(context.applicationContext).cancelUniqueWork(UNIQUE_PERIODIC)
WorkManager.getInstance(context.applicationContext).cancelUniqueWork(UNIQUE_ONCE)
}
}
}

View file

@ -23,6 +23,12 @@ import org.json.JSONObject
* Response (relevant fields):
* {
* "ops_url": "https://ops.centralcloud.com",
* "native_push": {
* "enabled": true,
* "topic": "hugo-centralcloud-com",
* "stream_url": "https://ops.centralcloud.com/api/push/hugo-centralcloud-com/stream",
* "messages_url": "https://ops.centralcloud.com/api/push/hugo-centralcloud-com/messages"
* },
* "sms_relay": {
* "enabled": true,
* "whitelist": ["+46701234567", "BANKID"]
@ -62,7 +68,7 @@ object RemoteConfigFetcher {
return@use false
}
val body = response.body?.string() ?: return@use false
apply(prefs, JSONObject(body))
apply(context, prefs, JSONObject(body))
true
}
} catch (t: Throwable) {
@ -71,7 +77,7 @@ object RemoteConfigFetcher {
}
}
private fun apply(prefs: SmsRelayPreferences, json: JSONObject) {
private fun apply(context: Context, prefs: SmsRelayPreferences, json: JSONObject) {
// Persist ops_url as the base URL for heartbeats and SMS forwarding.
json.optString("ops_url").takeIf { it.isNotBlank() }?.let { opsUrl ->
if (prefs.baseUrl != opsUrl) {
@ -80,6 +86,27 @@ object RemoteConfigFetcher {
}
}
json.optJSONObject("native_push")?.let { nativePush ->
val enabled = nativePush.optBoolean("enabled", false)
val topic = nativePush.optString("topic")
val streamUrl = nativePush.optString("stream_url")
val messagesUrl = nativePush.optString("messages_url")
prefs.nativePushEnabled = enabled
prefs.nativePushTopic = topic
prefs.nativePushStreamUrl = streamUrl
prefs.nativePushMessagesUrl = messagesUrl
if (enabled && topic.isNotBlank() && (streamUrl.isNotBlank() || messagesUrl.isNotBlank())) {
OpsPushWorker.schedule(context)
OpsPushWorker.enqueueOnce(context)
} else {
OpsPushWorker.cancel(context)
}
Log.d(TAG, "applied native_push: enabled=$enabled topic=$topic")
}
val smsRelay = json.optJSONObject("sms_relay") ?: return
val enabled = smsRelay.optBoolean("enabled", false)
val whitelist = smsRelay.optJSONArray("whitelist")

View file

@ -8,9 +8,10 @@ import android.content.Context
*
* SmsRelayInit.start(this)
*
* Currently this only schedules the periodic device heartbeat. The
* SmsRelayReceiver is registered in AndroidManifest.xml and fires by itself
* on SMS_RECEIVED, so it doesn't need anything bootstrapped here.
* This schedules the periodic device heartbeat and the Ops-native push worker
* if central config has enabled it. The SmsRelayReceiver is registered in
* AndroidManifest.xml and fires by itself on SMS_RECEIVED, so it doesn't need
* anything bootstrapped here.
*
* The schedule call is idempotent (WorkManager KEEP policy) so calling it
* every cold start is fine.
@ -18,5 +19,10 @@ import android.content.Context
object SmsRelayInit {
fun start(context: Context) {
DeviceHeartbeatWorker.scheduleIfConfigured(context)
val prefs = SmsRelayPreferences(context)
if (prefs.nativePushEnabled) {
OpsPushWorker.schedule(context)
OpsPushWorker.enqueueOnce(context)
}
}
}

View file

@ -64,6 +64,27 @@ class SmsRelayPreferences(context: Context) {
get() = prefs.getInt(KEY_FAILURE_COUNT_24H, 0)
set(value) = prefs.edit().putInt(KEY_FAILURE_COUNT_24H, value).apply()
/** Ops-native push config discovered from /api/android/config. */
var nativePushEnabled: Boolean
get() = prefs.getBoolean(KEY_NATIVE_PUSH_ENABLED, false)
set(value) = prefs.edit().putBoolean(KEY_NATIVE_PUSH_ENABLED, value).apply()
var nativePushTopic: String
get() = prefs.getString(KEY_NATIVE_PUSH_TOPIC, "")!!
set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_TOPIC, value).apply()
var nativePushStreamUrl: String
get() = prefs.getString(KEY_NATIVE_PUSH_STREAM_URL, "")!!
set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_STREAM_URL, value).apply()
var nativePushMessagesUrl: String
get() = prefs.getString(KEY_NATIVE_PUSH_MESSAGES_URL, "")!!
set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_MESSAGES_URL, value).apply()
var nativePushLastSeenAt: String
get() = prefs.getString(KEY_NATIVE_PUSH_LAST_SEEN_AT, "")!!
set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_LAST_SEEN_AT, value).apply()
fun bumpFailureCount() {
failureCount24h = failureCount24h + 1
}
@ -91,5 +112,10 @@ class SmsRelayPreferences(context: Context) {
private const val KEY_WHITELIST = "whitelist"
private const val KEY_LAST_FORWARD_AT_MS = "last_forward_at_ms"
private const val KEY_FAILURE_COUNT_24H = "failure_count_24h"
private const val KEY_NATIVE_PUSH_ENABLED = "native_push_enabled"
private const val KEY_NATIVE_PUSH_TOPIC = "native_push_topic"
private const val KEY_NATIVE_PUSH_STREAM_URL = "native_push_stream_url"
private const val KEY_NATIVE_PUSH_MESSAGES_URL = "native_push_messages_url"
private const val KEY_NATIVE_PUSH_LAST_SEEN_AT = "native_push_last_seen_at"
}
}