From 51babb4106a9dcef7c55e0b79cc4d035855fe62a Mon Sep 17 00:00:00 2001 From: Ross Anderson Date: Fri, 27 Mar 2026 08:37:08 +0000 Subject: [PATCH 1/3] Revert flatMapLatestAndMerge --- .../integration/datasourcex/util/flow/SetEvent.kt | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt index 30cd39f..15d43b7 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt @@ -168,9 +168,8 @@ fun Flow>.flatMapLatestAndMerge( /** * Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each - * entry event. When an [Insert] event is received, a new flow is created and its emissions are - * merged into the resulting flow. When a [Removed] event is received, the previously created flow - * for that value is cancelled. + * entry event. When an event is received, a new flow is created and its emissions are + * merged into the resulting flow. */ fun Flow>.flatMapLatestAndMerge( entryEventTransformer: (EntryEvent) -> Flow @@ -178,19 +177,15 @@ fun Flow>.flatMapLatestAndMerge( val jobs = ConcurrentHashMap() collect { setEvent -> when (setEvent) { - is Insert -> { + is EntryEvent -> { jobs[setEvent.value]?.cancelAndJoin() jobs[setEvent.value] = entryEventTransformer(setEvent) .onEach { send(it) } - .onCompletion { jobs.remove(setEvent.value) } + .onCompletion { throwable -> if (throwable == null) jobs.remove(setEvent.value) } .launchIn(this@channelFlow) } - is Removed -> { - jobs.remove(setEvent.value)?.cancelAndJoin() - } - is Populated -> {} } } From bd2fc8554199b218927105d5786f098905dec60e Mon Sep 17 00:00:00 2001 From: Ross Anderson Date: Fri, 27 Mar 2026 09:27:40 +0000 Subject: [PATCH 2/3] Fix formatting violations --- .../com/caplin/integration/datasourcex/util/flow/SetEvent.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt index 15d43b7..645aee5 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt @@ -168,8 +168,8 @@ fun Flow>.flatMapLatestAndMerge( /** * Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each - * entry event. When an event is received, a new flow is created and its emissions are - * merged into the resulting flow. + * entry event. When an event is received, a new flow is created and its emissions are merged into + * the resulting flow. */ fun Flow>.flatMapLatestAndMerge( entryEventTransformer: (EntryEvent) -> Flow From b49132a6de53ad90a806b05687df912b2b4f4fa2 Mon Sep 17 00:00:00 2001 From: Ross Anderson Date: Fri, 27 Mar 2026 10:02:16 +0000 Subject: [PATCH 3/3] Remove highly specific flatMapLatestAndMerge methods --- util/api/datasourcex-util.api | 2 - .../datasourcex/util/flow/SetEvent.kt | 42 ----------- .../datasourcex/util/flow/SetEventKtTest.kt | 73 +------------------ 3 files changed, 1 insertion(+), 116 deletions(-) diff --git a/util/api/datasourcex-util.api b/util/api/datasourcex-util.api index d90d911..0d5bfd5 100644 --- a/util/api/datasourcex-util.api +++ b/util/api/datasourcex-util.api @@ -362,8 +362,6 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEvent$Populat } public final class com/caplin/integration/datasourcex/util/flow/SetEventKt { - public static final fun flatMapLatestAndMerge (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; - public static final fun flatMapLatestAndMergeSet (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun runningFoldToSet (Lkotlinx/coroutines/flow/Flow;ZZ)Lkotlinx/coroutines/flow/Flow; public static synthetic fun runningFoldToSet$default (Lkotlinx/coroutines/flow/Flow;ZZILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; diff --git a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt index 645aee5..99e6784 100644 --- a/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt +++ b/util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt @@ -4,16 +4,9 @@ import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Insert import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Removed import com.caplin.integration.datasourcex.util.flow.SetEvent.Populated -import java.util.concurrent.ConcurrentHashMap import kotlinx.collections.immutable.persistentSetOf -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onEach /** Events representing a mutation to a [Set]. */ sealed interface SetEvent { @@ -155,38 +148,3 @@ fun Flow>.runningFoldToSet( } } } - -/** - * Transforms a flow of sets into a merged flow by applying [entryEventTransformer] to each entry - * event (insert or remove). When a value is inserted, a new flow is created and merged. When a - * value is removed, the corresponding flow is cancelled. - */ -@JvmName("flatMapLatestAndMergeSet") -fun Flow>.flatMapLatestAndMerge( - entryEventTransformer: (EntryEvent) -> Flow -): Flow = toEvents().flatMapLatestAndMerge(entryEventTransformer) - -/** - * Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each - * entry event. When an event is received, a new flow is created and its emissions are merged into - * the resulting flow. - */ -fun Flow>.flatMapLatestAndMerge( - entryEventTransformer: (EntryEvent) -> Flow -) = channelFlow { - val jobs = ConcurrentHashMap() - collect { setEvent -> - when (setEvent) { - is EntryEvent -> { - jobs[setEvent.value]?.cancelAndJoin() - jobs[setEvent.value] = - entryEventTransformer(setEvent) - .onEach { send(it) } - .onCompletion { throwable -> if (throwable == null) jobs.remove(setEvent.value) } - .launchIn(this@channelFlow) - } - - is Populated -> {} - } - } -} diff --git a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SetEventKtTest.kt b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SetEventKtTest.kt index 76b35c4..f76fbb4 100644 --- a/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SetEventKtTest.kt +++ b/util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SetEventKtTest.kt @@ -10,9 +10,6 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.types.shouldBeInstanceOf import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flowOf class SetEventKtTest : @@ -41,7 +38,7 @@ class SetEventKtTest : test("runningFoldToSet with emitPartials") { flowOf(Insert("A"), Insert("B"), Populated).runningFoldToSet(emitPartials = true).test { - awaitItem().toSet() shouldBeEqual emptySet() + awaitItem().toSet() shouldBeEqual emptySet() awaitItem().toSet() shouldBeEqual setOf("A") awaitItem().toSet() shouldBeEqual setOf("A", "B") awaitComplete() @@ -60,72 +57,4 @@ class SetEventKtTest : awaitError().shouldBeInstanceOf() } } - - test("flatMapLatestAndMerge - When upstream completes first, await inner complete") { - val setFlow = Channel>() - val aFlow = Channel() - - setFlow - .consumeAsFlow() - .flatMapLatestAndMerge { - when (it) { - is Insert -> - when (it.value) { - "a" -> aFlow.consumeAsFlow() - else -> throw IllegalArgumentException(it.value) - } - is Removed -> emptyFlow() - } - } - .test { - expectNoEvents() - - setFlow.send(setOf("a")) - expectNoEvents() - - aFlow.send("A") - - awaitItem() shouldBeEqual "A" - - setFlow.close() - - expectNoEvents() - - aFlow.send("B") - - awaitItem() shouldBeEqual "B" - - aFlow.close() - - awaitComplete() - } - } - - test("When inner completes first, await upstream complete") { - val setFlow = Channel>() - - setFlow - .consumeAsFlow() - .flatMapLatestAndMerge { - when (it) { - is Insert -> - when (it.value) { - "a" -> flowOf("A") - else -> throw IllegalArgumentException(it.value) - } - is Removed -> emptyFlow() - } - } - .test { - expectNoEvents() - - setFlow.send(setOf("a")) - - awaitItem() shouldBeEqual "A" - - setFlow.close() - - awaitComplete() - } - } })