Skip to content

SharedFlow #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/deps.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object deps {
}

object jetbrains {
private const val version = "1.3.9"
private const val version = "1.4.0-M1"

const val coroutinesCore = "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
const val coroutinesAndroid = "org.jetbrains.kotlinx:kotlinx-coroutines-android:$version"
Expand Down
15 changes: 6 additions & 9 deletions data/src/main/java/com/hoc/flowmvi/data/UserRepositoryImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import com.hoc.flowmvi.domain.entity.User
import com.hoc.flowmvi.domain.repository.UserRepository
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
Expand All @@ -36,7 +34,7 @@ internal class UserRepositoryImpl constructor(
class Added(val user: User) : Change()
}

private val changesChannel = BroadcastChannel<Change>(Channel.CONFLATED)
private val changesFlow = MutableSharedFlow<Change>()

private suspend fun getUsersFromRemote(): List<User> {
return withContext(dispatchers.io) {
Expand All @@ -49,8 +47,7 @@ internal class UserRepositoryImpl constructor(
return flow {
val initial = getUsersFromRemote()

changesChannel
.asFlow()
changesFlow
.onEach { Log.d("###", "[USER_REPO] Change=$it") }
.scan(initial) { acc, change ->
when (change) {
Expand All @@ -65,20 +62,20 @@ internal class UserRepositoryImpl constructor(
}

override suspend fun refresh() =
getUsersFromRemote().let { changesChannel.send(Change.Refreshed(it)) }
getUsersFromRemote().let { changesFlow.emit(Change.Refreshed(it)) }

override suspend fun remove(user: User) {
withContext(dispatchers.io) {
val response = userApiService.remove(domainToResponse(user).id)
changesChannel.send(Change.Removed(responseToDomain(response)))
changesFlow.emit(Change.Removed(responseToDomain(response)))
}
}

override suspend fun add(user: User) {
withContext(dispatchers.io) {
val body = domainToBody(user).copy(avatar = avatarUrls.random())
val response = userApiService.add(body)
changesChannel.send(Change.Added(responseToDomain(response)))
changesFlow.emit(Change.Added(responseToDomain(response)))
delay(400)
}
}
Expand Down
37 changes: 17 additions & 20 deletions feature-add/src/main/java/com/hoc/flowmvi/ui/add/AddVM.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hoc.flowmvi.ui.add

import android.util.Log
import androidx.core.util.PatternsCompat
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
Expand All @@ -9,50 +10,42 @@ import com.hoc.flowmvi.domain.entity.User
import com.hoc.flowmvi.domain.usecase.AddUserUseCase
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn

@FlowPreview
@ExperimentalCoroutinesApi
internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
private val _eventChannel = BroadcastChannel<SingleEvent>(capacity = Channel.BUFFERED)
private val _intentChannel = BroadcastChannel<ViewIntent>(capacity = Channel.BUFFERED)
private val _eventFlow = MutableSharedFlow<SingleEvent>()
private val _intentFlow = MutableSharedFlow<ViewIntent>()

val viewState: StateFlow<ViewState>
val singleEvent: Flow<SingleEvent> get() = _eventFlow

val singleEvent: Flow<SingleEvent>

suspend fun processIntent(intent: ViewIntent) = _intentChannel.send(intent)
suspend fun processIntent(intent: ViewIntent) = _intentFlow.emit(intent)

init {
val initialVS = ViewState.initial()

viewState = MutableStateFlow(initialVS)
singleEvent = _eventChannel.asFlow()

_intentChannel
.asFlow()
viewState = _intentFlow
.toPartialStateChangesFlow()
.sendSingleEvent()
.scan(initialVS) { state, change -> change.reduce(state) }
.onEach { viewState.value = it }
.catch { }
.launchIn(viewModelScope)
.catch { Log.d("###", "[ADD_VM] Throwable: $it") }
.stateIn(viewModelScope, SharingStarted.Eagerly, initialVS)
}

private fun Flow<PartialStateChange>.sendSingleEvent(): Flow<PartialStateChange> {
Expand All @@ -69,7 +62,7 @@ internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
PartialStateChange.FirstChange.FirstNameChangedFirstTime -> return@onEach
PartialStateChange.FirstChange.LastNameChangedFirstTime -> return@onEach
}
_eventChannel.send(event)
_eventFlow.emit(event)
}
}

Expand Down Expand Up @@ -99,6 +92,10 @@ internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
)
)
}
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed()
)

val addUserChanges = filterIsInstance<ViewIntent.Submit>()
.withLatestFrom(userFormFlow) { _, userForm -> userForm }
Expand Down
39 changes: 19 additions & 20 deletions feature-main/src/main/java/com/hoc/flowmvi/ui/main/MainVM.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import com.hoc.flowmvi.domain.usecase.RefreshGetUsersUseCase
import com.hoc.flowmvi.domain.usecase.RemoveUserUseCase
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
Expand All @@ -22,12 +21,13 @@ import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.take

@Suppress("USELESS_CAST")
Expand All @@ -38,32 +38,31 @@ internal class MainVM(
private val refreshGetUsers: RefreshGetUsersUseCase,
private val removeUser: RemoveUserUseCase,
) : ViewModel() {
private val _eventChannel = BroadcastChannel<SingleEvent>(capacity = Channel.BUFFERED)
private val _intentChannel = BroadcastChannel<ViewIntent>(capacity = Channel.BUFFERED)
private val _eventFlow = MutableSharedFlow<SingleEvent>()
private val _intentFlow = MutableSharedFlow<ViewIntent>()

val viewState: StateFlow<ViewState>
val singleEvent: Flow<SingleEvent> get() = _eventFlow

val singleEvent: Flow<SingleEvent>

suspend fun processIntent(intent: ViewIntent) = _intentChannel.send(intent)
suspend fun processIntent(intent: ViewIntent) = _intentFlow.emit(intent)

init {
val initialVS = ViewState.initial()

viewState = MutableStateFlow(initialVS)
singleEvent = _eventChannel.asFlow()

val intentFlow = _intentChannel.asFlow()
merge(
intentFlow.filterIsInstance<ViewIntent.Initial>().take(1),
intentFlow.filterNot { it is ViewIntent.Initial }
viewState = merge(
_intentFlow.filterIsInstance<ViewIntent.Initial>().take(1),
_intentFlow.filterNot { it is ViewIntent.Initial }
)
.shareIn(viewModelScope, SharingStarted.WhileSubscribed())
.toPartialChangeFlow()
.sendSingleEvent()
.scan(initialVS) { vs, change -> change.reduce(vs) }
.onEach { viewState.value = it }
.catch { }
.launchIn(viewModelScope)
.catch { Log.d("###", "[MAIN_VM] Throwable: $it") }
.stateIn(
viewModelScope,
SharingStarted.Eagerly,
initialVS
)
}

private fun Flow<PartialChange>.sendSingleEvent(): Flow<PartialChange> {
Expand All @@ -81,7 +80,7 @@ internal class MainVM(
is PartialChange.GetUser.Data -> return@onEach
PartialChange.Refresh.Loading -> return@onEach
}
_eventChannel.send(event)
_eventFlow.emit(event)
}
}

Expand Down