fbpx

Blog

Виды Subjects в Android

Эта статья будет полезна для тех, кто уже знаком с реактивным программированием и, в частности с основами RxJava, и хочет углубиться в тему Subject ов.

Итак, возможно, у вас возникали ситуации, когда необходимо было в одном месте отправить какое-либо событие, а в другом месте подписаться на получение обновлений и событий. Более технически, в одном случае вам необходимо использовать нечто как Observer (наблюдатель), а в другом случае, это же нечто использовать как Observable (то есть как источник данных). Так вот, для таких ситуаций существуют Subjects. Сегодня мы рассмотрим разные виды Subjects, такие как PublishSubjects, BehaviorSubject, ReplaySubject, AsyncSubject, UnicastSubject. Если о первых 2-ух скорее всего вы слышали или даже использовали, то последние не так популярны и мало кто о них знает, давайте рассмотрим каждый из них с примерами.

PublishSubject

Самым простым видом Subject является PublishSubject.

import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class Launcher {
	public static void main(String[] args) {
		Subject<String> subject = PublishSubject.create();
		subject.map(String::length)
		.subscribe(System.out::println);

		subject.onNext("Alpha");
		subject.onNext("Beta");
		subject.onNext("Gamma");
		subject.onComplete();
	}
}

Результат будет

5
4
5

Соответственно, мы можем передавать в PublishSubject некие события и получать эти события в том месте, где мы подписались на Subject и применять операторы к ним.

BehaviorSubject

Этот вид Subject будет повторять последний переданный элемент/событие каждому новому подписчику.

import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
public class Launcher {
	public static void main(String[] args) {

		Subject<String> subject = BehaviorSubject.create();
		subject.subscribe(s -> System.out.println("Observer 1: " +s));

		subject.onNext("Alpha");
		subject.onNext("Beta");
		subject.onNext("Gamma");
		subject.subscribe(s -> System.out.println("Observer 2: " +s));
	}
}

Результат

Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 2: Gamma

Как мы видим, после подписки на subject в самом конце, мы получаем результат Gamma. (Observer 2: Gamma) Как раз потому, что BehaviorSubject отправляет последнее переданное ему значение. В этому случае это  subject.onNext(“Gamma”);

ReplaySubject

ReplaySubject похож на PublishSubject с добавлением оператора cache(). То есть он сохраняет все предыдущие значения для следующего Observer.

import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
	public class Launcher {
	
		public static void main(String[] args) {
		Subject<String> subject = ReplaySubject.create();
		subject.subscribe(s -> System.out.println("Observer 1: " +s));

		subject.onNext("Alpha");
		subject.onNext("Beta");
		subject.onNext("Gamma");
		subject.onComplete();
                        // Здесь подписываемся еще раз
                       // и получаем все переданные события
		subject.subscribe(s -> System.out.println("Observer 2: " +s));
		}
}

Результат работы будет

Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma

Observer 2: Alpha
Observer 2: Beta
Observer 2: Gamma

Как мы видим, после подписки на subject (последняя строчка) мы получим все уже переданные ранее события сразу.

AsyncSubject

Этот вид subject будет передавать только последний элемент, предшествующий событию onComplete(). Внимание Такой вид subject нужно использовать только с не бесконечными источниками событий, так последний элемент передаётся только когда был вызван onComplete()

import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.Subject;

public class Launcher {
	public static void main(String[] args) {
		Subject<String> subject = AsyncSubject.create();
		subject.subscribe(
				s ->System.out.println("Observer 1: " + s),
				Throwable::printStackTrace,
				() -> System.out.println("Observer 1 done!")
			);
		
		subject.onNext("Alpha");
		subject.onNext("Beta");
		subject.onNext("Gamma");
		subject.onComplete();

		subject.subscribe(
				s ->System.out.println("Observer 2: " + s), 
				Throwable::printStackTrace,
				() -> System.out.println("Observer 2 done!")
			);
		}
}

Результат:

Observer 1: Gamma
Observer 1 done!

Observer 2: Gamma
Observer 2 done!

UnicastSubject

Этот вид subject сохраняет все переданные ему данные пока некий Observer подписан на него, и потом, отдаёт все полученные данные сразу и очищает кэш.

import io.reactivex.Observable;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
import io.reactivex.subjects.UnicastSubject;
import java.util.concurrent.TimeUnit;
	public class Launcher {
		public static void main(String[] args) {

		Subject<String> subject = UnicastSubject.create();
		Observable.interval(300, TimeUnit.MILLISECONDS)
				 .map(l -> ((l + 1) * 300) + " milliseconds")
				 .subscribe(subject);

		sleep(2000);

		subject.subscribe(s -> System.out.println("Observer 1: " + s));
		sleep(2000);
	}
}

Результат

Observer 1: 300 milliseconds
Observer 1: 600 milliseconds
Observer 1: 900 milliseconds
Observer 1: 1200 milliseconds
Observer 1: 1500 milliseconds
Observer 1: 1800 milliseconds
Observer 1: 2100 milliseconds
Observer 1: 2400 milliseconds
Observer 1: 2700 milliseconds
Observer 1: 3000 milliseconds
Observer 1: 3300 milliseconds
Observer 1: 3600 milliseconds
Observer 1: 3900 milliseconds

После запуска этого кода, вы увидите, что первые 6 событий будут получены сразу же после 2 секунд ожидания

Когда использовать Subject?

Чаще всего использование Subject может пригодится, чтобы консолидировать несколько событий от разных Observable  в один поток данных, как будто это один Observable. Так как Subject – это Observer, то вы можете передать его в метод subscribe(). Например, у вас есть Pull-to-refresh на экране, и кнопка “Обновить”. Вы как программист, хотите рассматривать оба этих события как один поток данных. Таким образом, вы может создать один Subject, подписаться на него, и передавать экземпляр в любой метод subscribe, тем самым, создавая один поток событий и реагируя одинаково без дублирования кода.

Serializing Subjects

Важно помнить, что методы onNext(), onComplete() и onError() не являются безопасными с точки зрения многопоточности. Таким образом, чтобы не нарушить контракт Observable, который требует, чтобы события проходили последовательно, необходимо вызвать метод .toSerialized() чтобы Subject был безопасен при многопоточном программировании.

 Subject<String> subject = PublishSubject.<String>create().toSerialized();

На этом все, подписывайтесь на телеграм– канал.  Ну а если хотите овладеть RxJava профессионально и применять данную библиотеку в своих Android-приложениях, то приглашаю пройти онлайн – интенсив по Android-разработке с наставником, где вы прокачаетесь до middle за 2 месяца