В этой статье мы рассмотрим основные принципы и механизмы разработки приложений с использованием методик реактивного программирования. Эта парадигма программирования появилась относительно недавно и ориентирована на потоки данных и распространение изменений. Вкратце, это способ написания программного кода, основанного на реакциях на определенные события, возникающие внутри приложения. При этом мы должны четко разделять данные на динамические и статические, и наша модель должна автоматически распространять изменения динамических данных при помощи потоков. Реактивное программирование предполагалось как легкий путь для создания пользовательских интерфейсов, анимации и любых других процессов, изменяющихся со временем. Реактивное программирование может быть осуществлено несколькими подходами:
- императивным программированием;
- объектно-ориентированным;
- функциональным.
Однако наиболее естественным базисом для реактивного программирования и работой с с реактивными структурами данных является функциональный подход.
Наиболее популярной библиотекой, использующей данный подход, является RxJS, которая была взята за основу такого фреймворка как Angular.
RxJS – это Javascript библиотека для трансформации, составления и извлечения асинхронных потоков данных. Она может быть использована как в браузере, так и на стороне сервера.
Итак, что же такое реактивное программирование и что значит «мыслить реактивно»? Это такая парадигма, при которой написание программного кода предполагает работу с асинхронным потоком данных и описание логики реакции на их изменения в отличие от императивного подхода, где мы явно обслуживаем их изменение.
При реактивном подходе, любой перечисляемый или композитный тип данных, как например список, рассматривается как поток и может быть представлен следующим образом:
Любой поток на временной шкале имеет начало, набор последовательно испускаемых значений и конец, обозначаемый вертикальной линией. Таким образом поток генерирует нам события, упорядоченные во времени. Этими событиями может быть что угодно: клик на кнопке, http-запрос на сервер, данные, приходящие через сокет-соединение, анимация и т.д.
И мы имеем возможность прослушивать такой поток и соответственно реагировать на эти события. Используя обычные функции, мы можем комбинировать, изменять и фильтровать такие потоки.
Поток может испускать три вида событий:
- значение;
- ошибка;
- завершение.
И наша задача их перехватить и описать реакцию на эти три вида асинхронных (или синхронных) событий. При этом мы будем работать с несколькими программными сущностями, первая из которых – наблюдаемость.
Наблюдаемость (Observable) – это обычная функция с несколькими специфичными для нее характеристиками.
В качестве параметра она принимает объект-наблюдатель (observer). Этот наблюдатель описывает 3 метода:
- Next – получение очередного значения из потока;
- Error – ошибка;
- Complete – событие завершения потока.
Наблюдаемость обеспечивает механизм передачи сообщений между тем, кто генерирует данные и тем кто их использует.
При этом существует 2 вида стратегии поведения между производителем и потребителем данных.
При первой (pull-стратегии) место и время реакции на изменение или получение данных определяется потребителем, при второй (push-стратегии) – производителем.
Примером pull-стратегии является обычный вызов функции из того места программы, где требуются те данные которые она возвращает, при этом сама функция ничего не знает о том, кто и откуда ее будет «дергать».
При push-стратегии – наоборот, функция-производитель как бы «обвязывается» обработчиками (потребителями), которые ничего не знают о том, где и когда эти данные будут сгенерированы.
RxJS использует вторую push-стратегию.
Наблюдаемость ленива. Она не начинает испускать значений, пока кто-то не подпишется на нее функцией subscribe
в отличие от промисов.
Метод subscribe() возвращает объект типа подписчик (subscription), который в себе имеет функцию unsubscribe()
, отписывающую его от наблюдаемости и отменяющей дальнейшую генерацию событий, чего также нет у промисов.
Observable-объект подобен функции с отсутствием аргументов, но способной принимать и возвращать множество значений в отличие от обычных функций.
Библиотека RjJS предоставляет ряд инструментов, которые облегчают создание наблюдаемостей из разных элементов системы, таких как события, промисы, таймеры и т.д. Приведем ряд примеров создания Observable-объектов из различных источников.
Создание при помощи конструктора.
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi')
}, 1000);
})
Чаще всего объекты Observable создаются при помощи специальных функций-фабрик или операторов (of, from, interval и т.д.), которые возвращают объекты Observable из переданных им аргументов. При подписке на них мы передаем функцию-обработчик, которая принимает генерируемые потоком данные и обрабатывает их.
Операторы RxJS.
Операторы импортируются из пакета rxjs и служат для создания, управления и изменения Observable-объектами. Существует два вида операторов: потоковые (pipeable) и созидательные (creation). Рассмотрим самые широко-используемые из них.
Функции of, from, fromEvent.
Эти операторы относятся к созидательным и вызываются как самостоятельные функции для создания новых Observable-объектов исходя из переданных аргументов.
Следующий пример демонстрирует создание Observable-объекта из списка при помощи функции from c последующей подпиской и отпиской.
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe()
В отличие от функции of(), функция from() принимает аргументы разных видов (промисы, списки и т.д), преобразовывая их (если необходимо) в объекты Observable, на которые можно подписаться. Функция of() принимает значения и возвращает их потоком без преобразования. Еще одно важное отличие в том, как эти аргументы обрабатываются.
of([1,2,3]).subscribe( x => {
console.log(x);
});
from([1,2,3]).subscribe( x => {
console.log(x);
})
В приведенном выше примере, в первом случае в поток зайдет весь массив, когда во втором будут последовательного сгенерированы его элементы. С помощью функции from мы можем сгенерировать Observable из промиса.
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
const subscribe = promiseSource.subscribe(val => console.log(val))
Функция fromEvent позволяет сгенерировать Observable из события по переданному источнику событий, например ссылке, кнопке или объекта типа EventEmitter.
В примере ниже мы создаем в шаблоне кнопку помечая ее якорем #mybutton, чтобы иметь возможность выбрать ее в компоненте при помощи декоратора ViewChild.
Затем создаем поток Observable по переданному элементу DOM.
import { Component, ViewChild, OnInit} from '@angular/core';
import { fromEvent } from 'rxjs';
@Component({
...
})
export class AppComponent implements OnInit{
@ViewChild('mybutton') button;
ngOnInit(){
fromEvent(this.button.nativeElement, 'click').subscribe(evt => {
console.log(evt);
}
);
Функции map
и filter
.
Функция map работает аналогично одноименному методу списка но оперирует Observable-объектом для генерации значений потока. К примеру, проход по массиву при помощи метода списка map
[1, 2, 3].map(x => x * x)
Может быть переписан с использованием Observable.
map(x => x * x)(of(1, 2, 3)).subscribe((v) => {...});
Оператор map() принимает функцию, которая последовательно применяется во всем эмитируемым значениям Observable потока-источника. Таким образом он используется для того, чтобы изменять элементы потока, в отличие от другого оператора filter(), который также принимает функцию, которую применяет к каждому элементу, но при этом не изменяет а возвращает неизмененный элемент потока в зависимости от того, соблюдается ли условие переданной ей функции. Таки образом, мы имеем возможность убрать (отфильтровывать) ненужные нам элементы из потока.
Следующий пример удалит из списка нечетный числа.
from([1, 2, 3, 4, 5]).pipe(filter(num => num % 2 === 0)).subscribe(x => console.log(x));
Так как оператор filter()
является потоковым (pipeable), мы используем его внутри функции pipe()
, о которой речь пойдет ниже.
Потоковые (pipeable) операторы служат для изменения потока, либо создания нового из существующего и используются внутри конструкции pipe.
observableInstance.pipe(operator())
К ним относятся такие операторы: filter(), mergeMap()
и switchMap()
. Когда они применяются, то не изменяют оригинальный Observable-объект, вместо этого, они возвращают новый Observable-объект, который содержит ту же логику подписки что и оригинал.
Используя оператор filter(), можно отфильтровывать ненужные или оставлять нужные элементы потока. Например в этом примере мы выбираем те объекты пользователей возраст которых больше или равен 30.
const source = from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]);
const example = source.pipe(filter(person => person.age >= 30));
const subscribe = example.subscribe(val => console.log(`Over 30: $`))
Операторы mergaMap()
и switchMap().
Данный оператор собирает данные из нескольких потоков (Observable объектов) в один поток. При этом эти потоки могут не завершаться и продолжать свое выполнение, что может привести к утечками. Оператор switchMap() так же собирает данные из нескольких потоков, но при этом оставляет активным (подписанным) только один – последний.
Оператор switchMap() полезен в тех сценариях, когда нам не важны данные запросов, которые поступили ранее самого последнего. Поэтому этот оператор будет отменять подписки всех ранее поступивших запросов.
Следующий пример обновляет счетчик после каждого клика по прошествию 1 сек.
fromEvent(this.button.nativeElement, 'click')
.pipe(
switchMap((e: any) => interval(1000))
)
.subscribe(e => console.log(e))
Выводы
В данной статье рассмотрены базовые принципы программирования на основе Observable-объектов с применением библиотеки RxJS. Раскрыта суть реактивного подхода к программированию и использование потоков данных для создания приложений, реагирующих на изменения их состояний. Освещена работа наиболее широко используемых операторов библиотеки.