【Kotlin】Flow入門|リアクティブなデータストリームを扱う方法

開発
Screenshot

Coroutinesで非同期処理を書けるようになった。Roomでデータベースも扱える。

次のステップは「Flow」です。

Flowを使えば、データの変更をリアルタイムに監視し、UIを自動的に更新できるようになります。

この記事では、Flowの基本から実践的な使い方まで解説します。

Flowとは

Flowは、時間をかけて複数の値を返すコルーチンベースのデータストリームです。

通常の関数との違い

// 通常のsuspend関数:1つの値を返す
suspend fun fetchUser(): User {
    return api.getUser()
}

// Flow:複数の値を順次返す
fun observeUsers(): Flow<List<User>> {
    return userDao.getAllUsers()  // データが変更されるたびに新しい値が流れる
}

なぜFlowが必要なのか

例えば、Todoリストを表示するアプリを考えてみましょう。

// ❌ 問題:データ取得時の状態しか取得できない
suspend fun getTodos(): List<Todo> {
    return todoDao.getAll()
}

// 別の画面でTodoを追加しても、UIは更新されない
// ✅ Flow:データが変更されるたびに自動で通知
fun observeTodos(): Flow<List<Todo>> {
    return todoDao.observeAll()
}

// Todoが追加・削除されると、自動でUIが更新される

Flowの基本

Flowの作成

// flow { } ビルダーでFlowを作成
fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)  // 1秒待機
        emit(i)      // 値を送出
    }
}

Flowの収集(collect)

// collectで値を受け取る
viewModelScope.launch {
    simpleFlow().collect { value ->
        println(value)  // 1, 2, 3 が順番に出力
    }
}

重要:FlowはCold Stream

Flowは「Cold Stream」です。つまり、collectされるまで何も実行されません

val flow = simpleFlow()  // この時点では何も実行されない

flow.collect { value ->  // collectを呼んだ時点で実行開始
    println(value)
}

Flowの演算子

Flowには多くの便利な演算子があります。

map:値を変換

flowOf(1, 2, 3)
    .map { it * 2 }
    .collect { println(it) }  // 2, 4, 6

filter:条件でフィルタリング

flowOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }
    .collect { println(it) }  // 2, 4

transform:より柔軟な変換

flowOf(1, 2, 3)
    .transform { value ->
        emit("Processing $value")
        emit("Done $value")
    }
    .collect { println(it) }
// Processing 1, Done 1, Processing 2, Done 2, ...

onEach:副作用を追加

flowOf(1, 2, 3)
    .onEach { println("Emitting: $it") }
    .collect { println("Collected: $it") }

catch:エラーハンドリング

flow {
    emit(1)
    throw RuntimeException("Error!")
    emit(2)
}
.catch { e -> 
    println("Caught: ${e.message}")
    emit(-1)  // 代替値を送出
}
.collect { println(it) }  // 1, -1

take:最初のN個だけ取得

flowOf(1, 2, 3, 4, 5)
    .take(3)
    .collect { println(it) }  // 1, 2, 3

debounce:連続した値を間引く

// 検索入力など、頻繁な更新を間引きたい時に便利
searchQueryFlow
    .debounce(300)  // 300ms以内の連続した値は最後の1つだけ
    .collect { query ->
        searchApi(query)
    }

StateFlowとSharedFlow

通常のFlowに加えて、特殊なFlowがあります。

StateFlow:状態を保持するFlow

StateFlowは常に最新の値を保持するFlowです。UIの状態管理に最適です。

class UserViewModel : ViewModel() {
    
    // 内部で変更可能なMutableStateFlow
    private val _uiState = MutableStateFlow(UiState())
    
    // 外部には読み取り専用のStateFlowとして公開
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
    
    fun updateName(name: String) {
        _uiState.update { it.copy(name = name) }
    }
}

data class UiState(
    val name: String = "",
    val isLoading: Boolean = false
)

StateFlowの特徴

特徴説明
常に値を持つ初期値が必須。nullにならない
最新値のみ同じ値は連続して流れない
Hot Streamcollectがなくても値を保持

SharedFlow:イベント通知用

SharedFlowは一回限りのイベントを通知するのに適しています。

class UserViewModel : ViewModel() {
    
    private val _events = MutableSharedFlow<Event>()
    val events: SharedFlow<Event> = _events.asSharedFlow()
    
    fun showMessage(message: String) {
        viewModelScope.launch {
            _events.emit(Event.ShowSnackbar(message))
        }
    }
}

sealed class Event {
    data class ShowSnackbar(val message: String) : Event()
    data class Navigate(val route: String) : Event()
}

StateFlow vs SharedFlow

項目StateFlowSharedFlow
初期値必須不要
最新値の取得.valueで取得可能不可
重複した値無視される送出される
用途UIの状態管理イベント通知

実践:RoomとFlowを組み合わせる

RoomのDAOでFlowを返すと、データベースの変更を自動で監視できます。

DAO

@Dao
interface TodoDao {
    
    @Query("SELECT * FROM todos ORDER BY createdAt DESC")
    fun observeAll(): Flow<List<Todo>>
    
    @Query("SELECT * FROM todos WHERE isCompleted = :isCompleted")
    fun observeByStatus(isCompleted: Boolean): Flow<List<Todo>>
    
    @Insert
    suspend fun insert(todo: Todo)
    
    @Update
    suspend fun update(todo: Todo)
}

Repository

class TodoRepository(private val todoDao: TodoDao) {
    
    val allTodos: Flow<List<Todo>> = todoDao.observeAll()
    
    fun getTodosByStatus(isCompleted: Boolean): Flow<List<Todo>> {
        return todoDao.observeByStatus(isCompleted)
    }
    
    suspend fun addTodo(title: String) {
        todoDao.insert(Todo(title = title))
    }
}

ViewModel

class TodoViewModel(private val repository: TodoRepository) : ViewModel() {
    
    // Flowをcollectしなくても使えるようにStateFlowに変換
    val todos: StateFlow<List<Todo>> = repository.allTodos
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
    
    // フィルター状態
    private val _filter = MutableStateFlow(TodoFilter.ALL)
    val filter: StateFlow<TodoFilter> = _filter.asStateFlow()
    
    // フィルターに応じたTodoリスト
    val filteredTodos: StateFlow<List<Todo>> = combine(
        repository.allTodos,
        _filter
    ) { todos, filter ->
        when (filter) {
            TodoFilter.ALL -> todos
            TodoFilter.ACTIVE -> todos.filter { !it.isCompleted }
            TodoFilter.COMPLETED -> todos.filter { it.isCompleted }
        }
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList()
    )
    
    fun setFilter(filter: TodoFilter) {
        _filter.value = filter
    }
}

enum class TodoFilter { ALL, ACTIVE, COMPLETED }

Activity/Fragment(Jetpack Compose)

@Composable
fun TodoScreen(viewModel: TodoViewModel = viewModel()) {
    
    // collectAsStateでStateFlowをComposeのStateに変換
    val todos by viewModel.todos.collectAsState()
    val filter by viewModel.filter.collectAsState()
    
    Column {
        FilterChips(
            selected = filter,
            onFilterSelected = { viewModel.setFilter(it) }
        )
        
        LazyColumn {
            items(todos) { todo ->
                TodoItem(todo = todo)
            }
        }
    }
}

Activity/Fragment(View)

class TodoFragment : Fragment() {
    
    private val viewModel: TodoViewModel by viewModels()
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        
        // repeatOnLifecycleでライフサイクルに応じて自動で開始/停止
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.todos.collect { todos ->
                    adapter.submitList(todos)
                }
            }
        }
    }
}

FlowをStateFlowに変換:stateIn

stateInを使うと、FlowをStateFlowに変換できます。

val todos: StateFlow<List<Todo>> = repository.observeTodos()
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList()
    )

SharingStarted の種類

動作
Eagerly即座に開始、永続的に収集
Lazily最初のsubscriberで開始、永続的に収集
WhileSubscribed(timeout)subscriberがいる間だけ収集。timeout後に停止

WhileSubscribed(5000)がおすすめ。画面が非表示になって5秒後に収集を停止し、リソースを節約します。

複数のFlowを結合:combine

複数のFlowを組み合わせて、1つのFlowを作れます。

val searchQuery = MutableStateFlow("")
val sortOrder = MutableStateFlow(SortOrder.DATE)

val filteredItems: StateFlow<List<Item>> = combine(
    repository.observeAll(),
    searchQuery,
    sortOrder
) { items, query, order ->
    items
        .filter { it.name.contains(query, ignoreCase = true) }
        .sortedWith(
            when (order) {
                SortOrder.DATE -> compareByDescending { it.createdAt }
                SortOrder.NAME -> compareBy { it.name }
            }
        )
}.stateIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000),
    initialValue = emptyList()
)

よくある間違いと対処法

1. collectをメインスレッドで忘れる

// ❌ NG: UIの更新はメインスレッドで
viewModelScope.launch(Dispatchers.IO) {
    flow.collect { data ->
        textView.text = data  // クラッシュの可能性
    }
}

// ✅ OK: flowOnでFlowの実行スレッドを指定
flow
    .flowOn(Dispatchers.IO)  // 上流の処理をIOスレッドで
    .collect { data ->       // collectはメインスレッド
        textView.text = data
    }

2. ライフサイクルを考慮しない

// ❌ NG: Activityが破棄されてもcollectが続く
lifecycleScope.launch {
    viewModel.todos.collect { updateUI(it) }
}

// ✅ OK: ライフサイクルに応じて自動停止
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.todos.collect { updateUI(it) }
    }
}

3. StateFlowの初期値を忘れる

// ❌ コンパイルエラー:初期値が必要
private val _state = MutableStateFlow<User>()

// ✅ OK:初期値を指定
private val _state = MutableStateFlow<User?>(null)
// または
private val _state = MutableStateFlow(User.DEFAULT)

Flowを使うための設定

build.gradle.kts

dependencies {
    // Coroutines(Flowを含む)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3")
    
    // Lifecycle(repeatOnLifecycle用)
    implementation("androidx.lifecycle:lifecycle-runtime-ktx:2.7.0")
    
    // Compose用(オプション)
    implementation("androidx.lifecycle:lifecycle-runtime-compose:2.7.0")
}

まとめ

Flowの基本をまとめると:

概念説明
Flow複数の値を順次返すコールドストリーム
StateFlow状態を保持するホットストリーム(UI状態管理)
SharedFlowイベント通知用のホットストリーム
collectFlowの値を受け取る
stateInFlowをStateFlowに変換
combine複数のFlowを結合

基本パターン

// Repository
fun observeData(): Flow<List<Data>> = dao.observeAll()

// ViewModel
val data: StateFlow<List<Data>> = repository.observeData()
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

// UI (Compose)
val data by viewModel.data.collectAsState()

Flowを使いこなせば、データの変更に自動で反応するリアクティブなアプリが作れます。

Coroutines、Room、Flowの3つを組み合わせて、モダンなAndroidアプリを開発しましょう!


この記事が参考になれば幸いです。

タイトルとURLをコピーしました