Интенсив по RxJS
Интенсив по реактивному програмированию с библиотекой RxJS.
Набор на этот интенсив проходит раз в 1.5-2 месяца, стоимость 3500р (≈37$).
Вы можете запросить уведомление.
Цели интенсива
RxJS — библиотека для «Продвинутого управления событиями» и «Более мощная альтернатива промисам» в одном лице. Она позволяет удобно организовать работу с событиями и асинхронным кодом, а также писать сложную логику декларативно.
RxJS активно используется в фреймворке Angular, а также с Vue (Vue-rx) и, кроме того, лежит в основе реализации middleware для Redux (redux-observable) для React.
За время нашей встречи мы изучим концепцию реактивного программирования, с головой окунемся в реализацию RxJS и детально изучим его реальное использование. Научимся писать свои операторы, а также разберем как самые популярные базовые операторы, так и менее популярные, но не менее полезные. Научимся правильно обрабатывать ошибки, манипулировать потоками и тестировать их. И все это – на практических задачах!
Как организовано обучение?
Онлайн-интенсив на один день
Интенсив длится один день, приблизительно 8 часов с перерывом на обед и небольшими, опциональными перерывами между блоками.
Занятие проходит в формате вебинара, запись которого доступна через 30 минут после окончания.
Общение
Специально для каждого потока создается групповой чат для общения участников и вопросов преподавателю. Он доступен как во время интенсива, так после. Конечно, во время интенсива возможно общение голосом.
Результат
Вы понимаете концепцию реактивного программирования.
Вы знаете библиотеку RxJS, свободно разрабатываете и отлаживаете программы.
Вы можете гибко управлять асинхронностью в ваших приложениях.
У вас достаточно знаний для применения RxJS с фреймворками (Angular, React, Vue и других) и Node.js.
Сертификат
По окончанию курсов вы получаете сертификат в электронном виде на русском и английском языках.
Хотя сертификатам в нашей профессии обычно не придают значения. Главное — знания и умения, которые вы получите, если будете полноценно участвовать в интенсиве.
![]()
Программа интенсива
Реактивное программирование, паттерн ReactiveX и библиотека RxJS
Разбираем проблемы других подходов для работы с асинхронным кодом в JavaScript и в каких случаях использовать ReactiveX паттерн для работы с асинхронным кодом.
Изучаем с структуру данных Observable и его API.
Учимся грамотно создавать Observable и классифицировать его:
- Конечные и бесконечные.
- Горячие и холодные.
Разбираем базовые функции для создания потоков.
Операторы RxJS
Разбираемся, что такое «оператор RxJS» и учимся писать собственные операторы.
Разбираемся как визуализировать Observable, используя «мраморные» (marble) диаграммы.
Знакомимся с базовыми операторами и учимся применять их для решения типовых задач.
Знакомимся с HOO (Hight order observable) и операторами высшего порядка, разбираем их особенности и применяем для решения типовых задач.
Учимся комибинировать существующие операторы RxJS для создания новых.
Ошибки
Изучаем способы отлова ошибок и их обработки.
Рассматриваем важность расположения операторов RxJS для обработки ошибок.
Контролируемыe Observable (Subject)
Разбираем новую структуру данных Subject и его подвиды.
Изучаем механизм мультикастинга и его применение для решения типовых зачач.
Учимся применять потоки для коммуникации между компонентами.
Виртуальное распределение (Schedulers)
Разбираем синхронность и асинхронность потоков и операторов.
Рассматриваем основные типы виртуального распределения и их особенности.
Изучаем способы изменения виртуального распределения через операторы RxJS.
Тестирование
Знакомимся с TestScheduler — механизмом для тестирования потоков и его синтаксисом.
Тестируем Observable, используя «мраморные» (marble) диаграммы и TestScheduler.
Предварительные требования
- Знакомство с HTML/CSS: верстать макеты не понадобится, но основные теги, позиционирование, margin/padding надо знать.
- Опыт JavaScript с использованием ООП от 1 года (не только HTML/CSS) или пройденный курс JavaScript для новичков.
- Интернет 256кб/с или быстрее для видео.
Опыт требуется не просто так: новые подходы в работе с асинхронным кодом и реактивное программирование имеет смысл изучать, когда JavaScript сам по себе давно знаком. Кроме того, зная, что у вас уже есть опыт в программировании, мы можем сосредоточиться именно на особенностях реактивного программирования (RxJS) и, тем самым, успеть больше.
Преподаватель

Егор Сидоров Ведёт курс с 14 мая 2022
Front-end | Angular разработчик в Tinkoff, занимаюсь разработкой чата для обслуживания и CMS для контроля качества. Обучаю стажёров.
Паралельно провожу собеседования по Angular. Также обучаю разработке в финтехе.
Что говорят о курсе участники?
Мы занимаемся обучением с 2007 года. За это время у нас обучились тысячи разработчиков из разных стран и компаний.
Все отзывы являются честными. Мы не модерируем их.
Angular и RxJS¶
RxJS — это библиотека, реализующая принципы реактивного программирования для JavaScript. Основанная на объектах типа Observable , она упрощает написание и контроль асинхронного и событийного кода.
Подробно об RxJS
Observable¶
Observables обрабатывают любой поток данных: примитивные типы, инициированные пользователем события, ответы сервера на HTTP-запросы, синхронный и асинхронный код.
Именно благодаря всему перечисленному библиотека нашла широкое применение в Angular.
Принцип работы объектов RxJS Observable можно сравнить с push-уведомлениями.
Так, объект выступает в качестве поставщика данных, который имеет обработчики поставляемых данных. Обработчики выполняют роль пользователей, реагирующих на отправку поставщиком данных.
При создании Observable конструктор класса принимает функцию с набором callback-функций в качестве аргумента. В переданной функции описывается логика обработки и возвращения значений.
Объект, принимаемый функцией, реализует интерфейс с тремя методами:
- next() — принимает значение, которое будет возвращено функции-обработчику;
- error() — принимает значение, возвращаемое функции-обработчику ошибок;
- complete() — вызывается для уведомления «подписчиков» об окончании рассылки.
Для обработки поставляемых данных используется метод subscribe() , который принимает три функции: next , error и complete — для каждого из методов объекта конструктора.
Создание и использование объекта типа Observable .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
const values = new Observable((observer) => observer.next(8); observer.next(9); const handler = () => console.log('Click detected'); >; document.addEventListener('click', handler); observer.complete(); return unsubscribe() console.log('Unsubscribed'); document.removeEventListener('click', handler); >, >; >); const subscription = values.subscribe( (v) => console.log(v); >, (error) => console.log(error); >, () => console.log('Completed'); > ); subscription.unsubscribe();
Здесь принимаемый функцией объект observer сначала используется для передачи значений (метод next() ), а затем для оповещения всех подписчиков об окончании рассылки (метод complete() ).
Принимаемая конструктором функция возвращает метод unsubscribe() , который вызывается для «ручной» отмены подписки. Обычно используется для освобождения занятых ресурсов (удаление таймеров или обработчиков событий).
Создавать собственные объекты Observable приходится не так уж часто, поскольку большинство потенциально необходимого функционала, где можно было бы это применить, реализовано самим Angular.
Для создания, обработки и преобразования возвращаемых данных в RxJS предусмотрены специальные функции, называемые операторами. Например, оператор of используется для преобразования примитивных типов в объект Observable :
1 2 3 4 5 6 7 8
import Observable > from 'rxjs'; import of > from 'rxjs'; const values = of([1, 2, 3]); values.subscribe((value) => console.log(value); >);
Оператор of() — более краткая запись следующего кода:
1 2 3
const values = new Observable((observer) => observer.next([1, 2, 3]); >);
Если необходимо, чтобы обработчик вместо всего массива сразу получал каждый его элемент в отдельности, используйте оператор from .
Начиная с версии Angular 5, импорт всех RxJS операторов осуществляется точно также, как и других сущностей Angular.
Теперь рассмотрим пример преобразования данных с использованием оператора map .
1 2 3 4 5
const values = Observable.of(1, 2, 3); values.pipe(map((number) => number * 2)).subscribe((v) => console.log(v); >);
Здесь map() умножает каждое значение элемента массива на 2.
Все операторы преобразования данных объявляются в методе pipe() через запятую. Сам метод pipe() импортировать не надо и он должен быть вызван перед методом subscribe() .
Полный список RxJS операторов, разделенных по категориям, можно изучить на официальном сайте.
Subject¶
В некоторых случаях целесообразнее использовать объекты типа Subject . Тип Subject — разновидность RxJS Observable, который может доставлять данные сразу нескольким подписчикам.
1 2 3 4 5 6 7 8 9 10
let subject = new Subject(); subject.subscribe((v) => console.log('Observer 1: ' + v); >); subject.subscribe((v) => console.log('Observer 2: ' + v); >); subject.next(9);
В примере выше для объекта типа Subject регистрируются два подписчика. Далее для передачи значения и вызова функций-обработчиков подписчиков используется метод next() .
В результате в консоль будет выведено две строки:
Observer 1: 9 Observer 2: 9
RxJS Subject в свою очередь также имеет разновидности.
BehaviorSubject — передает новому подписчику последнее значение, в качестве аргумента принимает начальное значение.
1 2 3 4 5 6 7 8 9 10 11
let behaviorSubject = new BehaviorSubjectNumber>(3); behaviorSubject.subscribe((v) => console.log('Observer with value of 3: ' + v); >); behaviorSubject.next(9); behaviorSubject.subscribe((v) => console.log('Observer with value of 9: ' + v); >);
ReplaySubject — передает новому подписчику все предыдущие значения, принимаемый параметр — количество предыдущих значений.
1 2 3 4 5 6 7 8 9 10
let replaySubject = new ReplaySubjectNumber>(2); replaySubject.next(3); replaySubject.next(6); replaySubject.next(9); replaySubject.next(12); replaySubject.subscribe((value) => console.log('ReplaySubject: ' + value); >);
AsyncSubject — передает новому подписчику последнее значение, но только после того, как будет вызван метод complete() .
1 2 3 4 5 6 7 8 9 10 11
let asyncSubject = new AsyncSubjectNumber>(3); asyncSubject.subscribe((value) => console.log('AsyncSubject: ' + value); >); asyncSubject.next(3); asyncSubject.next(6); asyncSubject.next(9); asyncSubject.complete();
Ссылки¶
- The RxJS library
- Observables in Angular
Rxjs что это
Методы класса HttpClient после выполнения запроса возвращают объект Observable , который определен в библиотеке RxJS («Reactive Extensions»). Она не является непосредственно частью Angular, однако широко используется особенно при взаимодействии с сервером по http. Эта библиотека реализует паттерн «асинхронный наблюдатель» (asynchronous observable). Так, выполнение запроса к серверу с помощью класса HttpClient выполняются в асинхронном режиме.
Естественно чтобы задействовать функционал RxJS в приложении, в проект должна быть добавлена соответствующая зависимость «rxjs»:
< "name": "helloapp", "version": "1.0.0", "description": "First Angular 16 Project", "author": "Eugene Popov ", "scripts": < "ng": "ng", "start": "ng serve", "build": "ng build" >, "dependencies": < "rxjs": "7.5.0", // остальное содержимое секции >, "devDependencies": < // содержимое секции >>
Используя специальные методы для объекта Observable, например, map и filter , можно произвести некоторую постобработку полученных от сервера результатов.
Так, возьмем проект из прошлой темы:

Например, определим в файле users.json данные, которые напрямую не соответствуют массиву объектов User:
В качестве модели данных используем класс User:
export class User < constructor(public name:string, public age:number)<>>
То есть в данном случае у нас нет соответствия по именам свойствам: name — username и age — userage.
Определим следующий код сервиса, который будет получать данные из users.json:
import from '@angular/core'; import from '@angular/common/http'; import from './user'; import from 'rxjs'; import < map >from 'rxjs/operators'; @Injectable() export class HttpService < constructor(private http: HttpClient)< >getUsers() : Observable< return this.http.get('assets/users.json').pipe(map((data:any)=>< let usersList = data["userList"]; return usersList.map(function(user: any): User < return new User(user.userName, user.userAge); >); >)); > >
Смысл использования специального сервиса для работы с http заключается в сокрытии деталей отправки запросов. Компонент же ожидает получить какие-то конкретные данные, например, в виде набора объектов User. С помощью метода map библиотеки rxjs можно преобразовать данные из одного формата в другой.
У результата метода get() мы можем вызвать метод pipe() , который позволяет обработать результаты запроса. Для этого метод pipe в качестве первого параметра принимает функцию обработки данных запроса. В данном случае в роли такой функции выступает оператор map , который преобразует результаты запроса в новые объекты.
Но чтобы использовать элементы библиотеки RxJS, их надо импортировать:
import from 'rxjs'; import < map >from 'rxjs/operators';
В итоге весь метод getUsers() возвращает объект Observable
Теперь используем сервис в классе компонента:
RxJS с нуля, обзор “Обозреваемого”
Приветствую! В данной статье мы рассмотрим 6-ю версию библиотеки RxJS, которая позволяет применить подход реактивного программирования в JavaScript. Главное отличие от предыдущих версий это pipe-операторы. Но ключевая особенность — улучшение производительности и модульности. Итак, начнем!
RxJS — это библиотека для JS, которая использует паттерн Observable (с англ. “Обозреваемый” / “Наблюдаемый”) для упрощения обработки и компановки асинхронного или callback кода.
Для того, чтобы начать использовать RxJS в вашем коде необходимо выполнить две вещи: установить при помощи npm и подключить.
$ npm install --save rxjs// внутри вашего компонента
import < Observable >from 'rxjs';
Observable.create — функция, которая возвращает другую функцию, которая будет вызвана в момент отписки (в т.ч. после ошибок и завершения, т.е. когда Observable разрушается). Внутри возвращаемой функции должна производиться очистка всего того, что было использовано (например, таймеры).
const source = Observable.create((observer) => let count = 0;
console.log('Observable created');
const timer = setInterval(() => observer.next(count);
count++;
>, 1000);
return () => console.log('Observable destroyed');
clearInterval(timer);
>
>);
При подписке необходимо передать функции-обработчики для поступающих значений, ошибки и завершения Observable.
const subscription = source.subscribe(
val => console.log('next:', val),
err => console.error('error:', err),
() => console.log('completed')
);// не забываем отписаваться
setTimeout(() => subscription.unsubscribe(), 4500);
Также, вместо observer.next(count); можно бросать (для наглядности можно представить, что в трубу) не значение, а ошибку через observable.error(‘Damn!’); . Давайте кинем три значения, а на четвертый раз выкинем ошибку.
Кроме того, сама “труба” может решить, что выкинутых событий достаточно и сообщить нам о завершении вызовом observer.complete(); .
Помимо создания через Observable.create() существует целое множество функций, при помощи которых можно получить Observable. Рассмотрим некоторые из них. (Для простоты обработки можно передавать не все обработчики при подписке).
Первым рассмотрим оператор of для создания Observable. Значение, которое было передано в качестве аргумента, будет “прокинуто в трубу” для дальнейшей обработки.
import < of >from 'rxjs';source = of();subscription = source.subscribe(
val => console.log('next:', val),
err => console.error('error:', err),
() => console.log('completed')
);
Если же мы хотим прокинуть несколько значений раздельно, то стоит воспользоваться оператором from в который необходимо передать массив объектов, каждый из которых будет обработан отдельно. Для уменьшения кода будем считать, что подписка аналогична.
import < from >from 'rxjs';source = from([10, 20, 30]);
Кроме методов для создания обозревателей, которые бросают какие-либо значения, существуют и другие. Например можно сделать совершенно пустой обозреватель, который тут же завершится, не выкинув ни одного значения, при помощи константы EMPTY . Либо можно сделать Observable, который никогда не завершится и абсолютно ничего не сделает, используя константу NEVER .
import < EMPTY >from 'rxjs';source = EMPTY;
import < NEVER >from 'rxjs';source = NEVER;
// При подписке ничего не произойдет, т.к. не будет брошено значений // и ошибок, а также не будет завершения.
Но еще можно сразу выбрасывать ошибку через throwError .
import < throwError >from 'rxjs';source = throwError('Damn!');
А что если надо не просто словить ошибку конечным подписчиком и завершить работу, а навесить дополнительную обработку внутри “трубы”, чтобы нормально продолжить выполнение процесса / задачи? С этим помогут pipe функции для обработки ошибок (все они вызываются внутри pipe секции, в которой и навешиваются дополнительные возможности перед непосредственно подпиской).
Для начала рассмотрим метод catchError , который уже импортируется из rxjs/operators (как и любые другие “обвесы” на наш Observable). Этот оператор позволяет словить ошибку в середине потока и корректно обработать ее (пробросить ошибку далее, где она будет обработана, либо послать далее безопасное значение, которое уже может быть принято).
import < throwError, of >from 'rxjs';
import < catchError >from 'rxjs/operators';source = throwError('Damn');subscription = source.pipe(
catchError(err => console.log('catch:', err);
return of('safety result');
>)
)
.subscribe(
val => console.log('next:', val),
err => console.error('error:', err),
() => console.log('completed')
);
Предположим другой вариант, когда мы знаем, что на сервере возможна какая-то неполадка или любая другая причина, по которой необходимо попробовать получить данные несколько раз. Для этого стоит использовать оператор retry , который при возникновении ошибки попробует переподписаться на Observable столько раз, сколько указано в аргументе. Если же и после этого количества попыток ошибка останется, то она будет брошена далее для обработки.
import < Observable >from 'rxjs';
import < retry >from 'rxjs/operators';source = Observable.create(observer => console.log('Next attempt');
observer.error('Damn!');
>);subscription = source.pipe(
retry(2)
)
.subscribe(
val => console.log('next:', val),
err => console.error('handle error:', err),
() => console.log('completed')
);
retryWhen — функция, которая принимает на вход поток ошибок, который можно обработать. Также оператор применим для асинхронного поведения, например, переподписка через секунду, либо health-check сервера и т.д. (именно асинхронные действия). На выход callback-функция должна так же возвращать Observable, который будет отвечать за момент переподписки на исходный Observable (когда придет новое значение, тогда переподпишется).
import < Observable, throwError >from 'rxjs';
import < retryWhen >from 'rxjs/operators';flag = true;source = Observable.create(observer => console.log('Fake server call');
if (flag) observer.error('Damn');
flag = false;
> else observer.next('success');
>
>);obs = new Observable(observer => console.log('let wait a second');
setTimeout(() => observer.next('any'), 1000);
>);subscription = source.pipe(
retryWhen(err$ => obs)
)
.subscribe( val => console.log('next:', val) );
Также при возникновении ошибки можно начать получать данные из другого потока (“трубы”), просто переподписавшись на запасной Observable при помощи оператора onErrorResumeNext .
import < Observable, of >from 'rxjs';
import < onErrorResumeNext >from 'rxjs/operators';source = Observable.create(observer => console.log('Attempt');
observer.error('Damn!');
>);planB = of('Nice solution ;)');subscription = source.pipe(
onErrorResumeNext(planB)
)
.subscribe(
val => console.log('next:', val),
err => console.error('handle error:', err),
() => console.log('completed')
);
Стоит всегда помнить, что программы состоят не только из успешных сценариев, особенно Web-приложения. Необходимо обработать возможные ошибки.