Concurrency in Kotlin, KMP and Compose Multiplatform
TL;DR
- В Kotlin асинхронность = корутины + структурная конкуррентность.
- В KMP те же API работают на JVM/Android, iOS (Native), JS и Desktop, но есть нюансы диспетчеров и моста со Swift.
- В Compose Multiplatform корутины «живут» внутри эффектов (
LaunchedEffect,rememberCoroutineScope) и обмениваются состоянием черезStateFlow/SharedFlow/mutableStateOf/snapshotFlow. - Держите тяжёлую работу вне UI-потока, отмену — по иерархии
Job, данные в UI — горячим стейт-потоком.
1) Ментальная модель корутин
Корутина — лёгкая кооперативно отменяемая задача. Это не поток. Корутины планируются диспетчерами и подчиняются правилам структурной конкуррентности.
- Структурная конкуррентность: каждая корутина привязана к
CoroutineScope. Родитель отменяет детей, ошибки поднимаются вверх. - Билдеры:
launch { … }— запустить и забыть (возвращаетJob).async { … }— параллельный подсчёт, результат черезawait().coroutineScope { … }— супERVИЗИЯ по умолчанию: ошибка любого — падают все.supervisorScope { … }— ошибки детей не валят соседей.
- Контекст:
Dispatchers.Default/IO/Main+Job+ элементы (CoroutineName,SupervisorJob, т.д.). - Правила: избегайте
GlobalScope; не блокируйте UI потоки; для переключения используйтеwithContext(Dispatchers.IO/Default).
suspend fun loadProfile(): Profile = coroutineScope {
val user = async { api.user() }
val posts = async { api.posts() }
Profile(user.await(), posts.await())
}
Описание:
- Полезно для параллельного получения зависимых данных.
- Ошибка одного
asyncуронит весьcoroutineScope, что правильно для согласованности данных.
2) Отмена и обработка ошибок
Отмена — кооперативная. Точки отмены — suspend вызовы, yield(), delay(). В долгих циклах проверяйте ensureActive().
suspend fun watchLongJob() = coroutineScope {
val job = launch {
while (isActive) {
ensureActive()
doChunk()
}
}
withTimeout(5_000) { job.join() } // или job.cancelAndJoin()
}
- Таймауты:
withTimeout/withTimeoutOrNull. - Исключения:
- В
launchисключение сразу пробрасывается в родителя. - В
asyncошибка хранится доawait(). - Изолировать сбоевших детей:
SupervisorJob/supervisorScope.
- В
- Никогда не глотайте
CancellationException— это механизм отмены.
3) Потоки данных: Flow, StateFlow, SharedFlow, Channel
- Cold
Flow— ленивый, начинается наcollect. - Hot
StateFlow— текущее состояние с initial-значением. - Hot
SharedFlow— мультикаст одноразовых событий (рекомендуетсяreplay = 0). - Channel — очереди/конвейеры между корутинами.
Производительность и backpressure:
buffer(),conflate(),collectLatest { … }(отменяет обработчик предыдущего элемента).flowOn(Dispatchers.IO)— меняет контекст выше по цепочке операторов.- Мост с колбэками:
callbackFlow { trySend(...) ; awaitClose { ... } }.
val uiState: StateFlow<UiState> = repo
.streamThings() // Flow<Data>
.map(::toUiState)
.stateIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
initialValue = UiState.Loading
)
Когда использовать:
Flow— для «ленивых» последовательностей/стримов.StateFlow— для состояния экрана (горячий источник).SharedFlow/Channel— для одноразовых событий (навигация, тосты).
4) Специфика KMP (мультиплатформенность)
Один kotlinx-coroutines-core работает на всех таргетах, но у диспетчеров разные механики.
- JVM/Android:
Dispatchers.Main(Looper),Default,IO. - iOS/Native:
Dispatchers.Main= main-queue;Default— фоновые. Не блокируйте (никакихrunBlockingв UI). - JS: один event-loop, только
suspend, никакого блокирования. - Kotlin/Native память (новый MM): «заморозка» больше не обязательна, но держите мутабельность на том потоке, где она используется (обычно main для UI).
Рекомендуемый провайдер диспетчеров:
interface DispatcherProvider {
val Main: CoroutineDispatcher
val Default: CoroutineDispatcher
val IO: CoroutineDispatcher
}
object DefaultDispatchers : DispatcherProvider {
override val Main = Dispatchers.Main
override val Default = Dispatchers.Default
override val IO = Dispatchers.IO
}
Почему так: инъекция упрощает тестирование и платформенные различия.
5) Compose Multiplatform: эффекты и снапшоты
Не запускайте побочные эффекты «просто в @Composable». Используйте эффекты:
LaunchedEffect(key) { … }— старт при появлении/смене ключа, отмена при уходе.rememberCoroutineScope()— запускать из обработчиков (onClick).produceState— построитьState<T>изsuspendисточника.snapshotFlow { readSnapshotState() }— наблюдать Compose-состояние какFlow.
@Composable
fun Screen(vm: MyViewModel) {
val state by vm.ui.collectAsState() // StateFlow -> Compose state
LaunchedEffect(Unit) {
vm.effects.collect { effect ->
when (effect) {
is UiEffect.Navigate -> navController.navigate(effect.route)
is UiEffect.Toast -> showToast(effect.message)
}
}
}
// ... отрисовка state
}
Советы:
- Тяжёлые операции — в
withContext(Dispatchers.IO/Default). - Одноразовые события — через
SharedFlow/Channel, а неStateFlow.
6) Архитектурный скелет (KMP + CMP)
Слои:
- Data:
suspendAPI/DAO,Flowисточники. - Domain: use-cases (
suspend/Flow), параллелизм черезasync/awaitAll. - Presentation (shared):
ViewModel-подобный класс сCoroutineScope,MutableStateFlow<UiState>,MutableSharedFlow<UiEffect>.
data class UiState(
val isLoading: Boolean = false,
val data: List<Item> = emptyList(),
val error: String? = null
)
sealed interface UiEffect {
data class Navigate(val route: String) : UiEffect
data class Toast(val message: String) : UiEffect
}
class SharedViewModel(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val job = SupervisorJob()
private val scope = CoroutineScope(job + dispatchers.Main)
private val _ui = MutableStateFlow(UiState(isLoading = true))
val ui: StateFlow<UiState> = _ui
private val _effects = MutableSharedFlow<UiEffect>(extraBufferCapacity = 16)
val effects: SharedFlow<UiEffect> = _effects
fun load() {
scope.launch {
_ui.value = _ui.value.copy(isLoading = true, error = null)
runCatching { repo.fetchAll() }
.onSuccess { _ui.value = UiState(data = it) }
.onFailure { _ui.value = UiState(error = it.message) }
}
}
fun onItemClick(id: String) {
scope.launch { _effects.emit(UiEffect.Navigate("/details/$id")) }
}
fun clear() { job.cancel() } // вызвать из платформенного слоя
}
7) Рецепты параллелизма (copy/paste)
Параллельные запросы:
suspend fun <T> parallel(vararg blocks: suspend () -> T): List<T> = coroutineScope {
blocks.map { async { it() } }.awaitAll()
}
Ограничить конкурентность (семафор):
suspend fun <T> List<T>.mapConcurrent(limit: Int, block: suspend (T) -> Unit) = coroutineScope {
val sem = kotlinx.coroutines.sync.Semaphore(limit)
map { item ->
async {
sem.withPermit { block(item) }
}
}.awaitAll()
}
Гонка «кто первый»:
suspend fun <T> race(vararg blocks: suspend () -> T): T = coroutineScope {
val deferred = blocks.map { async(start = CoroutineStart.DEFAULT) { it() } }
try {
val first = select<T> {
deferred.forEach { d -> d.onAwait { it } }
}
first.also { deferred.forEach { it.cancel() } }
} catch (e: Throwable) {
deferred.forEach { it.cancel() }
throw e
}
}
Retry c экспоненциальной паузой:
suspend fun <T> retrying(times: Int, baseDelayMs: Long = 200, block: suspend () -> T): T {
var attempt = 0
var delayMs = baseDelayMs
var last: Throwable? = null
while (attempt < times) {
try { return block() } catch (t: Throwable) {
last = t
delay(delayMs)
delayMs *= 2
attempt++
}
}
throw last ?: IllegalStateException("retrying: unknown error")
}
Backpressure в UI (collectLatest):
suspend fun observeSearch(queryFlow: Flow<String>, search: suspend (String) -> List<Item>): Flow<List<Item>> =
queryFlow
.debounce(300)
.filter { it.length >= 2 }
.distinctUntilChanged()
.mapLatest { q -> search(q) }
8) Тестирование корутин и Flow
- Используйте
runTest { … }+StandardTestDispatcherдля детерминированности. - Для тестирования
Flow— Turbine.
@OptIn(ExperimentalCoroutinesApi::class)
class VmTest {
private val testDispatcher = StandardTestDispatcher()
@Before fun setup() {
Dispatchers.setMain(testDispatcher)
}
@After fun tearDown() {
Dispatchers.resetMain()
}
@Test fun uiState_updates() = runTest(testDispatcher) {
val vm = SharedViewModel(FakeRepo(), object : DispatcherProvider {
override val Main = testDispatcher
override val Default = testDispatcher
override val IO = testDispatcher
})
vm.load()
testDispatcher.scheduler.advanceUntilIdle()
assertTrue(vm.ui.value.isLoading || vm.ui.value.data.isNotEmpty())
}
}
Тестирование Flow с Turbine:
@Test
fun flow_emits_items() = runTest {
repo.streamThings()
.test {
assertEquals(Thing(1), awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
9) Частые грабли и как их избежать
- Подвис UI: забыли
withContext(Dispatchers.IO/Default)вокруг тяжёлого вызова. - Потерянная ошибка:
async { … }безawait()— вы её не увидите. - «Вечные» корутины: собственные
Scopeбез отмены (не вызвалиcancel()/clear()). - Двойной сбор
Flowв Compose из-за неправильных ключейLaunchedEffect. - iOS мост: вызвали
suspendиз Swift без удержания таска — отменится раньше времени; оборачивайте вTaskи держите ссылку.
10) Interop со Swift (iOS)
Вариант 1: использовать популярную библиотеку KMP-NativeCoroutines (Rick Clephas) — аннотации для автоматического моста suspend → async/await, Flow → AsyncSequence.
Вариант 2: минимальные ручные обёртки.
Kotlin (общий модуль):
// Превращаем Flow<T> в «наблюдаемый» класс для iOS
class CFlow<T>(private val origin: Flow<T>) {
fun watch(block: (T) -> Unit): Closeable {
val scope = MainScope()
val job = origin.onEach { block(it) }.launchIn(scope)
return Closeable { job.cancel() }
}
}
fun <T> Flow<T>.asCFlow(): CFlow<T> = CFlow(this)
Swift (использование):
// suspend-функции Kotlin видны со completion-колбэком; в Swift 5.5+
// удобно оборачивать в Task для async/await стиля.
func load() {
Task { @MainActor in
do {
let data = try await KotlinSharedRepo().fetchAll() // при наличии автосгенерированного async
// или через completion-API: repo.fetchAll { data, error in ... }
} catch {
// handle
}
}
}
// Подписка на CFlow
let closeable = vm.ui.asCFlow().watch { state in
// обновить SwiftUI
}
// Позже: closeable.close()
Примечания:
- На iOS всегда взаимодействуйте с UI на main-очереди.
- Держите ссылку на
Task/Closeable, чтобы не отменялось досрочно.
11) Мини-шпаргалка выбора примитива
- Однократный запрос →
suspend fun. - Поток данных/наблюдение →
Flow. - Состояние экрана →
StateFlow(горячее, с initial). - Одноразовые события UI →
SharedFlow(replay = 0)илиChannel. - Слияние UI-состояния и данных →
combine,flatMapLatest. - Compose локальное состояние →
remember { mutableStateOf }; общее —collectAsState().
12) Готовый каркас (скелет) KMP-модуля
// Repo
interface Repo { suspend fun fetchAll(): List<Item>; fun streamThings(): Flow<Thing> }
// ViewModel-like (shared)
class Vm(
private val repo: Repo,
private val dispatchers: DispatcherProvider = DefaultDispatchers
) {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.Main)
private val _ui = MutableStateFlow(UiState())
val ui: StateFlow<UiState> = _ui
private val _effects = MutableSharedFlow<UiEffect>()
val effects: SharedFlow<UiEffect> = _effects
fun start() { scope.launch { _ui.value = UiState(isLoading = true); _ui.value = UiState(data = repo.fetchAll()) } }
fun stop() { scope.cancel() }
}
// Compose Screen (CMP)
@Composable
fun VmScreen(vm: Vm) {
val state by vm.ui.collectAsState()
LaunchedEffect(Unit) { vm.start() }
// ... UI по state
}
Описание:
- Каркас минимален и переносим между Android/iOS/Desktop/Web.
- Главное — корректно управлять жизненным циклом: вызывать
stop()/clear().
Заключение
Структурная конкуррентность + горячие стейт-потоки — базис для чистого, предсказуемого и переносимого асинхронного кода в KMP + Compose Multiplatform. Следуйте описанным правилам, используйте рецепты и тестируйте с тестовыми диспетчерами — и у вас будет надёжная, отзывчивая UI-архитектура на всех платформах.