fbpx

Blog

История одного бага или не все буферы одинаково полезны.

Сегодня расскажу вам интересную историю, которая произошла при рефакторинге одного из проектов. В коде одного из интеракторов начал падать SQL-запрос SQLiteException: too many SQL variables.

Долго искали ошибку, и подозрение упало на недавно переписанную логику с RxJava на Kotlin Flow. Сравните 2 цепочки:

// Цепочка на Kotlin Flow

var ids = listOf(1,2,3,4,5,6,7,8,9,10)
flowOf(ids)
.buffer(BUFFER_SIZE)
Цепочка на RxJava
var ids = listOf(1,2,3,4,5,6,7,8,9,10)
Observable.fromIterable(ids)
.buffer(BUFFER_SIZE)

Может показаться, что они выглядят одинаково, однако это заблуждение. Во-первых, давайте присмотримся к первому оператору flowOf(ids) и .fromIterable(ids). В случае с flowOf(ids) мы получим сразу всю пачку элементов списка, в отличие от оператора fromIterable(ids) который будет излучать каждый i-ый элемент по очереди. Ну ладно, это пол беды. Самое интересное возникает в операторе buffer. 

Чтобы понять суть проблемы, давайте вспомним как работает Flow. Flow по своей природе не имеет проблемы backpressure. Это значит что если обработчик работает медленнее, чем источник, то источник просто чуть замедлится под темп обработчика. 

Пример показан ниже:

Коллектор обрабатывает элементы гораздо дольше, чем эмиттер. Проблема в том, что наш эмиттер простаивает, а мог бы готовить блинчики, пока мы их неспешно потребляем! И чтобы решить эту проблему, нам и понадобится оператор buffer()

Сравните результаты:

Пока мы неспешно уплетали блинчики, эмиттер заботливо приготовил еще пару штук. То есть резюмируя: оператор buffer() в Kotlin Flow позволяет нам запускать коллектор на другой корутине. И это позволяет нам выполнять параллельно работу коллектора и эмиттера. Подробнее можно почитать тут https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html 

В отличие от работы оператора Observable.buffer(). Я специально говорю об Observable, потому что в RxJava у Observable нет работы с BackPressure (в отличие от Flowable).

И если посмотреть документацию оператора buffer() для RxJava, то мы увидим следующую Marble Diagram.

Buffer


Там написано:”Buffer periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time”

То есть для Observable оператор buffer() просто копит элементы и отправляет их пачками. В этом и была причина бага. Потому что в Flow-цепочке, во-первых элементы списка отправлялись сразу, да еще и не группировались на пачки, как было задумано в изначальной цепочке в RxJava. 

При этом в Kotlin 1.9.0 появился оператор chunked(), который и делает именно то, что нужно в контексте логики данной цепочки. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/chunked.html

flowOf("a", "b", "c", "d", "e")
    .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
    .map { it.joinToString(separator = "") }
    .collect {
        println(it) // Prints "ab", "cd", e"
    }

Надеюсь данный пост был вам полезен и вы не допустите подобных ошибок в будущем.

Интенсив по Android-разработке


Чтобы не пропустить новые интересные статьи на тему Android и мобильной разработки подписывайтесь на Telegram-канал