Bringing Flutter's Bloc Architecture to Kotlin

Posted: 15 Jan 2021. Last modified on 04-Jun-22.


See the GitHub repo for this project here.

The Bloc architecture is a relatively new architecture, originally created for Flutter apps. However, it will likely see wider adoption on Android over the next 5 years, as Jetpack Compose becomes more popular. I'll therefore present two implementations of a bloc: one in Rx and one in coroutines.


This is an implementation of a bloc in Kotlin, for RxJava/RxKotlin:

abstract class Bloc<A, S : Bloc.BlocState> {
    private val _out: PublishSubject<EitherResult<S, BlocError>> = PublishSubject.create()
    val out: Observable<EitherResult<S, BlocError>> = _out.hide()
    private val compositeDisposable = CompositeDisposable()
    protected abstract fun processInner(action: A): Observable<S>
    fun process(action: A) {
            }, {

    fun clear() {

    interface BlocState
    class BlocError(t: Throwable) : BlocState
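
For readers who want something they can paste and run, here is a minimal sketch of how the bloc above could be completed. It is not the code from the repo. I'm assuming RxJava 3 package names, and `EitherResult` here is a hypothetical stand-in (a small sealed class with `Success` and `Failure`), as are `CounterBloc`, `Count`, and the `cause` property on `BlocError`.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical stand-in for EitherResult: a state or an error, never both.
sealed class EitherResult<out S, out E> {
    data class Success<S>(val value: S) : EitherResult<S, Nothing>()
    data class Failure<E>(val error: E) : EitherResult<Nothing, E>()
}

abstract class Bloc<A, S : Bloc.BlocState> {
    private val _out: PublishSubject<EitherResult<S, BlocError>> = PublishSubject.create()
    val out: Observable<EitherResult<S, BlocError>> = _out.hide()
    private val compositeDisposable = CompositeDisposable()

    protected abstract fun processInner(action: A): Observable<S>

    // Assumed behaviour: each state is wrapped as Success, and an error
    // is forwarded as a Failure instead of terminating the output stream.
    fun process(action: A) {
        val disposable = processInner(action).subscribe(
            { state -> _out.onNext(EitherResult.Success(state)) },
            { error -> _out.onNext(EitherResult.Failure(BlocError(error))) }
        )
        compositeDisposable.add(disposable)
    }

    // Disposes any work that is still in flight.
    fun clear() {
        compositeDisposable.clear()
    }

    interface BlocState
    class BlocError(val cause: Throwable) : BlocState
}

// Hypothetical example bloc: echoes non-negative numbers, fails on negatives.
data class Count(val value: Int) : Bloc.BlocState

class CounterBloc : Bloc<Int, Count>() {
    override fun processInner(action: Int): Observable<Count> =
        if (action < 0) Observable.error(IllegalArgumentException("negative"))
        else Observable.just(Count(action))
}

fun main() {
    val bloc = CounterBloc()
    bloc.out.subscribe { println(it) }
    bloc.process(2)   // delivered as a Success
    bloc.process(-1)  // delivered as a Failure, and the stream stays open
    bloc.clear()
}
```

The point of routing errors through `Failure` is that one bad action does not kill `out`, so the UI can keep listening to the same stream.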

A feature can contain multiple blocs, but if it does, they need to be routed through a collector, which puts all of their emitted states into a single stream. That looks like this:

fun collector(
    vararg listOf: Bloc<*, *>,
    scheduler: Scheduler = Schedulers.computation(),
    onEvent: (t: EitherResult<*, Bloc.BlocError>) -> Unit,
) {
        .merge( { it.out })
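
The merging idea can be sketched on its own, without the bloc types. This is a simplified illustration and not the collector from the repo: `collect` is a hypothetical helper that takes plain observables, and I'm assuming RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject

// Merges every source into a single stream and hands each item to onEvent.
// No error handler is installed, so the sources are expected not to fail
// (a bloc that wraps its errors in a result type satisfies that).
fun <T : Any> collect(
    vararg sources: Observable<out T>,
    scheduler: Scheduler = Schedulers.computation(),
    onEvent: (T) -> Unit,
): Disposable =
    Observable.merge(sources.asList())
        .observeOn(scheduler)
        .subscribe { onEvent(it) }

fun main() {
    val clicks = PublishSubject.create<String>()
    val loads = PublishSubject.create<String>()

    // trampoline() keeps delivery on the calling thread for this demo.
    val subscription = collect(clicks, loads, scheduler = Schedulers.trampoline()) {
        println("state: $it")
    }

    clicks.onNext("clicked")
    loads.onNext("loaded")
    subscription.dispose()
}
```

Returning the `Disposable` lets the caller stop listening to all of the sources with a single call.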


This is an implementation of a bloc in Kotlin, for coroutines. Note that at the time of writing, the Flow APIs used here were still experimental, so this should not be used in production code yet.

abstract class FlowBloc<A, S : BlocState>(
    coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    private val scope = CoroutineScope(coroutineDispatcher)
    private val _in: Channel<A> = Channel()
    private val _out: MutableSharedFlow<EitherResult<S, BlocError>> = MutableSharedFlow()
    val out: Flow<EitherResult<S, BlocError>> = _out.asSharedFlow()
    protected abstract suspend fun onEvent(event: A): S

    init {
        scope.launch {
            for (i in _in) {
                // todo catch the failure case with a try/catch

    fun process(action: A) = scope.launch { _in.send(action) }
    fun clear() = scope.cancel()
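
Here is a minimal sketch of how the coroutine bloc could be completed, including the try/catch from the todo. Again, this is not the code from the repo: `EitherResult` is a hypothetical stand-in, `BlocState` and `BlocError` are redeclared at top level so the snippet compiles on its own, and `DoublingBloc` and `Doubled` exist only for the demo.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins so the sketch is self-contained.
sealed class EitherResult<out S, out E> {
    data class Success<S>(val value: S) : EitherResult<S, Nothing>()
    data class Failure<E>(val error: E) : EitherResult<Nothing, E>()
}
interface BlocState
class BlocError(val cause: Throwable) : BlocState

abstract class FlowBloc<A, S : BlocState>(
    coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    private val scope = CoroutineScope(coroutineDispatcher)
    private val _in: Channel<A> = Channel()
    private val _out = MutableSharedFlow<EitherResult<S, BlocError>>()
    val out: Flow<EitherResult<S, BlocError>> = _out.asSharedFlow()

    protected abstract suspend fun onEvent(event: A): S

    init {
        scope.launch {
            for (action in _in) {
                val result: EitherResult<S, BlocError> = try {
                    EitherResult.Success(onEvent(action))
                } catch (e: CancellationException) {
                    throw e // let cancellation stop the loop
                } catch (t: Throwable) {
                    EitherResult.Failure(BlocError(t))
                }
                _out.emit(result)
            }
        }
    }

    fun process(action: A) = scope.launch { _in.send(action) }
    fun clear() = scope.cancel()
}

// Hypothetical example bloc: doubles non-negative numbers.
data class Doubled(val value: Int) : BlocState

class DoublingBloc : FlowBloc<Int, Doubled>() {
    override suspend fun onEvent(event: Int): Doubled {
        require(event >= 0) { "negative input" }
        return Doubled(event * 2)
    }
}

fun main() = runBlocking {
    val bloc = DoublingBloc()
    // Start collecting before sending: this shared flow has no replay,
    // so states emitted while nobody is subscribed are dropped.
    val pending = async(start = CoroutineStart.UNDISPATCHED) { bloc.out.take(2).toList() }
    bloc.process(21).join()
    bloc.process(-1).join()
    pending.await().forEach { println(it) }
    bloc.clear()
}
```

One thing to watch for with this design is the subscription order noted in `main`: anything that calls `process` before a collector is attached will lose its result.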

The coroutine bloc also requires a collector if there is more than one bloc per feature. The collector for flows looks like this:

class FlowBlocCollector<A, S : Bloc.BlocState>(
    private vararg val blocs: FlowBloc<A, S>,
    private val scope: CoroutineScope = Scope.default(),
    onState: suspend (EitherResult<S, Bloc.BlocError>) -> Unit
) {
    init {
        blocs.forEach { flow ->
            scope.launch {

    fun clear() {
        blocs.forEach { it.clear() }
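
The same collecting idea can be sketched against plain flows. This is a simplified illustration rather than the class from the repo: `StateCollector` is a hypothetical name, it takes flows directly instead of blocs, and the default scope is my assumption since the helper used above is not shown.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Collects every source in its own coroutine and funnels all items into onState.
class StateCollector<T>(
    private vararg val sources: Flow<T>,
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
    onState: suspend (T) -> Unit
) {
    init {
        sources.forEach { source ->
            source.onEach { onState(it) }.launchIn(scope)
        }
    }

    // Cancels every collection started by this collector.
    fun clear() = scope.cancel()
}

fun main() = runBlocking {
    val seen = Channel<String>(Channel.UNLIMITED)
    val collector = StateCollector(flowOf("a1", "a2"), flowOf("b1")) { seen.send(it) }

    // The sources run concurrently, so sort before printing.
    val received = List(3) { seen.receive() }.sorted()
    println(received)
    collector.clear()
}
```

Because each source is collected in its own coroutine, the relative order of states from different sources is not guaranteed, only the order within a single source.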