Concurrency in Kotlin, KMP and Compose Multiplatform
TL;DR
- В Kotlin асинхронность = корутины + структурная конкуррентность.
- В KMP те же API работают на JVM/Android, iOS (Native), JS и Desktop, но есть нюансы диспетчеров и моста со Swift.
- В Compose Multiplatform корутины «живут» внутри эффектов (
LaunchedEffect
,rememberCoroutineScope
) и обмениваются состоянием черезStateFlow
/SharedFlow
/mutableStateOf
/snapshotFlow
. - Держите тяжёлую работу вне UI-потока, отмену — по иерархии
Job
, данные в UI — горячим стейт-потоком.
1) Ментальная модель корутин
Корутина — лёгкая кооперативно отменяемая задача. Это не поток. Корутины планируются диспетчерами и подчиняются правилам структурной конкуррентности.
- Структурная конкуррентность: каждая корутина привязана к
CoroutineScope
. Родитель отменяет детей, ошибки поднимаются вверх. - Билдеры:
launch { … }
— запустить и забыть (возвращаетJob
).async { … }
— параллельный подсчёт, результат черезawait()
.coroutineScope { … }
— супERVИЗИЯ по умолчанию: ошибка любого — падают все.supervisorScope { … }
— ошибки детей не валят соседей.
- Контекст:
Dispatchers.Default/IO/Main
+Job
+ элементы (CoroutineName
,SupervisorJob
, т.д.). - Правила: избегайте
GlobalScope
; не блокируйте UI потоки; для переключения используйтеwithContext(Dispatchers.IO/Default)
.
suspend fun loadProfile(): Profile = coroutineScope {
val user = async { api.user() }
val posts = async { api.posts() }
Profile(user.await(), posts.await())
}
Описание:
- Полезно для параллельного получения зависимых данных.
- Ошибка одного
async
уронит весьcoroutineScope
, что правильно для согласованности данных.
2) Отмена и обработка ошибок
Отмена — кооперативная. Точки отмены — suspend
вызовы, yield()
, delay()
. В долгих циклах проверяйте ensureActive()
.
suspend fun watchLongJob() = coroutineScope {
val job = launch {
while (isActive) {
ensureActive()
doChunk()
}
}
withTimeout(5_000) { job.join() } // или job.cancelAndJoin()
}
- Таймауты:
withTimeout
/withTimeoutOrNull
. - Исключения:
- В
launch
исключение сразу пробрасывается в родителя. - В
async
ошибка хранится доawait()
. - Изолировать сбоевших детей:
SupervisorJob
/supervisorScope
.
- В
- Никогда не глотайте
CancellationException
— это механизм отмены.
3) Потоки данных: Flow, StateFlow, SharedFlow, Channel
- Cold
Flow
— ленивый, начинается наcollect
. - Hot
StateFlow
— текущее состояние с initial-значением. - Hot
SharedFlow
— мультикаст одноразовых событий (рекомендуетсяreplay = 0
). - Channel — очереди/конвейеры между корутинами.
Производительность и backpressure:
buffer()
,conflate()
,collectLatest { … }
(отменяет обработчик предыдущего элемента).flowOn(Dispatchers.IO)
— меняет контекст выше по цепочке операторов.- Мост с колбэками:
callbackFlow { trySend(...) ; awaitClose { ... } }
.
val uiState: StateFlow<UiState> = repo
.streamThings() // Flow<Data>
.map(::toUiState)
.stateIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
initialValue = UiState.Loading
)
Когда использовать:
Flow
— для «ленивых» последовательностей/стримов.StateFlow
— для состояния экрана (горячий источник).SharedFlow
/Channel
— для одноразовых событий (навигация, тосты).
4) Специфика KMP (мультиплатформенность)
Один kotlinx-coroutines-core
работает на всех таргетах, но у диспетчеров разные механики.
- JVM/Android:
Dispatchers.Main
(Looper),Default
,IO
. - iOS/Native:
Dispatchers.Main
= main-queue;Default
— фоновые. Не блокируйте (никакихrunBlocking
в UI). - JS: один event-loop, только
suspend
, никакого блокирования. - Kotlin/Native память (новый MM): «заморозка» больше не обязательна, но держите мутабельность на том потоке, где она используется (обычно main для UI).
Рекомендуемый провайдер диспетчеров:
interface DispatcherProvider {
val Main: CoroutineDispatcher
val Default: CoroutineDispatcher
val IO: CoroutineDispatcher
}
object DefaultDispatchers : DispatcherProvider {
override val Main = Dispatchers.Main
override val Default = Dispatchers.Default
override val IO = Dispatchers.IO
}
Почему так: инъекция упрощает тестирование и платформенные различия.
5) Compose Multiplatform: эффекты и снапшоты
Не запускайте побочные эффекты «просто в @Composable
». Используйте эффекты:
LaunchedEffect(key) { … }
— старт при появлении/смене ключа, отмена при уходе.rememberCoroutineScope()
— запускать из обработчиков (onClick
).produceState
— построитьState<T>
изsuspend
источника.snapshotFlow { readSnapshotState() }
— наблюдать Compose-состояние какFlow
.
@Composable
fun Screen(vm: MyViewModel) {
val state by vm.ui.collectAsState() // StateFlow -> Compose state
LaunchedEffect(Unit) {
vm.effects.collect { effect ->
when (effect) {
is UiEffect.Navigate -> navController.navigate(effect.route)
is UiEffect.Toast -> showToast(effect.message)
}
}
}
// ... отрисовка state
}
Советы:
- Тяжёлые операции — в
withContext(Dispatchers.IO/Default)
. - Одноразовые события — через
SharedFlow
/Channel
, а неStateFlow
.
6) Архитектурный скелет (KMP + CMP)
Слои:
- Data:
suspend
API/DAO,Flow
источники. - Domain: use-cases (
suspend
/Flow
), параллелизм черезasync/awaitAll
. - Presentation (shared):
ViewModel
-подобный класс сCoroutineScope
,MutableStateFlow<UiState>
,MutableSharedFlow<UiEffect>
.
data class UiState(
val isLoading: Boolean = false,
val data: List<Item> = emptyList(),
val error: String? = null
)
sealed interface UiEffect {
data class Navigate(val route: String) : UiEffect
data class Toast(val message: String) : UiEffect
}
class SharedViewModel(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val job = SupervisorJob()
private val scope = CoroutineScope(job + dispatchers.Main)
private val _ui = MutableStateFlow(UiState(isLoading = true))
val ui: StateFlow<UiState> = _ui
private val _effects = MutableSharedFlow<UiEffect>(extraBufferCapacity = 16)
val effects: SharedFlow<UiEffect> = _effects
fun load() {
scope.launch {
_ui.value = _ui.value.copy(isLoading = true, error = null)
runCatching { repo.fetchAll() }
.onSuccess { _ui.value = UiState(data = it) }
.onFailure { _ui.value = UiState(error = it.message) }
}
}
fun onItemClick(id: String) {
scope.launch { _effects.emit(UiEffect.Navigate("/details/$id")) }
}
fun clear() { job.cancel() } // вызвать из платформенного слоя
}
7) Рецепты параллелизма (copy/paste)
Параллельные запросы:
suspend fun <T> parallel(vararg blocks: suspend () -> T): List<T> = coroutineScope {
blocks.map { async { it() } }.awaitAll()
}
Ограничить конкурентность (семафор):
suspend fun <T> List<T>.mapConcurrent(limit: Int, block: suspend (T) -> Unit) = coroutineScope {
val sem = kotlinx.coroutines.sync.Semaphore(limit)
map { item ->
async {
sem.withPermit { block(item) }
}
}.awaitAll()
}
Гонка «кто первый»:
suspend fun <T> race(vararg blocks: suspend () -> T): T = coroutineScope {
val deferred = blocks.map { async(start = CoroutineStart.DEFAULT) { it() } }
try {
val first = select<T> {
deferred.forEach { d -> d.onAwait { it } }
}
first.also { deferred.forEach { it.cancel() } }
} catch (e: Throwable) {
deferred.forEach { it.cancel() }
throw e
}
}
Retry c экспоненциальной паузой:
suspend fun <T> retrying(times: Int, baseDelayMs: Long = 200, block: suspend () -> T): T {
var attempt = 0
var delayMs = baseDelayMs
var last: Throwable? = null
while (attempt < times) {
try { return block() } catch (t: Throwable) {
last = t
delay(delayMs)
delayMs *= 2
attempt++
}
}
throw last ?: IllegalStateException("retrying: unknown error")
}
Backpressure в UI (collectLatest
):
suspend fun observeSearch(queryFlow: Flow<String>, search: suspend (String) -> List<Item>): Flow<List<Item>> =
queryFlow
.debounce(300)
.filter { it.length >= 2 }
.distinctUntilChanged()
.mapLatest { q -> search(q) }
8) Тестирование корутин и Flow
- Используйте
runTest { … }
+StandardTestDispatcher
для детерминированности. - Для тестирования
Flow
— Turbine.
@OptIn(ExperimentalCoroutinesApi::class)
class VmTest {
private val testDispatcher = StandardTestDispatcher()
@Before fun setup() {
Dispatchers.setMain(testDispatcher)
}
@After fun tearDown() {
Dispatchers.resetMain()
}
@Test fun uiState_updates() = runTest(testDispatcher) {
val vm = SharedViewModel(FakeRepo(), object : DispatcherProvider {
override val Main = testDispatcher
override val Default = testDispatcher
override val IO = testDispatcher
})
vm.load()
testDispatcher.scheduler.advanceUntilIdle()
assertTrue(vm.ui.value.isLoading || vm.ui.value.data.isNotEmpty())
}
}
Тестирование Flow с Turbine:
@Test
fun flow_emits_items() = runTest {
repo.streamThings()
.test {
assertEquals(Thing(1), awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
9) Частые грабли и как их избежать
- Подвис UI: забыли
withContext(Dispatchers.IO/Default)
вокруг тяжёлого вызова. - Потерянная ошибка:
async { … }
безawait()
— вы её не увидите. - «Вечные» корутины: собственные
Scope
без отмены (не вызвалиcancel()
/clear()
). - Двойной сбор
Flow
в Compose из-за неправильных ключейLaunchedEffect
. - iOS мост: вызвали
suspend
из Swift без удержания таска — отменится раньше времени; оборачивайте вTask
и держите ссылку.
10) Interop со Swift (iOS)
Вариант 1: использовать популярную библиотеку KMP-NativeCoroutines
(Rick Clephas) — аннотации для автоматического моста suspend
→ async/await
, Flow
→ AsyncSequence
.
Вариант 2: минимальные ручные обёртки.
Kotlin (общий модуль):
// Превращаем Flow<T> в «наблюдаемый» класс для iOS
class CFlow<T>(private val origin: Flow<T>) {
fun watch(block: (T) -> Unit): Closeable {
val scope = MainScope()
val job = origin.onEach { block(it) }.launchIn(scope)
return Closeable { job.cancel() }
}
}
fun <T> Flow<T>.asCFlow(): CFlow<T> = CFlow(this)
Swift (использование):
// suspend-функции Kotlin видны со completion-колбэком; в Swift 5.5+
// удобно оборачивать в Task для async/await стиля.
func load() {
Task { @MainActor in
do {
let data = try await KotlinSharedRepo().fetchAll() // при наличии автосгенерированного async
// или через completion-API: repo.fetchAll { data, error in ... }
} catch {
// handle
}
}
}
// Подписка на CFlow
let closeable = vm.ui.asCFlow().watch { state in
// обновить SwiftUI
}
// Позже: closeable.close()
Примечания:
- На iOS всегда взаимодействуйте с UI на main-очереди.
- Держите ссылку на
Task
/Closeable
, чтобы не отменялось досрочно.
11) Мини-шпаргалка выбора примитива
- Однократный запрос →
suspend fun
. - Поток данных/наблюдение →
Flow
. - Состояние экрана →
StateFlow
(горячее, с initial). - Одноразовые события UI →
SharedFlow(replay = 0)
илиChannel
. - Слияние UI-состояния и данных →
combine
,flatMapLatest
. - Compose локальное состояние →
remember { mutableStateOf }
; общее —collectAsState()
.
12) Готовый каркас (скелет) KMP-модуля
// Repo
interface Repo { suspend fun fetchAll(): List<Item>; fun streamThings(): Flow<Thing> }
// ViewModel-like (shared)
class Vm(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.Main)
private val _ui = MutableStateFlow(UiState())
val ui: StateFlow<UiState> = _ui
private val _effects = MutableSharedFlow<UiEffect>()
val effects: SharedFlow<UiEffect> = _effects
fun start() { scope.launch { _ui.value = UiState(isLoading = true); _ui.value = UiState(data = repo.fetchAll()) } }
fun stop() { scope.cancel() }
}
// Compose Screen (CMP)
@Composable
fun VmScreen(vm: Vm) {
val state by vm.ui.collectAsState()
LaunchedEffect(Unit) { vm.start() }
// ... UI по state
}
Описание:
- Каркас минимален и переносим между Android/iOS/Desktop/Web.
- Главное — корректно управлять жизненным циклом: вызывать
stop()
/clear()
.
Заключение
Структурная конкуррентность + горячие стейт-потоки — базис для чистого, предсказуемого и переносимого асинхронного кода в KMP + Compose Multiplatform. Следуйте описанным правилам, используйте рецепты и тестируйте с тестовыми диспетчерами — и у вас будет надёжная, отзывчивая UI-архитектура на всех платформах.