Skip to main content

Concurrency in Kotlin, KMP and Compose Multiplatform

TL;DR

  • В Kotlin асинхронность = корутины + структурная конкуррентность.
  • В KMP те же API работают на JVM/Android, iOS (Native), JS и Desktop, но есть нюансы диспетчеров и моста со Swift.
  • В Compose Multiplatform корутины «живут» внутри эффектов (LaunchedEffect, rememberCoroutineScope) и обмениваются состоянием через StateFlow/SharedFlow/mutableStateOf/snapshotFlow.
  • Держите тяжёлую работу вне UI-потока, отмену — по иерархии Job, данные в UI — горячим стейт-потоком.

1) Ментальная модель корутин

Корутина — лёгкая кооперативно отменяемая задача. Это не поток. Корутины планируются диспетчерами и подчиняются правилам структурной конкуррентности.

  • Структурная конкуррентность: каждая корутина привязана к CoroutineScope. Родитель отменяет детей, ошибки поднимаются вверх.
  • Билдеры:
    • launch { … } — запустить и забыть (возвращает Job).
    • async { … } — параллельный подсчёт, результат через await().
    • coroutineScope { … } — супERVИЗИЯ по умолчанию: ошибка любого — падают все.
    • supervisorScope { … } — ошибки детей не валят соседей.
  • Контекст: Dispatchers.Default/IO/Main + Job + элементы (CoroutineName, SupervisorJob, т.д.).
  • Правила: избегайте GlobalScope; не блокируйте UI потоки; для переключения используйте withContext(Dispatchers.IO/Default).
suspend fun loadProfile(): Profile = coroutineScope {
val user = async { api.user() }
val posts = async { api.posts() }
Profile(user.await(), posts.await())
}

Описание:

  • Полезно для параллельного получения зависимых данных.
  • Ошибка одного async уронит весь coroutineScope, что правильно для согласованности данных.

2) Отмена и обработка ошибок

Отмена — кооперативная. Точки отмены — suspend вызовы, yield(), delay(). В долгих циклах проверяйте ensureActive().

suspend fun watchLongJob() = coroutineScope {
val job = launch {
while (isActive) {
ensureActive()
doChunk()
}
}
withTimeout(5_000) { job.join() } // или job.cancelAndJoin()
}
  • Таймауты: withTimeout/withTimeoutOrNull.
  • Исключения:
    • В launch исключение сразу пробрасывается в родителя.
    • В async ошибка хранится до await().
    • Изолировать сбоевших детей: SupervisorJob/supervisorScope.
  • Никогда не глотайте CancellationException — это механизм отмены.

3) Потоки данных: Flow, StateFlow, SharedFlow, Channel

  • Cold Flow — ленивый, начинается на collect.
  • Hot StateFlow — текущее состояние с initial-значением.
  • Hot SharedFlow — мультикаст одноразовых событий (рекомендуется replay = 0).
  • Channel — очереди/конвейеры между корутинами.

Производительность и backpressure:

  • buffer(), conflate(), collectLatest { … } (отменяет обработчик предыдущего элемента).
  • flowOn(Dispatchers.IO) — меняет контекст выше по цепочке операторов.
  • Мост с колбэками: callbackFlow { trySend(...) ; awaitClose { ... } }.
val uiState: StateFlow<UiState> = repo
.streamThings() // Flow<Data>
.map(::toUiState)
.stateIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
initialValue = UiState.Loading
)

Когда использовать:

  • Flow — для «ленивых» последовательностей/стримов.
  • StateFlow — для состояния экрана (горячий источник).
  • SharedFlow/Channel — для одноразовых событий (навигация, тосты).

4) Специфика KMP (мультиплатформенность)

Один kotlinx-coroutines-core работает на всех таргетах, но у диспетчеров разные механики.

  • JVM/Android: Dispatchers.Main (Looper), Default, IO.
  • iOS/Native: Dispatchers.Main = main-queue; Default — фоновые. Не блокируйте (никаких runBlocking в UI).
  • JS: один event-loop, только suspend, никакого блокирования.
  • Kotlin/Native память (новый MM): «заморозка» больше не обязательна, но держите мутабельность на том потоке, где она используется (обычно main для UI).

Рекомендуемый провайдер диспетчеров:

interface DispatcherProvider {
val Main: CoroutineDispatcher
val Default: CoroutineDispatcher
val IO: CoroutineDispatcher
}

object DefaultDispatchers : DispatcherProvider {
override val Main = Dispatchers.Main
override val Default = Dispatchers.Default
override val IO = Dispatchers.IO
}

Почему так: инъекция упрощает тестирование и платформенные различия.


5) Compose Multiplatform: эффекты и снапшоты

Не запускайте побочные эффекты «просто в @Composable». Используйте эффекты:

  • LaunchedEffect(key) { … } — старт при появлении/смене ключа, отмена при уходе.
  • rememberCoroutineScope() — запускать из обработчиков (onClick).
  • produceState — построить State<T> из suspend источника.
  • snapshotFlow { readSnapshotState() } — наблюдать Compose-состояние как Flow.
@Composable
fun Screen(vm: MyViewModel) {
val state by vm.ui.collectAsState() // StateFlow -> Compose state

LaunchedEffect(Unit) {
vm.effects.collect { effect ->
when (effect) {
is UiEffect.Navigate -> navController.navigate(effect.route)
is UiEffect.Toast -> showToast(effect.message)
}
}
}

// ... отрисовка state
}

Советы:

  • Тяжёлые операции — в withContext(Dispatchers.IO/Default).
  • Одноразовые события — через SharedFlow/Channel, а не StateFlow.

6) Архитектурный скелет (KMP + CMP)

Слои:

  • Data: suspend API/DAO, Flow источники.
  • Domain: use-cases (suspend/Flow), параллелизм через async/awaitAll.
  • Presentation (shared): ViewModel-подобный класс с CoroutineScope, MutableStateFlow<UiState>, MutableSharedFlow<UiEffect>.
data class UiState(
val isLoading: Boolean = false,
val data: List<Item> = emptyList(),
val error: String? = null
)

sealed interface UiEffect {
data class Navigate(val route: String) : UiEffect
data class Toast(val message: String) : UiEffect
}

class SharedViewModel(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val job = SupervisorJob()
private val scope = CoroutineScope(job + dispatchers.Main)

private val _ui = MutableStateFlow(UiState(isLoading = true))
val ui: StateFlow<UiState> = _ui

private val _effects = MutableSharedFlow<UiEffect>(extraBufferCapacity = 16)
val effects: SharedFlow<UiEffect> = _effects

fun load() {
scope.launch {
_ui.value = _ui.value.copy(isLoading = true, error = null)
runCatching { repo.fetchAll() }
.onSuccess { _ui.value = UiState(data = it) }
.onFailure { _ui.value = UiState(error = it.message) }
}
}

fun onItemClick(id: String) {
scope.launch { _effects.emit(UiEffect.Navigate("/details/$id")) }
}

fun clear() { job.cancel() } // вызвать из платформенного слоя
}

7) Рецепты параллелизма (copy/paste)

Параллельные запросы:

suspend fun <T> parallel(vararg blocks: suspend () -> T): List<T> = coroutineScope {
blocks.map { async { it() } }.awaitAll()
}

Ограничить конкурентность (семафор):

suspend fun <T> List<T>.mapConcurrent(limit: Int, block: suspend (T) -> Unit) = coroutineScope {
val sem = kotlinx.coroutines.sync.Semaphore(limit)
map { item ->
async {
sem.withPermit { block(item) }
}
}.awaitAll()
}

Гонка «кто первый»:

suspend fun <T> race(vararg blocks: suspend () -> T): T = coroutineScope {
val deferred = blocks.map { async(start = CoroutineStart.DEFAULT) { it() } }
try {
val first = select<T> {
deferred.forEach { d -> d.onAwait { it } }
}
first.also { deferred.forEach { it.cancel() } }
} catch (e: Throwable) {
deferred.forEach { it.cancel() }
throw e
}
}

Retry c экспоненциальной паузой:

suspend fun <T> retrying(times: Int, baseDelayMs: Long = 200, block: suspend () -> T): T {
var attempt = 0
var delayMs = baseDelayMs
var last: Throwable? = null
while (attempt < times) {
try { return block() } catch (t: Throwable) {
last = t
delay(delayMs)
delayMs *= 2
attempt++
}
}
throw last ?: IllegalStateException("retrying: unknown error")
}

Backpressure в UI (collectLatest):

suspend fun observeSearch(queryFlow: Flow<String>, search: suspend (String) -> List<Item>): Flow<List<Item>> =
queryFlow
.debounce(300)
.filter { it.length >= 2 }
.distinctUntilChanged()
.mapLatest { q -> search(q) }

8) Тестирование корутин и Flow

  • Используйте runTest { … } + StandardTestDispatcher для детерминированности.
  • Для тестирования Flow — Turbine.
@OptIn(ExperimentalCoroutinesApi::class)
class VmTest {
private val testDispatcher = StandardTestDispatcher()

@Before fun setup() {
Dispatchers.setMain(testDispatcher)
}

@After fun tearDown() {
Dispatchers.resetMain()
}

@Test fun uiState_updates() = runTest(testDispatcher) {
val vm = SharedViewModel(FakeRepo(), object : DispatcherProvider {
override val Main = testDispatcher
override val Default = testDispatcher
override val IO = testDispatcher
})

vm.load()
testDispatcher.scheduler.advanceUntilIdle()
assertTrue(vm.ui.value.isLoading || vm.ui.value.data.isNotEmpty())
}
}

Тестирование Flow с Turbine:

@Test
fun flow_emits_items() = runTest {
repo.streamThings()
.test {
assertEquals(Thing(1), awaitItem())
cancelAndIgnoreRemainingEvents()
}
}

9) Частые грабли и как их избежать

  • Подвис UI: забыли withContext(Dispatchers.IO/Default) вокруг тяжёлого вызова.
  • Потерянная ошибка: async { … } без await() — вы её не увидите.
  • «Вечные» корутины: собственные Scope без отмены (не вызвали cancel()/clear()).
  • Двойной сбор Flow в Compose из-за неправильных ключей LaunchedEffect.
  • iOS мост: вызвали suspend из Swift без удержания таска — отменится раньше времени; оборачивайте в Task и держите ссылку.

10) Interop со Swift (iOS)

Вариант 1: использовать популярную библиотеку KMP-NativeCoroutines (Rick Clephas) — аннотации для автоматического моста suspendasync/await, FlowAsyncSequence.

Вариант 2: минимальные ручные обёртки.

Kotlin (общий модуль):

// Превращаем Flow<T> в «наблюдаемый» класс для iOS
class CFlow<T>(private val origin: Flow<T>) {
fun watch(block: (T) -> Unit): Closeable {
val scope = MainScope()
val job = origin.onEach { block(it) }.launchIn(scope)
return Closeable { job.cancel() }
}
}

fun <T> Flow<T>.asCFlow(): CFlow<T> = CFlow(this)

Swift (использование):

// suspend-функции Kotlin видны со completion-колбэком; в Swift 5.5+
// удобно оборачивать в Task для async/await стиля.
func load() {
Task { @MainActor in
do {
let data = try await KotlinSharedRepo().fetchAll() // при наличии автосгенерированного async
// или через completion-API: repo.fetchAll { data, error in ... }
} catch {
// handle
}
}
}

// Подписка на CFlow
let closeable = vm.ui.asCFlow().watch { state in
// обновить SwiftUI
}
// Позже: closeable.close()

Примечания:

  • На iOS всегда взаимодействуйте с UI на main-очереди.
  • Держите ссылку на Task/Closeable, чтобы не отменялось досрочно.

11) Мини-шпаргалка выбора примитива

  • Однократный запрос → suspend fun.
  • Поток данных/наблюдение → Flow.
  • Состояние экрана → StateFlow (горячее, с initial).
  • Одноразовые события UI → SharedFlow(replay = 0) или Channel.
  • Слияние UI-состояния и данных → combine, flatMapLatest.
  • Compose локальное состояние → remember { mutableStateOf }; общее — collectAsState().

12) Готовый каркас (скелет) KMP-модуля

// Repo
interface Repo { suspend fun fetchAll(): List<Item>; fun streamThings(): Flow<Thing> }

// ViewModel-like (shared)
class Vm(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.Main)

private val _ui = MutableStateFlow(UiState())
val ui: StateFlow<UiState> = _ui

private val _effects = MutableSharedFlow<UiEffect>()
val effects: SharedFlow<UiEffect> = _effects

fun start() { scope.launch { _ui.value = UiState(isLoading = true); _ui.value = UiState(data = repo.fetchAll()) } }
fun stop() { scope.cancel() }
}

// Compose Screen (CMP)
@Composable
fun VmScreen(vm: Vm) {
val state by vm.ui.collectAsState()
LaunchedEffect(Unit) { vm.start() }
// ... UI по state
}

Описание:

  • Каркас минимален и переносим между Android/iOS/Desktop/Web.
  • Главное — корректно управлять жизненным циклом: вызывать stop()/clear().

Заключение

Структурная конкуррентность + горячие стейт-потоки — базис для чистого, предсказуемого и переносимого асинхронного кода в KMP + Compose Multiplatform. Следуйте описанным правилам, используйте рецепты и тестируйте с тестовыми диспетчерами — и у вас будет надёжная, отзывчивая UI-архитектура на всех платформах.