Testing Coroutines
1. Giới thiệu
Testing code với coroutines đòi hỏi các tools đặc biệt để:
- Kiểm soát thời gian (virtual time)
- Test suspend functions
- Test Flows
- Xử lý concurrent operations
2. Setup
Thêm dependencies trong build.gradle.kts:
dependencies {
// Coroutines test
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
// Turbine for Flow testing
testImplementation("app.cash.turbine:turbine:1.0.0")
// JUnit
testImplementation("junit:junit:4.13.2")
}3. runTest - Basic Coroutine Testing
runTest là builder chính cho coroutine tests:
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
class RepositoryTest {
@Test
fun `fetchUser returns correct user`() = runTest {
val repository = UserRepository()
val user = repository.fetchUser("123")
assertEquals("Alice", user.name)
}
}
class UserRepository {
suspend fun fetchUser(id: String): User {
delay(1000) // Simulated network delay
return User(id, "Alice")
}
}
data class User(val id: String, val name: String)runTest tự động skip delays - test chạy ngay lập tức!
4. Virtual Time với TestDispatcher
advanceTimeBy và advanceUntilIdle
import kotlinx.coroutines.test.*
import kotlinx.coroutines.*
class TimerTest {
@Test
fun `timer emits values over time`() = runTest {
var count = 0
launch {
repeat(3) {
delay(1000)
count++
}
}
assertEquals(0, count)
advanceTimeBy(1000)
assertEquals(1, count)
advanceTimeBy(1000)
assertEquals(2, count)
advanceUntilIdle() // Advance đến hết
assertEquals(3, count)
}
}Kiểm soát chính xác timing
class DebounceTest {
@Test
fun `debounce only emits after delay`() = runTest {
val results = mutableListOf<String>()
launch {
debounce(300) { results.add("First") }
}
// Trước 300ms - chưa emit
advanceTimeBy(200)
assertEquals(0, results.size)
// Sau 300ms - emit
advanceTimeBy(100)
assertEquals(1, results.size)
assertEquals("First", results[0])
}
private suspend fun debounce(delayMs: Long, action: () -> Unit) {
delay(delayMs)
action()
}
}5. TestDispatcher và Dependency Injection
Inject dispatcher cho testability
// Production code
class DataSyncService(
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
suspend fun syncData(): List<String> = withContext(dispatcher) {
delay(5000) // Long operation
listOf("data1", "data2")
}
}
// Test code
class DataSyncServiceTest {
@Test
fun `syncData returns data`() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val service = DataSyncService(testDispatcher)
val deferred = async { service.syncData() }
advanceUntilIdle()
assertEquals(listOf("data1", "data2"), deferred.await())
}
}UnconfinedTestDispatcher vs StandardTestDispatcher
@Test
fun `immediate execution with UnconfinedTestDispatcher`() = runTest {
val unconfinedDispatcher = UnconfinedTestDispatcher(testScheduler)
var executed = false
launch(unconfinedDispatcher) {
executed = true
}
// Với Unconfined, đã execute ngay
assertTrue(executed)
}
@Test
fun `controlled execution with StandardTestDispatcher`() = runTest {
val standardDispatcher = StandardTestDispatcher(testScheduler)
var executed = false
launch(standardDispatcher) {
executed = true
}
// Với Standard, chưa execute
assertFalse(executed)
advanceUntilIdle()
assertTrue(executed)
}6. Testing Flows với Turbine
Turbine là library phổ biến nhất để test Flows:
Basic Flow testing
import app.cash.turbine.test
import kotlinx.coroutines.flow.*
class FlowTest {
@Test
fun `flow emits expected values`() = runTest {
flow {
emit(1)
emit(2)
emit(3)
}.test {
assertEquals(1, awaitItem())
assertEquals(2, awaitItem())
assertEquals(3, awaitItem())
awaitComplete()
}
}
}Testing StateFlow
class CounterViewModel {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.value++
}
}
class CounterViewModelTest {
@Test
fun `increment increases count`() = runTest {
val viewModel = CounterViewModel()
viewModel.count.test {
assertEquals(0, awaitItem()) // Initial value
viewModel.increment()
assertEquals(1, awaitItem())
viewModel.increment()
assertEquals(2, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
}Testing SharedFlow (events)
sealed class UiEvent {
data class ShowToast(val message: String) : UiEvent()
object NavigateBack : UiEvent()
}
class EventViewModel {
private val _events = MutableSharedFlow<UiEvent>()
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
suspend fun showToast(message: String) {
_events.emit(UiEvent.ShowToast(message))
}
}
class EventViewModelTest {
@Test
fun `showToast emits event`() = runTest {
val viewModel = EventViewModel()
viewModel.events.test {
viewModel.showToast("Hello")
val event = awaitItem()
assertTrue(event is UiEvent.ShowToast)
assertEquals("Hello", (event as UiEvent.ShowToast).message)
cancelAndIgnoreRemainingEvents()
}
}
}Testing Flow với delay
fun tickerFlow(intervalMs: Long): Flow<Int> = flow {
var count = 0
while (true) {
emit(count++)
delay(intervalMs)
}
}
class TickerFlowTest {
@Test
fun `ticker emits at intervals`() = runTest {
tickerFlow(1000).test {
assertEquals(0, awaitItem())
advanceTimeBy(1000)
assertEquals(1, awaitItem())
advanceTimeBy(1000)
assertEquals(2, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
}7. Testing Error Handling
Testing exceptions
class ErrorHandlingTest {
@Test
fun `flow catches errors`() = runTest {
flow {
emit(1)
throw RuntimeException("Error!")
}
.catch { emit(-1) }
.test {
assertEquals(1, awaitItem())
assertEquals(-1, awaitItem())
awaitComplete()
}
}
@Test
fun `flow propagates uncaught errors`() = runTest {
flow<Int> {
throw RuntimeException("Error!")
}.test {
val error = awaitError()
assertEquals("Error!", error.message)
}
}
}Testing retry logic
class RetryTest {
@Test
fun `retry succeeds after failures`() = runTest {
var attempts = 0
flow {
attempts++
if (attempts < 3) {
throw RuntimeException("Attempt $attempts failed")
}
emit("Success")
}
.retry(3) { true }
.test {
assertEquals("Success", awaitItem())
awaitComplete()
}
assertEquals(3, attempts)
}
}8. Testing ViewModels
Complete ViewModel test
class UserViewModel(
private val repository: UserRepository,
private val dispatcher: CoroutineDispatcher = Dispatchers.Main
) : ViewModel() {
private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
fun loadUser(userId: String) {
viewModelScope.launch(dispatcher) {
_uiState.value = UiState.Loading
try {
val user = repository.getUser(userId)
_uiState.value = UiState.Success(user)
} catch (e: Exception) {
_uiState.value = UiState.Error(e.message ?: "Unknown error")
}
}
}
}
sealed class UiState {
object Loading : UiState()
data class Success(val user: User) : UiState()
data class Error(val message: String) : UiState()
}
class UserViewModelTest {
@Test
fun `loadUser success updates state`() = runTest {
val fakeRepository = FakeUserRepository()
val viewModel = UserViewModel(
repository = fakeRepository,
dispatcher = StandardTestDispatcher(testScheduler)
)
viewModel.uiState.test {
assertEquals(UiState.Loading, awaitItem())
viewModel.loadUser("123")
advanceUntilIdle()
val state = awaitItem()
assertTrue(state is UiState.Success)
assertEquals("Alice", (state as UiState.Success).user.name)
cancelAndIgnoreRemainingEvents()
}
}
@Test
fun `loadUser error updates state`() = runTest {
val fakeRepository = FakeUserRepository(shouldFail = true)
val viewModel = UserViewModel(
repository = fakeRepository,
dispatcher = StandardTestDispatcher(testScheduler)
)
viewModel.uiState.test {
skipItems(1) // Skip initial Loading
viewModel.loadUser("123")
advanceUntilIdle()
val state = awaitItem()
assertTrue(state is UiState.Error)
cancelAndIgnoreRemainingEvents()
}
}
}
class FakeUserRepository(private val shouldFail: Boolean = false) : UserRepository {
override suspend fun getUser(id: String): User {
if (shouldFail) throw RuntimeException("Network error")
return User(id, "Alice")
}
}9. Testing Timeouts
class TimeoutTest {
@Test
fun `operation completes within timeout`() = runTest {
val result = withTimeout(1000) {
delay(500)
"Success"
}
assertEquals("Success", result)
}
@Test
fun `operation timeout returns null`() = runTest {
val result = withTimeoutOrNull(500) {
delay(1000)
"Never returned"
}
assertNull(result)
}
}10. Best Practices
// ✅ Inject dispatchers
class MyRepository(
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
)
// ✅ Use runTest for all coroutine tests
@Test
fun myTest() = runTest { }
// ✅ Use Turbine for Flow testing
flow.test {
assertEquals(expected, awaitItem())
}
// ✅ Use TestDispatcher for controlled execution
val testDispatcher = StandardTestDispatcher(testScheduler)
// ✅ Clean up with cancelAndIgnoreRemainingEvents()
flow.test {
// assertions
cancelAndIgnoreRemainingEvents()
}
// ❌ Don't use runBlocking in tests
// @Test fun badTest() = runBlocking { } // BAD
// ❌ Don't use real delays in tests
// delay(5000) // Makes tests slow📝 Tóm tắt
| Tool | Purpose |
|---|---|
runTest | Test builder với virtual time |
advanceTimeBy() | Skip time forward |
advanceUntilIdle() | Run all pending coroutines |
StandardTestDispatcher | Controlled execution |
UnconfinedTestDispatcher | Immediate execution |
Turbine.test {} | Flow testing |
awaitItem() | Get next Flow emission |
awaitComplete() | Assert Flow completes |
awaitError() | Assert Flow errors |
Key points:
- Inject dispatchers để dễ test
runTesttự động skip delays- Turbine là cách tốt nhất để test Flows
- Dùng StandardTestDispatcher cho controlled timing
Last updated on