The code for this project is available in its GitHub repo.
The Bloc architecture is a relatively new architecture initially created for Flutter apps. I expect it to see wider adoption on Android over the next five years as Jetpack Compose becomes more popular. Therefore, I’ll present two implementations of a bloc: one in Rx and one in coroutines.
Here is an implementation of a bloc in Kotlin, using RxJava/RxKotlin:
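// Assumes RxJava 3 and RxKotlin 3. EitherResult, Success and Failure come from this project.
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.addTo
import io.reactivex.rxjava3.subjects.PublishSubject

abstract class Bloc<A, S : Bloc.BlocState> {
    private val _out: PublishSubject<EitherResult<S, BlocError>> = PublishSubject.create()
    val out: Observable<EitherResult<S, BlocError>> = _out.hide()
    private val compositeDisposable = CompositeDisposable()

    protected abstract fun processInner(action: A): Observable<S>

    fun process(action: A) {
        processInner(action).subscribe(
            { _out.onNext(Success(it)) },
            // Emit the failure as a value so the out stream stays alive for later actions
            { _out.onNext(Failure(BlocError(it))) }
        ).addTo(compositeDisposable)
    }

    fun clear() {
        compositeDisposable.clear()
        _out.onComplete()
    }

    interface BlocState
    class BlocError(val t: Throwable) : BlocState
}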
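The snippets in this article use `EitherResult`, `Success` and `Failure` without showing their definitions. A minimal sketch of what such a type could look like, assuming a plain sealed class with two cases (the names `value`, `error`, `fold` and `describe` are hypothetical, and the definitions in the repo may differ):

```kotlin
// Hypothetical stand-in for the project's EitherResult; the real type may differ.
sealed class EitherResult<out S, out E> {
    // Folds both cases into a single value.
    inline fun <R> fold(onSuccess: (S) -> R, onFailure: (E) -> R): R = when (this) {
        is Success -> onSuccess(value)
        is Failure -> onFailure(error)
    }
}

data class Success<out S>(val value: S) : EitherResult<S, Nothing>()
data class Failure<out E>(val error: E) : EitherResult<Nothing, E>()

// Example consumer: turns either case into a string for display.
fun describe(result: EitherResult<Int, Throwable>): String =
    result.fold(
        onSuccess = { "state: $it" },
        onFailure = { "error: ${it.message}" }
    )

fun main() {
    println(describe(Success(3)))                            // prints "state: 3"
    println(describe(Failure(IllegalStateException("boom")))) // prints "error: boom"
}
```

Declaring both type parameters as `out` lets a `Success<S>` be passed wherever an `EitherResult<S, BlocError>` is expected, which is how the bloc's `out` stream uses it.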
A feature can have multiple blocs, but if it does, they need to be routed through a collector, which merges all of their emitted states into a single stream. That looks like this:
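import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the subscription so the caller can dispose of it along with the feature.
fun collector(
    vararg blocs: Bloc<*, *>,
    scheduler: Scheduler = Schedulers.computation(),
    onEvent: (t: EitherResult<*, Bloc.BlocError>) -> Unit,
): Disposable =
    Observable
        .merge(blocs.map { it.out })
        .observeOn(scheduler)
        .subscribe(onEvent)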
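To make the merging step concrete, here is a small sketch that uses two `PublishSubject`s as stand-ins for the `out` streams of two blocs. It assumes RxJava 3. The names `loginStates`, `profileStates` and `collectMerged` are made up for illustration, and the scheduler is left out so that delivery is synchronous:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Merges two hypothetical state streams and records everything that arrives.
fun collectMerged(): List<String> {
    val loginStates = PublishSubject.create<String>()
    val profileStates = PublishSubject.create<String>()
    val seen = mutableListOf<String>()

    // Same merge call the collector uses, without observeOn.
    val disposable = Observable
        .merge(listOf(loginStates, profileStates))
        .subscribe { seen += it }

    loginStates.onNext("LoggedIn")
    profileStates.onNext("ProfileLoaded")

    disposable.dispose()
    return seen
}

fun main() {
    println(collectMerged()) // prints [LoggedIn, ProfileLoaded]
}
```

Whichever subject emits, the single subscriber sees the state, which is all the collector needs to hand a feature one stream to observe.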
Here is an implementation of a bloc in Kotlin, using coroutines. Note that SharedFlow was still experimental at the time of writing, so check the stability of the API in your version of kotlinx.coroutines before using this in production code.
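// BlocState and BlocError are the types nested in Bloc above.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

abstract class FlowBloc<A, S : BlocState>(
    coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    private val scope = CoroutineScope(coroutineDispatcher)
    private val _in: Channel<A> = Channel()
    private val _out: MutableSharedFlow<EitherResult<S, BlocError>> = MutableSharedFlow()
    val out: Flow<EitherResult<S, BlocError>> = _out.asSharedFlow()

    protected abstract suspend fun onEvent(event: A): S

    init {
        scope.launch {
            for (action in _in) {
                val result: EitherResult<S, BlocError> = try {
                    Success(onEvent(action))
                } catch (e: CancellationException) {
                    throw e // let cancellation propagate instead of reporting it as a failure
                } catch (t: Throwable) {
                    Failure(BlocError(t))
                }
                _out.emit(result)
            }
        }
    }

    fun process(action: A) = scope.launch { _in.send(action) }

    fun clear() = scope.cancel()
}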
The coroutine bloc also requires a collector if there is more than one bloc per feature. The collector for flows looks like this:
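import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

// Scope.default() is a helper from this project that supplies the default scope.
class FlowBlocCollector<A, S : Bloc.BlocState>(
    private vararg val blocs: FlowBloc<A, S>,
    private val scope: CoroutineScope = Scope.default(),
    onState: suspend (EitherResult<S, Bloc.BlocError>) -> Unit
) {
    init {
        // One collecting coroutine per bloc, all funnelling into onState
        blocs.forEach { bloc ->
            scope.launch {
                bloc.out.collect { onState(it) }
            }
        }
    }

    fun clear() {
        scope.cancel()
        blocs.forEach { it.clear() }
    }
}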
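An alternative to launching one coroutine per bloc is the `merge` operator from kotlinx.coroutines, which combines several flows into one. The sketch below is not the collector from this article: it uses finite `flowOf` flows as stand-ins for the blocs' `out` flows so that `toList` can complete (a real bloc's shared flow never completes), and the helper name `mergedStates` is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: combines the out flows of several blocs into one flow.
fun <T> mergedStates(vararg outs: Flow<T>): Flow<T> = merge(*outs)

fun main() = runBlocking {
    val loginOut = flowOf("LoggedIn")
    val profileOut = flowOf("ProfileLoading", "ProfileLoaded")

    val states = mergedStates(loginOut, profileOut).toList()

    // Order across the source flows is not guaranteed, so sort before printing.
    println(states.sorted())
}
```

With this approach the feature collects a single flow in a single coroutine, at the cost of no longer owning the blocs' lifecycles the way `FlowBlocCollector.clear()` does.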