Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions util/api/datasourcex-util.api
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,6 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEvent$Populat
}

public final class com/caplin/integration/datasourcex/util/flow/SetEventKt {
public static final fun flatMapLatestAndMerge (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapLatestAndMergeSet (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun runningFoldToSet (Lkotlinx/coroutines/flow/Flow;ZZ)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun runningFoldToSet$default (Lkotlinx/coroutines/flow/Flow;ZZILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,9 @@ import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent
import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Insert
import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Removed
import com.caplin.integration.datasourcex.util.flow.SetEvent.Populated
import java.util.concurrent.ConcurrentHashMap
import kotlinx.collections.immutable.persistentSetOf
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach

/** Events representing a mutation to a [Set]. */
sealed interface SetEvent<out V : Any> {
Expand Down Expand Up @@ -155,43 +148,3 @@ fun <V : Any> Flow<SetEvent<V>>.runningFoldToSet(
}
}
}

/**
* Transforms a flow of sets into a merged flow by applying [entryEventTransformer] to each entry
* event (insert or remove). When a value is inserted, a new flow is created and merged. When a
* value is removed, the corresponding flow is cancelled.
*/
@JvmName("flatMapLatestAndMergeSet")
fun <V : Any, R> Flow<Set<V>>.flatMapLatestAndMerge(
entryEventTransformer: (EntryEvent<V>) -> Flow<R>
): Flow<R> = toEvents().flatMapLatestAndMerge(entryEventTransformer)

/**
* Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each
* entry event. When an [Insert] event is received, a new flow is created and its emissions are
* merged into the resulting flow. When a [Removed] event is received, the previously created flow
* for that value is cancelled.
*/
fun <V : Any, R> Flow<SetEvent<V>>.flatMapLatestAndMerge(
entryEventTransformer: (EntryEvent<V>) -> Flow<R>
) = channelFlow {
val jobs = ConcurrentHashMap<V, Job>()
collect { setEvent ->
when (setEvent) {
is Insert<V> -> {
jobs[setEvent.value]?.cancelAndJoin()
jobs[setEvent.value] =
entryEventTransformer(setEvent)
.onEach { send(it) }
.onCompletion { jobs.remove(setEvent.value) }
.launchIn(this@channelFlow)
}

is Removed<V> -> {
jobs.remove(setEvent.value)?.cancelAndJoin()
}

is Populated -> {}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.types.shouldBeInstanceOf
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf

class SetEventKtTest :
Expand Down Expand Up @@ -41,7 +38,7 @@ class SetEventKtTest :

test("runningFoldToSet with emitPartials") {
flowOf(Insert("A"), Insert("B"), Populated).runningFoldToSet(emitPartials = true).test {
awaitItem().toSet() shouldBeEqual emptySet<String>()
awaitItem().toSet() shouldBeEqual emptySet()
awaitItem().toSet() shouldBeEqual setOf("A")
awaitItem().toSet() shouldBeEqual setOf("A", "B")
awaitComplete()
Expand All @@ -60,72 +57,4 @@ class SetEventKtTest :
awaitError().shouldBeInstanceOf<IllegalStateException>()
}
}

test("flatMapLatestAndMerge - When upstream completes first, await inner complete") {
val setFlow = Channel<Set<String>>()
val aFlow = Channel<String>()

setFlow
.consumeAsFlow()
.flatMapLatestAndMerge {
when (it) {
is Insert ->
when (it.value) {
"a" -> aFlow.consumeAsFlow()
else -> throw IllegalArgumentException(it.value)
}
is Removed -> emptyFlow()
}
}
.test {
expectNoEvents()

setFlow.send(setOf("a"))
expectNoEvents()

aFlow.send("A")

awaitItem() shouldBeEqual "A"

setFlow.close()

expectNoEvents()

aFlow.send("B")

awaitItem() shouldBeEqual "B"

aFlow.close()

awaitComplete()
}
}

test("When inner completes first, await upstream complete") {
val setFlow = Channel<Set<String>>()

setFlow
.consumeAsFlow()
.flatMapLatestAndMerge {
when (it) {
is Insert ->
when (it.value) {
"a" -> flowOf("A")
else -> throw IllegalArgumentException(it.value)
}
is Removed -> emptyFlow()
}
}
.test {
expectNoEvents()

setFlow.send(setOf("a"))

awaitItem() shouldBeEqual "A"

setFlow.close()

awaitComplete()
}
}
})
Loading