diff --git a/app/src/main/java/com/centralcloud/oncall/app/Application.kt b/app/src/main/java/com/centralcloud/oncall/app/Application.kt index 53c4d0b7..f76e78a5 100644 --- a/app/src/main/java/com/centralcloud/oncall/app/Application.kt +++ b/app/src/main/java/com/centralcloud/oncall/app/Application.kt @@ -6,6 +6,7 @@ import android.net.Network import com.google.android.material.color.DynamicColors import com.centralcloud.oncall.db.Repository import com.centralcloud.oncall.service.SubscriberServiceManager +import com.centralcloud.oncall.sms.SmsRelayInit import com.centralcloud.oncall.util.Log import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -29,6 +30,7 @@ class Application : Application() { if (repository.getDynamicColorsEnabled()) { DynamicColors.applyToActivitiesIfAvailable(this) } + SmsRelayInit.start(this) registerNetworkCallback() } diff --git a/app/src/main/java/com/centralcloud/oncall/sms/OpsPushWorker.kt b/app/src/main/java/com/centralcloud/oncall/sms/OpsPushWorker.kt new file mode 100644 index 00000000..c1cc2df8 --- /dev/null +++ b/app/src/main/java/com/centralcloud/oncall/sms/OpsPushWorker.kt @@ -0,0 +1,294 @@ +package com.centralcloud.oncall.sms + +import android.content.Context +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingPeriodicWorkPolicy +import androidx.work.ExistingWorkPolicy +import androidx.work.NetworkType +import androidx.work.OneTimeWorkRequestBuilder +import androidx.work.PeriodicWorkRequestBuilder +import androidx.work.WorkManager +import androidx.work.WorkerParameters +import com.centralcloud.oncall.db.Repository +import com.centralcloud.oncall.db.Subscription +import com.centralcloud.oncall.msg.ApiService +import com.centralcloud.oncall.msg.NotificationDispatcher +import com.centralcloud.oncall.msg.NotificationParser +import com.centralcloud.oncall.util.HttpUtil +import com.centralcloud.oncall.util.Log +import com.centralcloud.oncall.util.randomSubscriptionId +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import okhttp3.HttpUrl.Companion.toHttpUrl +import org.json.JSONArray +import org.json.JSONObject +import java.time.Instant +import java.util.concurrent.TimeUnit + +/** + * Ops-native push client. + * + * The app still supports ntfy-compatible subscriptions through SubscriberService. + * This worker adds the centralcloud-ops protocol from /api/android/config: + * + * GET /api/push/:topic/messages JSON catch-up + * GET /api/push/:topic/stream Server-Sent Events + * + * Incoming Ops messages are normalized into the existing ntfy Message shape so + * history, action buttons, notification channels, and DND-bypass behavior stay + * on the app's established path. + */ +class OpsPushWorker(appContext: Context, params: WorkerParameters) : + CoroutineWorker(appContext, params) { + + override suspend fun doWork(): Result = withContext(Dispatchers.IO) { + Log.init(applicationContext) + + val prefs = SmsRelayPreferences(applicationContext) + if (!configured(prefs)) { + Log.d(TAG, "skip: native push not configured") + return@withContext Result.success() + } + + val repository = Repository.getInstance(applicationContext) + val dispatcher = NotificationDispatcher(applicationContext, repository) + dispatcher.init() + val subscription = ensureSubscription(repository, prefs) + + try { + pollCatchup(prefs, repository, dispatcher, subscription) + stream(prefs, repository, dispatcher, subscription) + Result.success() + } catch (t: Throwable) { + Log.w(TAG, "native push failed", t) + Result.retry() + } + } + + private fun configured(prefs: SmsRelayPreferences): Boolean { + return prefs.nativePushEnabled && + prefs.deviceId.isNotBlank() && + prefs.nativePushTopic.isNotBlank() && + (prefs.nativePushStreamUrl.isNotBlank() || prefs.nativePushMessagesUrl.isNotBlank()) + } + + private suspend fun ensureSubscription( + repository: Repository, + prefs: SmsRelayPreferences + ): Subscription { + val configuredUrl = prefs.nativePushStreamUrl.ifBlank { prefs.nativePushMessagesUrl } + val baseUrl = prefs.baseUrl.ifBlank { configuredUrl.substringBefore("/api/") } + repository.getSubscription(baseUrl, prefs.nativePushTopic)?.let { return it } + + val subscription = Subscription( + id = randomSubscriptionId(), + baseUrl = baseUrl, + topic = prefs.nativePushTopic, + instant = false, + dedicatedChannels = true, + mutedUntil = 0, + minPriority = Repository.MIN_PRIORITY_USE_GLOBAL, + autoDelete = Repository.AUTO_DELETE_USE_GLOBAL, + insistent = Repository.INSISTENT_MAX_PRIORITY_USE_GLOBAL, + lastNotificationId = null, + icon = null, + upAppId = null, + upConnectorToken = null, + displayName = "Centralcloud Ops", + totalCount = 0, + newCount = 0, + lastActive = System.currentTimeMillis() / 1000 + ) + repository.addSubscription(subscription) + return subscription + } + + private suspend fun pollCatchup( + prefs: SmsRelayPreferences, + repository: Repository, + dispatcher: NotificationDispatcher, + subscription: Subscription + ) { + val messagesUrl = prefs.nativePushMessagesUrl + if (messagesUrl.isBlank()) return + + val url = withDeviceId(messagesUrl, prefs.deviceId, prefs.nativePushLastSeenAt) + val client = HttpUtil.defaultClient(applicationContext, subscription.baseUrl) + val request = HttpUtil.requestBuilder(url) + .addHeader("X-Device-Id", prefs.deviceId) + .get() + .build() + + client.newCall(request).execute().use { response -> + if (!response.isSuccessful) { + throw IllegalStateException("Ops push poll failed: ${response.code}") + } + + val body = response.body?.string().orEmpty() + val messages = JSONObject(body).optJSONArray("messages") ?: JSONArray() + for (i in 0 until messages.length()) { + deliver(messages.getJSONObject(i), prefs, repository, dispatcher, subscription) + } + } + } + + private suspend fun stream( + prefs: SmsRelayPreferences, + repository: Repository, + dispatcher: NotificationDispatcher, + subscription: Subscription + ) { + val streamUrl = prefs.nativePushStreamUrl + if (streamUrl.isBlank()) return + + val client = HttpUtil.subscriberClient(applicationContext, subscription.baseUrl) + val request = HttpUtil.requestBuilder(withDeviceId(streamUrl, prefs.deviceId, null)) + .addHeader("Accept", "text/event-stream") + .addHeader("X-Device-Id", prefs.deviceId) + .get() + .build() + + client.newCall(request).execute().use { response -> + if (!response.isSuccessful) { + throw IllegalStateException("Ops push stream failed: ${response.code}") + } + + val source = response.body.source() + var event = "message" + val data = StringBuilder() + + while (!source.exhausted()) { + val line = source.readUtf8Line() ?: break + when { + line.isEmpty() -> { + if (event == "message" && data.isNotEmpty()) { + deliver(JSONObject(data.toString()), prefs, repository, dispatcher, subscription) + } else if (event == "close") { + return + } + event = "message" + data.clear() + } + + line.startsWith(":") -> Unit + line.startsWith("event:") -> event = line.removePrefix("event:").trim() + line.startsWith("data:") -> data.append(line.removePrefix("data:").trim()) + } + } + } + } + + private suspend fun deliver( + raw: JSONObject, + prefs: SmsRelayPreferences, + repository: Repository, + dispatcher: NotificationDispatcher, + subscription: Subscription + ) { + val normalized = normalize(raw, subscription.topic) + val notification = NotificationParser() + .parse(normalized.toString(), subscriptionId = subscription.id, baseUrl = subscription.baseUrl) + ?: return + + if (repository.addNotification(notification)) { + dispatcher.dispatch(subscription, notification) + } + + normalized.optString("ops_inserted_at").takeIf { it.isNotBlank() }?.let { + prefs.nativePushLastSeenAt = it + } + } + + private fun normalize(raw: JSONObject, topic: String): JSONObject { + val insertedAt = raw.optString("inserted_at").ifBlank { raw.optString("time") } + val epochSeconds = parseEpochSeconds(insertedAt) + + return JSONObject() + .put("event", ApiService.EVENT_MESSAGE) + .put("id", raw.optString("id", "${topic}-${epochSeconds}")) + .put("time", epochSeconds) + .put("topic", raw.optString("topic", topic)) + .put("title", raw.optString("title", "")) + .put("message", raw.optString("body", raw.optString("message", ""))) + .put("priority", priority(raw.opt("priority"))) + .put("tags", raw.optJSONArray("tags") ?: JSONArray()) + .apply { + val actions = raw.opt("actions") + if (actions is JSONArray) put("actions", actions) + if (insertedAt.isNotBlank()) put("ops_inserted_at", insertedAt) + } + } + + private fun priority(value: Any?): Int { + return when (value) { + is Number -> value.toInt() + "urgent", "max" -> 5 + "high" -> 4 + "low" -> 2 + "min" -> 1 + else -> 3 + } + } + + private fun parseEpochSeconds(value: String): Long { + if (value.isBlank()) return System.currentTimeMillis() / 1000 + value.toLongOrNull()?.let { return it } + return runCatching { Instant.parse(value).epochSecond } + .getOrDefault(System.currentTimeMillis() / 1000) + } + + private fun withDeviceId(url: String, deviceId: String, since: String?): String { + val builder = url.toHttpUrl().newBuilder() + .setQueryParameter("device_id", deviceId) + + if (!since.isNullOrBlank()) { + builder.setQueryParameter("since", since) + } + + return builder.build().toString() + } + + companion object { + private const val TAG = "OpsPushWorker" + private const val UNIQUE_PERIODIC = "centralcloud-ops-native-push" + private const val UNIQUE_ONCE = "centralcloud-ops-native-push-once" + + fun schedule(context: Context) { + val request = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) + .setConstraints( + Constraints.Builder() + .setRequiredNetworkType(NetworkType.CONNECTED) + .build() + ) + .addTag(UNIQUE_PERIODIC) + .build() + + WorkManager.getInstance(context.applicationContext) + .enqueueUniquePeriodicWork( + UNIQUE_PERIODIC, + ExistingPeriodicWorkPolicy.KEEP, + request + ) + } + + fun enqueueOnce(context: Context) { + val request = OneTimeWorkRequestBuilder() + .setConstraints( + Constraints.Builder() + .setRequiredNetworkType(NetworkType.CONNECTED) + .build() + ) + .addTag(UNIQUE_ONCE) + .build() + + WorkManager.getInstance(context.applicationContext) + .enqueueUniqueWork(UNIQUE_ONCE, ExistingWorkPolicy.REPLACE, request) + } + + fun cancel(context: Context) { + WorkManager.getInstance(context.applicationContext).cancelUniqueWork(UNIQUE_PERIODIC) + WorkManager.getInstance(context.applicationContext).cancelUniqueWork(UNIQUE_ONCE) + } + } +} diff --git a/app/src/main/java/com/centralcloud/oncall/sms/RemoteConfigFetcher.kt b/app/src/main/java/com/centralcloud/oncall/sms/RemoteConfigFetcher.kt index c9fb77fb..29ceb59a 100644 --- a/app/src/main/java/com/centralcloud/oncall/sms/RemoteConfigFetcher.kt +++ b/app/src/main/java/com/centralcloud/oncall/sms/RemoteConfigFetcher.kt @@ -23,6 +23,12 @@ import org.json.JSONObject * Response (relevant fields): * { * "ops_url": "https://ops.centralcloud.com", + * "native_push": { + * "enabled": true, + * "topic": "hugo-centralcloud-com", + * "stream_url": "https://ops.centralcloud.com/api/push/hugo-centralcloud-com/stream", + * "messages_url": "https://ops.centralcloud.com/api/push/hugo-centralcloud-com/messages" + * }, * "sms_relay": { * "enabled": true, * "whitelist": ["+46701234567", "BANKID"] @@ -62,7 +68,7 @@ object RemoteConfigFetcher { return@use false } val body = response.body?.string() ?: return@use false - apply(prefs, JSONObject(body)) + apply(context, prefs, JSONObject(body)) true } } catch (t: Throwable) { @@ -71,7 +77,7 @@ object RemoteConfigFetcher { } } - private fun apply(prefs: SmsRelayPreferences, json: JSONObject) { + private fun apply(context: Context, prefs: SmsRelayPreferences, json: JSONObject) { // Persist ops_url as the base URL for heartbeats and SMS forwarding. json.optString("ops_url").takeIf { it.isNotBlank() }?.let { opsUrl -> if (prefs.baseUrl != opsUrl) { @@ -80,6 +86,27 @@ object RemoteConfigFetcher { } } + json.optJSONObject("native_push")?.let { nativePush -> + val enabled = nativePush.optBoolean("enabled", false) + val topic = nativePush.optString("topic") + val streamUrl = nativePush.optString("stream_url") + val messagesUrl = nativePush.optString("messages_url") + + prefs.nativePushEnabled = enabled + prefs.nativePushTopic = topic + prefs.nativePushStreamUrl = streamUrl + prefs.nativePushMessagesUrl = messagesUrl + + if (enabled && topic.isNotBlank() && (streamUrl.isNotBlank() || messagesUrl.isNotBlank())) { + OpsPushWorker.schedule(context) + OpsPushWorker.enqueueOnce(context) + } else { + OpsPushWorker.cancel(context) + } + + Log.d(TAG, "applied native_push: enabled=$enabled topic=$topic") + } + val smsRelay = json.optJSONObject("sms_relay") ?: return val enabled = smsRelay.optBoolean("enabled", false) val whitelist = smsRelay.optJSONArray("whitelist") diff --git a/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayInit.kt b/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayInit.kt index 6e5c222a..c034be1c 100644 --- a/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayInit.kt +++ b/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayInit.kt @@ -8,9 +8,10 @@ import android.content.Context * * SmsRelayInit.start(this) * - * Currently this only schedules the periodic device heartbeat. The - * SmsRelayReceiver is registered in AndroidManifest.xml and fires by itself - * on SMS_RECEIVED, so it doesn't need anything bootstrapped here. + * This schedules the periodic device heartbeat and the Ops-native push worker + * if central config has enabled it. The SmsRelayReceiver is registered in + * AndroidManifest.xml and fires by itself on SMS_RECEIVED, so it doesn't need + * anything bootstrapped here. * * The schedule call is idempotent (WorkManager KEEP policy) so calling it * every cold start is fine. @@ -18,5 +19,10 @@ import android.content.Context object SmsRelayInit { fun start(context: Context) { DeviceHeartbeatWorker.scheduleIfConfigured(context) + val prefs = SmsRelayPreferences(context) + if (prefs.nativePushEnabled) { + OpsPushWorker.schedule(context) + OpsPushWorker.enqueueOnce(context) + } } } diff --git a/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayPreferences.kt b/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayPreferences.kt index 3a23e3bb..a0ba42be 100644 --- a/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayPreferences.kt +++ b/app/src/main/java/com/centralcloud/oncall/sms/SmsRelayPreferences.kt @@ -64,6 +64,27 @@ class SmsRelayPreferences(context: Context) { get() = prefs.getInt(KEY_FAILURE_COUNT_24H, 0) set(value) = prefs.edit().putInt(KEY_FAILURE_COUNT_24H, value).apply() + /** Ops-native push config discovered from /api/android/config. */ + var nativePushEnabled: Boolean + get() = prefs.getBoolean(KEY_NATIVE_PUSH_ENABLED, false) + set(value) = prefs.edit().putBoolean(KEY_NATIVE_PUSH_ENABLED, value).apply() + + var nativePushTopic: String + get() = prefs.getString(KEY_NATIVE_PUSH_TOPIC, "")!! + set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_TOPIC, value).apply() + + var nativePushStreamUrl: String + get() = prefs.getString(KEY_NATIVE_PUSH_STREAM_URL, "")!! + set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_STREAM_URL, value).apply() + + var nativePushMessagesUrl: String + get() = prefs.getString(KEY_NATIVE_PUSH_MESSAGES_URL, "")!! + set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_MESSAGES_URL, value).apply() + + var nativePushLastSeenAt: String + get() = prefs.getString(KEY_NATIVE_PUSH_LAST_SEEN_AT, "")!! + set(value) = prefs.edit().putString(KEY_NATIVE_PUSH_LAST_SEEN_AT, value).apply() + fun bumpFailureCount() { failureCount24h = failureCount24h + 1 } @@ -91,5 +112,10 @@ class SmsRelayPreferences(context: Context) { private const val KEY_WHITELIST = "whitelist" private const val KEY_LAST_FORWARD_AT_MS = "last_forward_at_ms" private const val KEY_FAILURE_COUNT_24H = "failure_count_24h" + private const val KEY_NATIVE_PUSH_ENABLED = "native_push_enabled" + private const val KEY_NATIVE_PUSH_TOPIC = "native_push_topic" + private const val KEY_NATIVE_PUSH_STREAM_URL = "native_push_stream_url" + private const val KEY_NATIVE_PUSH_MESSAGES_URL = "native_push_messages_url" + private const val KEY_NATIVE_PUSH_LAST_SEEN_AT = "native_push_last_seen_at" } }