Безусловно, одним из преимуществ использования RxJava в Android – это легкий способ менять потоки выполнения для различных задач, занимающих много времени. Такие задачи как: сетевой запрос к серверу, парсинг JSON, маппинг данных из VO в DTO, сохранение данных в БД или на диск довольно ресурсоёмки и занимают много времени, и, чтобы сделать приложение более отзывчивым и приятным для пользователя, необходимо выносить всю “тяжёлую” работу для обработки в отдельный поток. Для этого в RxJava есть обширный набор, так называемых Шедулеров (Schedulers), которые позволяют обрабатывать данные в других потоках. Как ими пользоваться и какие между ними отличия – поговорим в этой статье.
Schedulers.io()
Schedulers.io() – наиболее распространенный вид scheduler’a. Этот тип используется для сетевых запросов, операций чтения данных из файла, записи или чтения данных из БД. Такой планировщик (scheduler) создаёт необходимое количество потоков для эффективного выполнения вашей задачи, удаляет ненужные, и старается переиспользовать потоки, которые могут понадобится. Важно осозновать, что количество созданных потоков не ограничено, соответственно есть опасность создания большого количества потоков и нехватки памяти. Неиспользуемый threads (потоки) очищаются после 60 секунд неактивности.
Schedulers.io() в RxJava предназначен для задач, связанных с вводом-выводом (IO-bound), таких как сетевые запросы, операции с файлами или другие задачи, которые предполагают ожидание внешних ресурсов. Под капотом он использует кэшированный пул потоков (cached thread pool), который динамически создает и управляет потоками по мере необходимости. Вот почему у него нет строгих ограничений и он может создавать новые потоки:
Кэшированный пул потоков
Schedulers.io() использует CachedThreadPool (через Executors.newCachedThreadPool()).
Кэшированный пул потоков создает новые потоки по мере необходимости, но повторно использует уже созданные потоки, если они доступны.
Если поток простаивает более 60 секунд (по умолчанию), он завершается и удаляется из пула.
Такая архитектура позволяет пулу динамически масштабироваться для обработки большого количества задач, что делает его подходящим для операций, связанных с вводом-выводом, где задачи могут блокироваться на неопределенное время.
Отсутствие фиксированного лимита потоков
В отличие от Schedulers.computation(), который имеет фиксированный размер пула потоков (обычно основанный на количестве ядер процессора), Schedulers.io() не имеет строгого ограничения на количество потоков.
Это связано с тем, что задачи, связанные с вводом-выводом, часто предполагают ожидание (например, ответа от сети или операций с диском), поэтому наличие большего количества потоков помогает повысить пропускную способность, позволяя другим задачам выполняться, пока некоторые из них заблокированы.
Динамическое создание потоков
Если все существующие потоки в пуле заняты, кэшированный пул создаст новый поток для обработки новой задачи.
Такое поведение гарантирует, что задачи не будут оставаться без ресурсов, даже при высокой нагрузке.
Почему нет ограничений?
Задачи, связанные с вводом-выводом, часто непредсказуемы по длительности и использованию ресурсов. Например, сетевой запрос может занять миллисекунды или секунды в зависимости от скорости ответа сервера.
Отсутствие строгого лимита позволяет Schedulers.io() эффективно обрабатывать большое количество concurrent задач без ненужных блокировок или задержек.
Возможные недостатки
Поскольку Schedulers.io() может создавать большое количество потоков, это может привести к исчерпанию ресурсов, если использовать его неправильно. Например, если вы планируете огромное количество задач, которые никогда не завершаются, пул потоков может расти бесконечно, потребляя память и ресурсы процессора.
Он не подходит для задач, связанных с интенсивными вычислениями (CPU-bound), так как может перегрузить систему слишком большим количеством потоков.
Рекомендации по использованию
Используйте Schedulers.io() для задач, связанных с вводом-выводом, и Schedulers.computation() для задач, связанных с вычислениями.
Будьте внимательны к количеству concurrent задач, которые вы планируете на Schedulers.io(). Если ожидается большое количество задач, рассмотрите использование backpressure или ограничение concurrency с помощью операторов, таких как flatMap с параметром concurrency.
Database db = Database.from(conn);
Observable customerNames =
db.select("SELECT NAME FROM CUSTOMER")
.getAs(String.class)
.subscribeOn(Schedulers.io());

Schedulers.computation()
Schedulers.computation() – этот вид планировщика используется для математичсеких вычислений.Такой тип планировщика удобен тем, что создает количество потоков, соответствующее числу ядер в процессоре для параллельной обработки.Если вы не знаете какой тип планировщика вам подходит, то можете использовать Schedulers.computation() в качестве дефолтного. Многие операторы RxJava используют по умолчанию как раз Schedulers.computation()
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.subscribeOn(Schedulers.computation());
Schedulers.newThread()
Schedulers.newThread() – создаёт новый поток для каждого активного Observable, не пытаясь переиспользовать ранее созданные потоки. Важно понимать, что в сложных задачах количество созданных threads не ограничено, и можно “уронить” приложение из-за нехватки памяти.
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.subscribeOn(Schedulers.newThread());
Schedulers.single()
Schedulers.single() – такой планировщик можно рассматривать, если вам необходимо выполнить всю работу последовательно в одном потоке. Такой тип планировщика может быть полезен если необходимо выполнить потоконебезопасный код в отдельном потоке.
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.subscribeOn(Schedulers.single());
Schedulers.trampoline()
Schedulers.trampoline() – выполняет код в текущем потоке, используется для последовательного выполнения задач
AndroidSchedulers.mainThread()
AndroidSchedulers.mainThread() – является частью rxAndroid и используется для выполнения задач в UI-потоке, например для модификации UI. Обычно используется вместе с observeOn
ExecutorService
Если ни один из вышеперечисленных видов планировщиков не подошел, то вы можете использовать собственный, создав его с помощью ExecutorService.
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Launcher {
public static void main(String[] args) {
int numberOfThreads = 20;
ExecutorService executor =
Executors.newFixedThreadPool(numberOfThreads);
Scheduler scheduler = Schedulers.from(executor);
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.subscribeOn(scheduler)
.doFinally(executor::shutdown)
.subscribe(System.out::println);
}
}
Методы Schedulers.start() и Schedulers.shutdown()
Данные методы используются соответсвенно для инициализации планировщика и его очищения. Метод Schedulers.shutdown() перестанет выполнять любые задачи и для того, чтобы код снова выполнялся на отдельном потоке, необходимо реинициализировать планировщик с помощью Schedulers.start()
Теперь, рассмотрев разные виды планировщиков, вы можете эффективнее управлять потоками с помощью RxJava. Кстати у нас есть бесплатный базовый курс по RxJava3.0. Ну а если хотите овладеть RxJava профессионально на практике и применять данную библиотеку в своих Android-приложениях, то приглашаю пройти онлайн – интенсив по Android-разработке с наставником, где вы прокачаетесь до middle за 2 месяца
- 📌 Онлайн — интенсив по Android-разработке с code review
- 📌 Telegram — канал @android_school_ru где публикуются полезные материалы для Android

