@@ -18,19 +18,17 @@ import com.hoc.flowmvi.domain.model.UserValidationError
1818import com.hoc.flowmvi.domain.repository.UserRepository
1919import com.hoc081098.flowext.flowFromSuspend
2020import com.hoc081098.flowext.retryWithExponentialBackoff
21+ import com.hoc081098.flowext.scanWith
2122import java.io.IOException
2223import kotlin.time.Duration.Companion.milliseconds
2324import kotlin.time.ExperimentalTime
2425import kotlinx.coroutines.ExperimentalCoroutinesApi
2526import kotlinx.coroutines.FlowPreview
26- import kotlinx.coroutines.flow.Flow
2727import kotlinx.coroutines.flow.MutableSharedFlow
2828import kotlinx.coroutines.flow.catch
2929import kotlinx.coroutines.flow.first
30- import kotlinx.coroutines.flow.flatMapConcat
3130import kotlinx.coroutines.flow.map
3231import kotlinx.coroutines.flow.onEach
33- import kotlinx.coroutines.flow.scan
3432import kotlinx.coroutines.withContext
3533import timber.log.Timber
3634
@@ -44,25 +42,25 @@ internal class UserRepositoryImpl(
4442 private val domainToBody : Mapper <User , UserBody >,
4543 private val errorMapper : Mapper <Throwable , UserError >,
4644) : UserRepository {
47- private sealed class Change {
45+ private sealed interface Change {
4846 class Removed (
4947 val removed : User ,
50- ) : Change()
48+ ) : Change
5149
5250 class Refreshed (
5351 val user : List <User >,
54- ) : Change()
52+ ) : Change
5553
5654 class Added (
5755 val user : User ,
58- ) : Change()
56+ ) : Change
5957 }
6058
6159 private val changesFlow = MutableSharedFlow <Change >(extraBufferCapacity = 64 )
6260
6361 private suspend inline fun sendChange (change : Change ) = changesFlow.emit(change)
6462
65- private fun getUsersFromRemote (): Flow < List <User > > =
63+ private suspend fun getUsersFromRemoteWithRetry (): List <User > =
6664 flowFromSuspend {
6765 Timber .d(" [USER_REPO] getUsersFromRemote ..." )
6866
@@ -79,19 +77,17 @@ internal class UserRepositoryImpl(
7977 initialDelay = 500 .milliseconds,
8078 factor = 2.0 ,
8179 ) { it is IOException }
80+ .first()
8281
8382 override fun getUsers () =
84- getUsersFromRemote()
85- .flatMapConcat { initial ->
86- changesFlow
87- .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
88- .scan(initial) { acc, change ->
89- when (change) {
90- is Change .Removed -> acc.filter { it.id != change.removed.id }
91- is Change .Refreshed -> change.user
92- is Change .Added -> acc + change.user
93- }
94- }
83+ changesFlow
84+ .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
85+ .scanWith(::getUsersFromRemoteWithRetry) { acc, change ->
86+ when (change) {
87+ is Change .Removed -> acc.filter { it.id != change.removed.id }
88+ is Change .Refreshed -> change.user
89+ is Change .Added -> acc + change.user
90+ }
9591 }.onEach { Timber .d(" [USER_REPO] Emit users.size=${it.size} " ) }
9692 .map { it.right().leftWiden<UserError , _ , _ >() }
9793 .catch {
@@ -100,9 +96,9 @@ internal class UserRepositoryImpl(
10096 }
10197
10298 override suspend fun refresh () =
103- catchEither { getUsersFromRemote().first () }
99+ catchEither { getUsersFromRemoteWithRetry () }
104100 .onRight { sendChange(Change .Refreshed (it)) }
105- .map { }
101+ .map {}
106102 .onLeft { logError(it, " refresh" ) }
107103 .mapLeft(errorMapper)
108104
0 commit comments