fbpx

Blog

Виды Schedulers в RxJava2.0

Безусловно, одним из преимуществ использования RxJava в Android – это легкий способ менять потоки выполнения для различных задач, занимающих много времени. Такие задачи как: сетевой запрос к серверу, парсинг JSON, маппинг данных из VO в DTO, сохранение данных в БД или на диск довольно ресурсоёмки и занимают много времени, и, чтобы сделать приложение более отзывчивым и приятным для пользователя, необходимо выносить всю “тяжёлую” работу для обработки в отдельный поток. Для этого в RxJava есть обширный набор, так называемых Шедулеров (Schedulers), которые позволяют обрабатывать данные в других потоках. Как ими пользоваться и какие между ними отличия – поговорим в этой статье.

Schedulers.io()

Schedulers.io() – наиболее распространенный вид scheduler’a. Этот тип используется для сетевых запросов, операций чтения данных из файла, записи или чтения данных из БД. Такой планировщик (scheduler) создаёт необходимое количество потоков для эффективного выполнения вашей задачи, удаляет ненужные, и старается переиспользовать потоки, которые могут понадобится. Важно осозновать, что количество созданных потоков не ограничено, соответственно есть опасность создания большого количества потоков и нехватки памяти. Неиспользуемый threads (потоки) очищаются после 60 секунд неактивности.

 Database db = Database.from(conn);

 Observable customerNames = 
     db.select("SELECT NAME FROM CUSTOMER")
         .getAs(String.class)
         .subscribeOn(Schedulers.io());

Schedulers.computation()

Schedulers.computation() – этот вид планировщика используется для математичсеких вычислений.Такой тип планировщика удобен тем, что создает количество потоков, соответствующее числу ядер в процессоре для параллельной обработки.Если вы не знаете какой тип планировщика вам подходит, то можете использовать Schedulers.computation() в качестве дефолтного. Многие операторы RxJava используют по умолчанию как раз Schedulers.computation()

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    .subscribeOn(Schedulers.computation());

Schedulers.newThread()

Schedulers.newThread() – создаёт новый поток для каждого активного Observable, не пытаясь переиспользовать ранее созданные потоки. Важно понимать, что в сложных задачах количество созданных threads не ограничено, и можно “уронить” приложение из-за нехватки памяти.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
  .subscribeOn(Schedulers.newThread());

Schedulers.single()

Schedulers.single() – такой планировщик можно рассматривать, если вам необходимо выполнить всю работу последовательно в одном потоке. Такой тип планировщика может быть полезен если необходимо выполнить потоконебезопасный код в отдельном потоке.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
 .subscribeOn(Schedulers.single());

Schedulers.trampoline()

Schedulers.trampoline() – выполняет код в текущем потоке, используется для последовательного выполнения задач

AndroidSchedulers.mainThread()

AndroidSchedulers.mainThread() – является частью rxAndroid и используется для выполнения задач в UI-потоке, например для модификации UI. Обычно используется вместе с observeOn

ExecutorService

Если ни один из вышеперечисленных видов планировщиков не подошел, то вы можете использовать собственный, создав его с помощью ExecutorService.

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Launcher {

    public static void main(String[] args) {

        int numberOfThreads = 20;

        ExecutorService executor =
                Executors.newFixedThreadPool(numberOfThreads);

        Scheduler scheduler = Schedulers.from(executor);

        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .subscribeOn(scheduler)
                .doFinally(executor::shutdown)
                .subscribe(System.out::println);
    }
}

Методы Schedulers.start() и Schedulers.shutdown()

Данные методы используются соответсвенно для инициализации планировщика и его очищения. Метод Schedulers.shutdown() перестанет выполнять любые задачи и для того, чтобы код снова выполнялся на отдельном потоке, необходимо реинициализировать планировщик с помощью Schedulers.start()

Теперь, рассмотрев разные виды планировщиков, вы можете эффективнее управлять потоками с помощью RxJava. Ну а если хотите овладеть RxJava профессионально и применять данную библиотеку в своих Android-приложениях, рекомендую курс на Stepic по реактивному программированию