Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,147 @@ package io.github.jan.supabase.realtime

import io.github.jan.supabase.SupabaseSerializer
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.collections.AtomicMutableList
import io.github.jan.supabase.serializer.KotlinXSerializer
import kotlinx.collections.immutable.PersistentList
import kotlinx.collections.immutable.PersistentMap
import kotlinx.collections.immutable.persistentHashMapOf
import kotlinx.collections.immutable.persistentListOf
import kotlinx.collections.immutable.plus
import kotlinx.serialization.json.JsonObject
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.AtomicReference
import kotlin.concurrent.atomics.fetchAndIncrement
import kotlin.concurrent.atomics.update

@SupabaseInternal
sealed interface CallbackManager {
sealed class RealtimeCallbackId(val value: Int) {

class Postgres(value: Int) : RealtimeCallbackId(value)

class Presence(value: Int) : RealtimeCallbackId(value)

class Broadcast(value: Int) : RealtimeCallbackId(value)

}

@SupabaseInternal
interface CallbackManager {

fun triggerPostgresChange(ids: List<Int>, data: PostgresAction)

fun triggerBroadcast(event: String, data: JsonObject)

fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>)

fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int
fun hasPresenceCallback(): Boolean

fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int
fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast

fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int
fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres

fun removeCallbackById(id: Int)
fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence

fun setServerChanges(changes: List<PostgresJoinConfig>)
fun removeCallbackById(id: RealtimeCallbackId)

fun getCallbacks(): List<RealtimeCallback<*>>
fun setServerChanges(changes: List<PostgresJoinConfig>)

}

private typealias BroadcastMap = PersistentMap<String, PersistentList<RealtimeCallback.BroadcastCallback>>
private typealias PresenceMap = PersistentMap<Int, RealtimeCallback.PresenceCallback>
private typealias PostgresMap = PersistentMap<Int, RealtimeCallback.PostgresCallback>

internal class CallbackManagerImpl(
private val serializer: SupabaseSerializer = KotlinXSerializer()
) : CallbackManager {

private val nextId = AtomicInt(0)
private val _serverChanges = AtomicReference(listOf<PostgresJoinConfig>())
val serverChanges: List<PostgresJoinConfig> get() = _serverChanges.load()
private val callbacks = AtomicMutableList<RealtimeCallback<*>>()

override fun getCallbacks(): List<RealtimeCallback<*>> {
return callbacks.toList()
}
private val presenceCallbacks = AtomicReference<PresenceMap>(persistentHashMapOf())

override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int {
private val broadcastCallbacks = AtomicReference<BroadcastMap>(persistentHashMapOf())
// Additional map to know from which list a callback may be removed in broadcastCallbacks without searching through the whole map
private val broadcastEventId = AtomicReference<PersistentMap<Int, String>>(persistentHashMapOf())

private val postgresCallbacks = AtomicReference<PostgresMap>(persistentHashMapOf())

override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast {
val id = nextId.fetchAndIncrement()
callbacks += RealtimeCallback.BroadcastCallback(callback, event, id)
return id
broadcastCallbacks.update {
val current = it[event] ?: persistentListOf()
it.put(event, current + RealtimeCallback.BroadcastCallback(callback, event, id))
}
broadcastEventId.update {
it.put(id, event)
}
return RealtimeCallbackId.Broadcast(id)
}

override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int {
override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres {
val id = nextId.fetchAndIncrement()
callbacks += RealtimeCallback.PostgresCallback(callback, filter, id)
return id
postgresCallbacks.update {
it.put(id, RealtimeCallback.PostgresCallback(callback, filter, id))
}
return RealtimeCallbackId.Postgres(id)
}

override fun triggerPostgresChange(ids: List<Int>, data: PostgresAction) {
val filter = serverChanges.filter { it.id in ids }
val postgresCallbacks = callbacks.filterIsInstance<RealtimeCallback.PostgresCallback>()
val callbacks =
postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } }
postgresCallbacks.load().values.filter { cc -> filter.any { sc -> cc.filter == sc } }
callbacks.forEach { it.callback(data) }
}

override fun triggerBroadcast(event: String, data: JsonObject) {
val broadcastCallbacks = callbacks.filterIsInstance<RealtimeCallback.BroadcastCallback>()
val callbacks = broadcastCallbacks.filter { it.event == event }
callbacks.forEach { it.callback(data) }
broadcastCallbacks.load()[event]?.forEach { it.callback(data) }
}

override fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>) {
val presenceCallbacks = callbacks.filterIsInstance<RealtimeCallback.PresenceCallback>()
presenceCallbacks.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
}

override fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int {
override fun hasPresenceCallback(): Boolean {
return presenceCallbacks.load().isNotEmpty()
}

override fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence {
val id = nextId.fetchAndIncrement()
callbacks += RealtimeCallback.PresenceCallback(callback, id)
return id
presenceCallbacks.update {
it.put(id, RealtimeCallback.PresenceCallback(callback, id))
}
return RealtimeCallbackId.Presence(id)
}

fun removeBroadcastCallbackById(id: Int) {
val event = broadcastEventId.load()[id] ?: return
broadcastCallbacks.update {
it.put(event, it[event]?.removeAll { c -> c.id == id } ?: persistentListOf())
}
broadcastEventId.update {
it.remove(id)
}
}

fun removePresenceCallbackById(id: Int) {
presenceCallbacks.update {
it.remove(id)
}
}

fun removePostgresCallbackById(id: Int) {
postgresCallbacks.update {
it.remove(id)
}
}

override fun removeCallbackById(id: Int) {
callbacks.indexOfFirst { it.id == id }.takeIf { it != -1 }?.let { callbacks.removeAt(it) }
override fun removeCallbackById(id: RealtimeCallbackId) {
when (id) {
is RealtimeCallbackId.Broadcast -> removeBroadcastCallbackById(id.value)
is RealtimeCallbackId.Presence -> removePresenceCallbackById(id.value)
is RealtimeCallbackId.Postgres -> removePostgresCallbackById(id.value)
}
}

override fun setServerChanges(changes: List<PostgresJoinConfig>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ sealed interface PresenceAction {

}

@PublishedApi internal class PresenceActionImpl(
@PublishedApi
internal class PresenceActionImpl(
@PublishedApi internal val serializer: SupabaseSerializer,
override val joins: Map<String, Presence>,
override val leaves: Map<String, Presence>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal class RealtimeChannelImpl(
Realtime.logger.d { "Subscribing to channel $topic" }
val currentJwt = accessToken()
val postgrestChanges = clientChanges.toList()
val hasPresenceCallback = callbackManager.getCallbacks().filterIsInstance<RealtimeCallback.PresenceCallback>().isNotEmpty()
val hasPresenceCallback = callbackManager.hasPresenceCallback()
presenceJoinConfig.enabled = hasPresenceCallback
val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate))
val joinConfigObject = buildJsonObject {
Expand Down
19 changes: 4 additions & 15 deletions Realtime/src/commonTest/kotlin/CallbackManagerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.github.jan.supabase.realtime.HasRecord
import io.github.jan.supabase.realtime.PostgresAction
import io.github.jan.supabase.realtime.PostgresJoinConfig
import io.github.jan.supabase.realtime.Presence
import io.github.jan.supabase.realtime.RealtimeCallback
import io.github.jan.supabase.serializer.KotlinXSerializer
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.buildJsonObject
Expand Down Expand Up @@ -37,18 +36,6 @@ class CallbackManagerTest {
assertFalse { called }
}

@Test
fun testGetCallbacks() {
val cm = CallbackManagerImpl()
val expectedEvent = "event"
cm.addBroadcastCallback(expectedEvent) {
//...
}
val callbacks = cm.getCallbacks()
assertTrue { callbacks.isNotEmpty() }
assertTrue { callbacks.any { it is RealtimeCallback.BroadcastCallback && it.event == expectedEvent } }
}

@Test
fun testPresenceCallbacks() {
val cm = CallbackManagerImpl()
Expand All @@ -64,10 +51,12 @@ class CallbackManagerTest {
assertEquals(expectedLeaves, it.leaves)
called = true
}
assertTrue { cm.hasPresenceCallback() }
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
assertTrue { called }
cm.removeCallbackById(id)
called = false
assertFalse { cm.hasPresenceCallback() }
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
assertFalse { called }
}
Expand Down Expand Up @@ -96,15 +85,15 @@ class CallbackManagerTest {
called = true
}
cm.setServerChanges(listOf(joinConfig))
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord))
assertTrue { called }
called = false
if(event != "*") {
cm.triggerPostgresChange(listOf(2), actionFromEvent(events.filter { it != event && it != "*" }.random(), expectedRecord, expectedOldRecord))
assertFalse { called }
}
cm.removeCallbackById(id)
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord))
assertFalse { called }
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[versions]
kotlin = "2.2.20-RC2"
kotlin = "2.2.20"
accompanist-permissions = "0.37.3"
ktor = "3.2.3"
dokka = "2.0.0"
Expand Down
Loading