
Testing Coroutines

1. Introduction

Testing code that uses coroutines requires dedicated tools to:

  • Control time (virtual time)
  • Test suspend functions
  • Test Flows
  • Handle concurrent operations

2. Setup

Add the dependencies to build.gradle.kts:

dependencies {
    // Coroutines test
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")

    // Turbine for Flow testing
    testImplementation("app.cash.turbine:turbine:1.0.0")

    // JUnit
    testImplementation("junit:junit:4.13.2")

    // kotlin.test (@Test, assertEquals, ...), which the examples below import
    testImplementation(kotlin("test"))
}

3. runTest - Basic Coroutine Testing

runTest is the main builder for coroutine tests:

import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class RepositoryTest {
    @Test
    fun `fetchUser returns correct user`() = runTest {
        val repository = UserRepository()
        val user = repository.fetchUser("123")
        assertEquals("Alice", user.name)
    }
}

class UserRepository {
    suspend fun fetchUser(id: String): User {
        delay(1000) // Simulated network delay
        return User(id, "Alice")
    }
}

data class User(val id: String, val name: String)

runTest automatically skips delays: the delay(1000) in fetchUser only advances virtual time, so the test finishes almost immediately.
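To make the delay skipping visible, the sketch below compares virtual time with wall-clock time. It assumes kotlinx-coroutines-test 1.6 or later on the JVM; measureSkippedDelay is a hypothetical helper written for this illustration, not part of any library.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.system.measureTimeMillis

// Hypothetical helper: returns (virtual ms, wall-clock ms) spent on a single delay.
@OptIn(ExperimentalCoroutinesApi::class)
fun measureSkippedDelay(delayMs: Long): Pair<Long, Long> {
    var virtualMs = 0L
    val wallMs = measureTimeMillis {
        runTest {
            delay(delayMs)          // skipped: only the virtual clock moves
            virtualMs = currentTime // virtual time of the test scheduler
        }
    }
    return virtualMs to wallMs
}
```

Calling measureSkippedDelay(60_000) should report 60000 ms of virtual time while the wall-clock time stays far below a minute.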

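4. Virtual Time with TestDispatcher

advanceTimeBy and advanceUntilIdle

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.Test
import kotlin.test.assertEquals

class TimerTest {
    @Test
    fun `timer emits values over time`() = runTest {
        var count = 0

        launch {
            repeat(3) {
                delay(1000)
                count++
            }
        }

        assertEquals(0, count)

        // advanceTimeBy does not run tasks due at exactly the target time,
        // so runCurrent() is needed to execute them.
        advanceTimeBy(1000)
        runCurrent()
        assertEquals(1, count)

        advanceTimeBy(1000)
        runCurrent()
        assertEquals(2, count)

        advanceUntilIdle() // Run everything that is still pending
        assertEquals(3, count)
    }
}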
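One subtlety deserves its own illustration: since kotlinx-coroutines-test 1.6, advanceTimeBy(n) moves the clock forward by n but leaves tasks scheduled at exactly the new time unexecuted until runCurrent() (or a later advance) runs them. The sketch below records that behaviour; observeBoundary is a hypothetical helper used only here.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: reports whether a task due at exactly 1000 ms has run
// (first) right after advanceTimeBy(1000) and (second) after runCurrent().
@OptIn(ExperimentalCoroutinesApi::class)
fun observeBoundary(): Pair<Boolean, Boolean> {
    var afterAdvance = false
    var afterRunCurrent = false
    runTest {
        var done = false
        launch {
            delay(1000)
            done = true
        }
        advanceTimeBy(1000) // virtual clock is now at 1000 ms
        afterAdvance = done
        runCurrent()        // runs tasks due at the current virtual time
        afterRunCurrent = done
    }
    return afterAdvance to afterRunCurrent
}
```

Under that assumption the helper returns (false, true), which is why a test that asserts right at a delay boundary typically pairs advanceTimeBy with runCurrent.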

Precise control over timing

class DebounceTest {
    @Test
    fun `debounce only emits after delay`() = runTest {
        val results = mutableListOf<String>()

        launch {
            debounce(300) { results.add("First") }
        }

        // Before 300 ms: nothing has been emitted yet
        advanceTimeBy(200)
        assertEquals(0, results.size)

        // At 300 ms: the action runs once runCurrent() executes the due task
        advanceTimeBy(100)
        runCurrent()
        assertEquals(1, results.size)
        assertEquals("First", results[0])
    }

    private suspend fun debounce(delayMs: Long, action: () -> Unit) {
        delay(delayMs)
        action()
    }
}

5. TestDispatcher and Dependency Injection

Inject the dispatcher for testability

// Production code
class DataSyncService(
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun syncData(): List<String> = withContext(dispatcher) {
        delay(5000) // Long operation
        listOf("data1", "data2")
    }
}

// Test code
class DataSyncServiceTest {
    @Test
    fun `syncData returns data`() = runTest {
        // Share the test scheduler so the service runs on virtual time
        val testDispatcher = StandardTestDispatcher(testScheduler)
        val service = DataSyncService(testDispatcher)

        val deferred = async { service.syncData() }
        advanceUntilIdle()

        assertEquals(listOf("data1", "data2"), deferred.await())
    }
}

UnconfinedTestDispatcher vs StandardTestDispatcher

@Test
fun `immediate execution with UnconfinedTestDispatcher`() = runTest {
    val unconfinedDispatcher = UnconfinedTestDispatcher(testScheduler)
    var executed = false

    launch(unconfinedDispatcher) {
        executed = true
    }

    // With Unconfined, the coroutine has already run
    assertTrue(executed)
}

@Test
fun `controlled execution with StandardTestDispatcher`() = runTest {
    val standardDispatcher = StandardTestDispatcher(testScheduler)
    var executed = false

    launch(standardDispatcher) {
        executed = true
    }

    // With Standard, the coroutine has not run yet
    assertFalse(executed)

    advanceUntilIdle()
    assertTrue(executed)
}

6. Testing Flows with Turbine

Turbine is a widely used library for testing Flows:

Basic Flow testing

import app.cash.turbine.test
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class FlowTest {
    @Test
    fun `flow emits expected values`() = runTest {
        flow {
            emit(1)
            emit(2)
            emit(3)
        }.test {
            assertEquals(1, awaitItem())
            assertEquals(2, awaitItem())
            assertEquals(3, awaitItem())
            awaitComplete()
        }
    }
}
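For comparison, a flow can also be checked without Turbine by collecting it into a list, using take(n) to bound an endless flow. This is only a sketch of that alternative; naturals and firstItems are hypothetical names introduced for the illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest

// Hypothetical endless flow: 0, 1, 2, ...
fun naturals(): Flow<Int> = flow {
    var n = 0
    while (true) emit(n++)
}

// Hypothetical helper: collects the first `count` items.
// take() cancels the upstream flow once enough items have arrived.
fun firstItems(count: Int): List<Int> {
    var items = emptyList<Int>()
    runTest {
        items = naturals().take(count).toList()
    }
    return items
}
```

This works for simple value checks, but it cannot interleave assertions with actions on the code under test, which is where Turbine's awaitItem() style is more convenient.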

Testing StateFlow

class CounterViewModel {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.value++
    }
}

class CounterViewModelTest {
    @Test
    fun `increment increases count`() = runTest {
        val viewModel = CounterViewModel()

        viewModel.count.test {
            assertEquals(0, awaitItem()) // Initial value

            viewModel.increment()
            assertEquals(1, awaitItem())

            viewModel.increment()
            assertEquals(2, awaitItem())

            cancelAndIgnoreRemainingEvents()
        }
    }
}

Testing SharedFlow (events)

sealed class UiEvent {
    data class ShowToast(val message: String) : UiEvent()
    object NavigateBack : UiEvent()
}

class EventViewModel {
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    suspend fun showToast(message: String) {
        _events.emit(UiEvent.ShowToast(message))
    }
}

class EventViewModelTest {
    @Test
    fun `showToast emits event`() = runTest {
        val viewModel = EventViewModel()

        viewModel.events.test {
            viewModel.showToast("Hello")

            val event = awaitItem()
            assertTrue(event is UiEvent.ShowToast)
            assertEquals("Hello", (event as UiEvent.ShowToast).message)

            cancelAndIgnoreRemainingEvents()
        }
    }
}

Testing a Flow with delay

fun tickerFlow(intervalMs: Long): Flow<Int> = flow {
    var count = 0
    while (true) {
        emit(count++)
        delay(intervalMs)
    }
}

class TickerFlowTest {
    @Test
    fun `ticker emits at intervals`() = runTest {
        tickerFlow(1000).test {
            assertEquals(0, awaitItem())

            advanceTimeBy(1000)
            assertEquals(1, awaitItem())

            advanceTimeBy(1000)
            assertEquals(2, awaitItem())

            cancelAndIgnoreRemainingEvents()
        }
    }
}

7. Testing Error Handling

Testing exceptions

class ErrorHandlingTest {
    @Test
    fun `flow catches errors`() = runTest {
        flow {
            emit(1)
            throw RuntimeException("Error!")
        }
            .catch { emit(-1) }
            .test {
                assertEquals(1, awaitItem())
                assertEquals(-1, awaitItem())
                awaitComplete()
            }
    }

    @Test
    fun `flow propagates uncaught errors`() = runTest {
        flow<Int> {
            throw RuntimeException("Error!")
        }.test {
            val error = awaitError()
            assertEquals("Error!", error.message)
        }
    }
}

Testing retry logic

class RetryTest {
    @Test
    fun `retry succeeds after failures`() = runTest {
        var attempts = 0

        flow {
            attempts++
            if (attempts < 3) {
                throw RuntimeException("Attempt $attempts failed")
            }
            emit("Success")
        }
            .retry(3) { true }
            .test {
                assertEquals("Success", awaitItem())
                awaitComplete()
            }

        assertEquals(3, attempts)
    }
}

8. Testing ViewModels

Complete ViewModel test

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope

// In this section UserRepository is an interface (unlike the concrete class in
// section 3), so that the test can substitute a fake implementation.
interface UserRepository {
    suspend fun getUser(id: String): User
}

class UserViewModel(
    private val repository: UserRepository,
    private val dispatcher: CoroutineDispatcher = Dispatchers.Main
) : ViewModel() {

    private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun loadUser(userId: String) {
        viewModelScope.launch(dispatcher) {
            _uiState.value = UiState.Loading
            try {
                val user = repository.getUser(userId)
                _uiState.value = UiState.Success(user)
            } catch (e: Exception) {
                _uiState.value = UiState.Error(e.message ?: "Unknown error")
            }
        }
    }
}

sealed class UiState {
    object Loading : UiState()
    data class Success(val user: User) : UiState()
    data class Error(val message: String) : UiState()
}

class UserViewModelTest {
    @Test
    fun `loadUser success updates state`() = runTest {
        val fakeRepository = FakeUserRepository()
        val viewModel = UserViewModel(
            repository = fakeRepository,
            dispatcher = StandardTestDispatcher(testScheduler)
        )

        viewModel.uiState.test {
            assertEquals(UiState.Loading, awaitItem())

            viewModel.loadUser("123")
            advanceUntilIdle()

            val state = awaitItem()
            assertTrue(state is UiState.Success)
            assertEquals("Alice", (state as UiState.Success).user.name)

            cancelAndIgnoreRemainingEvents()
        }
    }

    @Test
    fun `loadUser error updates state`() = runTest {
        val fakeRepository = FakeUserRepository(shouldFail = true)
        val viewModel = UserViewModel(
            repository = fakeRepository,
            dispatcher = StandardTestDispatcher(testScheduler)
        )

        viewModel.uiState.test {
            skipItems(1) // Skip initial Loading

            viewModel.loadUser("123")
            advanceUntilIdle()

            val state = awaitItem()
            assertTrue(state is UiState.Error)

            cancelAndIgnoreRemainingEvents()
        }
    }
}

class FakeUserRepository(private val shouldFail: Boolean = false) : UserRepository {
    override suspend fun getUser(id: String): User {
        if (shouldFail) throw RuntimeException("Network error")
        return User(id, "Alice")
    }
}
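When code falls back to Dispatchers.Main, as the default parameter of UserViewModel does, a common complement to constructor injection is to replace Main for the duration of the test with Dispatchers.setMain and restore it with Dispatchers.resetMain. The following is a minimal sketch assuming kotlinx-coroutines-test is on the classpath; runsOnReplacedMain is a hypothetical helper, and in a real suite the setMain/resetMain calls would usually sit in setup and teardown methods or a test rule.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain

// Hypothetical helper: returns true if a coroutine launched on Dispatchers.Main
// ran on the test dispatcher installed via setMain.
@OptIn(ExperimentalCoroutinesApi::class)
fun runsOnReplacedMain(): Boolean {
    var ran = false
    Dispatchers.setMain(StandardTestDispatcher())
    try {
        runTest {
            // runTest reuses the scheduler of the test dispatcher set as Main
            launch(Dispatchers.Main) { ran = true }
            advanceUntilIdle()
        }
    } finally {
        Dispatchers.resetMain() // always restore the original Main dispatcher
    }
    return ran
}
```

Restoring Main in a finally block keeps one test's dispatcher from leaking into the next.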

9. Testing Timeouts

class TimeoutTest {
    @Test
    fun `operation completes within timeout`() = runTest {
        val result = withTimeout(1000) {
            delay(500)
            "Success"
        }
        assertEquals("Success", result)
    }

    @Test
    fun `operation timeout returns null`() = runTest {
        val result = withTimeoutOrNull(500) {
            delay(1000)
            "Never returned"
        }
        assertNull(result)
    }
}
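The remaining case is withTimeout itself expiring, which throws TimeoutCancellationException instead of returning null. A sketch of how that could be observed on virtual time follows; timesOut is a hypothetical helper for this illustration.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout

// Hypothetical helper: returns true if `workMs` of (virtual) work
// exceeds a timeout of `timeoutMs`.
fun timesOut(timeoutMs: Long, workMs: Long): Boolean {
    var timedOut = false
    runTest {
        try {
            withTimeout(timeoutMs) { delay(workMs) }
        } catch (e: TimeoutCancellationException) {
            timedOut = true // the timeout fired before the work finished
        }
    }
    return timedOut
}
```

In a test class the same check can be written with kotlin.test's assertFailsWith<TimeoutCancellationException> { ... } around the withTimeout call.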

10. Best Practices

// ✅ Inject dispatchers
class MyRepository(
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
)

// ✅ Use runTest for all coroutine tests
@Test
fun myTest() = runTest { }

// ✅ Use Turbine for Flow testing
flow.test {
    assertEquals(expected, awaitItem())
}

// ✅ Use TestDispatcher for controlled execution
val testDispatcher = StandardTestDispatcher(testScheduler)

// ✅ Clean up with cancelAndIgnoreRemainingEvents()
flow.test {
    // assertions
    cancelAndIgnoreRemainingEvents()
}

// ❌ Don't use runBlocking in tests
// @Test fun badTest() = runBlocking { } // BAD

// ❌ Don't use real delays in tests
// delay(5000) // Makes tests slow

📝 Summary

Tool                        Purpose
runTest                     Test builder with virtual time
advanceTimeBy()             Advance virtual time by a given amount
advanceUntilIdle()          Run all pending coroutines
StandardTestDispatcher      Controlled execution
UnconfinedTestDispatcher    Immediate execution
Turbine.test {}             Flow testing
awaitItem()                 Get the next Flow emission
awaitComplete()             Assert that the Flow completes
awaitError()                Assert that the Flow fails with an error

Key points:

  • Inject dispatchers to make code easier to test
  • runTest automatically skips delays
  • Turbine is a convenient way to test Flows
  • Use StandardTestDispatcher for controlled timing