Bringing Flutter's Bloc Architecture to Kotlin

Posted: 15 Jan 2021. Last modified on 04-Jun-22.

This article will take about 4 minutes to read.


See the github repo for this project here


The Bloc architecture is a relatively new architecture initially created for Flutter apps. However, it will likely see widespread adoption over the next 5 years in Android, as Jetpack Compose becomes more popular. Therefore, I’ll present 2 implementations of a bloc, one in Rx, and once in Coroutines.

RxJava

This is an implementation of a bloc in kotlin, for RxJava/RxKotlin

abstract class Bloc<A, S : Bloc.BlocState> {
    private val _out: PublishSubject<EitherResult<S, BlocError>> = PublishSubject.create()
    val out: Observable<EitherResult<S, BlocError>> = _out.hide()
    private val compositeDisposable = CompositeDisposable()
    protected abstract fun processInner(action: A): Observable<S>
    fun process(action: A) {
        processInner(action).subscribe(
            {
                _out.onNext(Success(it))
            }, {
                _out.onErrorResumeWith(Failure(BlocError(it)).observable())
            }).addTo(compositeDisposable)
    }

    fun clear() {
        compositeDisposable.clear()
        _out.onComplete()
    }

    interface BlocState
    class BlocError(t: Throwable) : BlocState
}

There can be multiple blocs in a feature, but if they are there are they need to be routed through a collector, which will put all of their emitted states into a single stream. That looks like this:

fun collector(
    vararg listOf: Bloc<*, *>,
    scheduler: Scheduler = Schedulers.computation(),
    onEvent: (t: EitherResult<*, Bloc.BlocError>) -> Unit,
) {
    Observable
        .merge(listOf.map { it.out })
        .observeOn(scheduler)
        .subscribe(onEvent)
}

Coroutines

This is an implementation of a bloc in kotlin, for coroutines. Note that flowables are still experimental, so this should not be used in production code yet.

abstract class FlowBloc<A, S : BlocState>(
    coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    private val scope = CoroutineScope(coroutineDispatcher)
    private val _in: Channel<A> = Channel()
    private val _out: MutableSharedFlow<EitherResult<S, BlocError>> = MutableSharedFlow()
    val out: Flow<EitherResult<S, BlocError>> = _out.asSharedFlow()
    protected abstract suspend fun onEvent(event: A): S

    init {
        scope.launch {
            for (i in _in) {
                // todo catch the failure case with a try/catch
                _out.emit(Success(onEvent(i)))
            }
        }
    }

    fun process(action: A) = scope.launch { _in.send(action) }
    fun clear() = scope.cancel()
}

the coroutine bloc also requires a collector if there is more than one bloc per feature. The collector for flowables looks like this:

class FlowBlocCollector<A, S : Bloc.BlocState>(
    private vararg val blocs: FlowBloc<A, S>,
    private val scope: CoroutineScope = Scope.default(),
    onState: suspend (EitherResult<S, Bloc.BlocError>) -> Unit
) {
    init {
        blocs.forEach { flow ->
            scope.launch {
                flow.out.collect(onState)
            }
        }
    }

    fun clear() {
        scope.cancel()
        blocs.forEach { it.clear() }
    }
}