
So how does RxJS QueueScheduler actually work?

Published Jan 01, 2019. Last updated Jun 29, 2019.

And how it can help me prevent a re-entry bug (which causes a stack overflow) and implement a breadth-first approach in stream combination operators like combineLatest.


Queue-ing candies

Prerequisites: you should have good knowledge about RxJS, Observables and Schedulers.

In 95 percent of tasks you don’t have to use Schedulers. But in some tricky cases, knowing how Schedulers work may be crucial. I still think Schedulers are a poorly documented part of RxJS, and in this article I will try to fill in some of the gaps on this topic.

If we take a look at the Schedulers definition in scheduler.md of the rx.js GitHub repo, we will find that:

A scheduler controls when a subscription starts and when notifications are delivered.

What does it mean — when notifications are delivered?

It means that we can deliver an event from an observable to a subscriber in the same macrotask, in a microtask (which runs just after the current macrotask), or we can schedule the delivery in a separate macrotask (so we put a task into the event loop queue).
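To make these three timings concrete, here is a small sketch in plain JavaScript with no RxJS involved. The helper name demoOrder is made up for this illustration; queueMicrotask and setTimeout are the standard platform functions for queuing a microtask and a macrotask.

```javascript
// Records the order in which three kinds of callbacks run.
// demoOrder is a hypothetical helper, not part of RxJS.
function demoOrder() {
  return new Promise((resolve) => {
    const order = [];

    // Macrotask: runs in a later turn of the event loop.
    setTimeout(() => {
      order.push('macrotask');
      resolve(order);
    }, 0);

    // Microtask: runs right after the current macrotask finishes.
    queueMicrotask(() => order.push('microtask'));

    // Synchronous: runs immediately, inside the current macrotask.
    order.push('sync');
  });
}

demoOrder().then((order) => console.log(order.join(' -> ')));
// prints "sync -> microtask -> macrotask"
```

The callbacks are registered in the reverse order on purpose: the result depends on which queue each one lands in, not on the order of the calls.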

All the possibilities are listed in this table:


Schedulers

I didn’t list VirtualTimeScheduler and TestScheduler because this post is not about them.

Examples:

import { of, queueScheduler, asapScheduler, asyncScheduler, animationFrameScheduler } from 'rxjs';

// value will be emitted at once, synchronously
of(1).subscribe(console.log)

// value will be emitted at once, synchronously
of(1, queueScheduler).subscribe(console.log)

// value will be emitted in a microtask, just after the current macrotask
of(1, asapScheduler).subscribe(console.log)

// value will be emitted in another macrotask
of(1, asyncScheduler).subscribe(console.log)

// value will be emitted in another macrotask, just before the browser repaints
of(1, animationFrameScheduler).subscribe(console.log)

As you can see, both null (no scheduler) and queueScheduler emit data in the same macrotask. So what is the difference between them?

Some factory functions, like of or range, simply use a ‘for’ or ‘do-while’ loop to emit all the values synchronously if no scheduler is defined. So they don’t use schedulers at all.

If we use queueScheduler, then rx.js just adds all the values to the scheduler queue and then flushes it. This happens in the same macrotask, at subscription time.
For the consumer of rx.js there is no difference in context.
Here is the code from range.ts:


rx.js range.ts

And the of operator uses subscribeToArray:


‘of’ by default (with no Scheduler) calls ‘fromArray’ which uses subscribeToArray.
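The two paths described above can be modeled roughly as follows. This is a toy sketch and not the RxJS source: emitWithLoop, emitWithQueue and createToyQueue are invented names, and the queue only imitates the idea of "enqueue while flushing, run later in the same macrotask".

```javascript
// A toy model, NOT the RxJS source. All names are invented for illustration.

// Path 1: no scheduler. Values are pushed from a plain loop.
function emitWithLoop(values, next) {
  for (const value of values) {
    next(value);
  }
}

// Path 2: a tiny queue. Work scheduled while the queue is being flushed
// is appended and runs after the current task returns.
function createToyQueue() {
  const tasks = [];
  let flushing = false;
  return function schedule(task) {
    tasks.push(task);
    if (flushing) return; // already draining: just enqueue
    flushing = true;
    try {
      while (tasks.length > 0) {
        tasks.shift()();
      }
    } finally {
      flushing = false;
    }
  };
}

// Emits one value per queued task and re-schedules itself for the next one.
function emitWithQueue(values, next, schedule) {
  let index = 0;
  const step = () => {
    if (index >= values.length) return;
    next(values[index++]);
    schedule(step);
  };
  schedule(step);
}

const viaLoop = [];
emitWithLoop([1, 2, 3], (v) => viaLoop.push(v));

const viaQueue = [];
emitWithQueue([1, 2, 3], (v) => viaQueue.push(v), createToyQueue());

console.log(viaLoop.join(','), '|', viaQueue.join(',')); // prints "1,2,3 | 1,2,3"
```

With a single source both paths deliver the same values synchronously, which is why a consumer normally sees no difference.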

Is there any difference then?

To see when null and the queue scheduler may behave differently, let’s consider this example from Stack Overflow:

import { of, combineLatest } from 'rxjs';

const a$ = of(1, 2);
const b$ = of(10);
const c$ = combineLatest(a$, b$, (a,b)=>a+b);
c$.subscribe(console.log);

The output you may expect:
11
12
Actual output:
12

Here is a codepen so you can run it too.

combineLatest combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables. It starts emitting once each argument observable (a$ and b$ in our case) has emitted at least one value. If it has a resultSelector function (here it is (a,b)=>a+b), then it will emit that function’s output (a+b).

What we may expect is that combineLatest subscribes to a$, gets 1, then subscribes to b$, gets 10 and emits 1+10=11. And then a$ will produce 2, so the combineLatest observable will emit 2+10=12.

But since no scheduler is passed to the ‘of’ function, it does not work as we may expect: combineLatest subscribes to a$, and since ‘of’ works with a ‘for’ loop, a$ emits 1 and 2 at once. Then combineLatest subscribes to b$ and gets 10. Now it takes the latest values and feeds them to the selector function: 2+10=12, so 12 is emitted.

This is called a depth-first approach. But can we somehow make it breadth-first, so that every argument observable first emits its first value, then its second one, and so on?

Yes, with queueScheduler!

import { of, combineLatest, queueScheduler } from 'rxjs';

const a$ = of(1, 2, queueScheduler);
const b$ = of(10, queueScheduler);
const c$ = combineLatest(a$, b$, (a,b)=>a+b, queueScheduler);
c$.subscribe(console.log);

Output:
11
12

If you are curious how it works, I will put an animated gif with a recording of a combineLatest+queueScheduler debugging session in the article appendix.
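For readers who prefer code to a recording, the difference between the two runs can be imitated with a simplified model in plain JavaScript. Everything here (createToyQueue, toyCombineLatest, loopSource, queuedSource) is a hypothetical stand-in written for this sketch, not RxJS internals. It assumes a queue that appends work scheduled during a flush, and that the combining step itself subscribes from inside a queued task.

```javascript
// Toy model of the example above. All names are invented; this is not RxJS code.
function createToyQueue() {
  const tasks = [];
  let flushing = false;
  return function schedule(task) {
    tasks.push(task);
    if (flushing) return; // enqueue only; the active flush will pick it up
    flushing = true;
    try {
      while (tasks.length > 0) tasks.shift()();
    } finally {
      flushing = false;
    }
  };
}

// Combines the latest values of two sources once both have emitted.
function toyCombineLatest(subscribeA, subscribeB, project) {
  const results = [];
  let a, b, hasA = false, hasB = false;
  const emitIfReady = () => {
    if (hasA && hasB) results.push(project(a, b));
  };
  subscribeA((value) => { a = value; hasA = true; emitIfReady(); });
  subscribeB((value) => { b = value; hasB = true; emitIfReady(); });
  return results;
}

// Depth-first: a source pushes all of its values from a loop on subscribe.
const loopSource = (values) => (next) => values.forEach((v) => next(v));

// Breadth-first: a source emits one value per queued task.
const queuedSource = (values, schedule) => (next) => {
  let index = 0;
  const step = () => {
    if (index >= values.length) return;
    next(values[index++]);
    schedule(step);
  };
  schedule(step);
};

const add = (a, b) => a + b;

const depthFirst = toyCombineLatest(loopSource([1, 2]), loopSource([10]), add);

const schedule = createToyQueue();
let breadthFirst;
// The subscriptions run inside a queued task, so the first step of each
// source is enqueued instead of being flushed on the spot.
schedule(() => {
  breadthFirst = toyCombineLatest(
    queuedSource([1, 2], schedule),
    queuedSource([10], schedule),
    add
  );
});

console.log(depthFirst.join(', '));   // prints "12"
console.log(breadthFirst.join(', ')); // prints "11, 12"
```

In this model the outer schedule call matters: without it, the first source would start and finish its own flush before the second source is even subscribed, and the result would be 12 again. That is consistent with passing queueScheduler to combineLatest as well as to both of calls in the example.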


One more example:

let Rx = window['rxjs'];
const {Subject, queueScheduler} = Rx;
const {take, observeOn} = Rx.operators;
console.clear();

const signal = new Subject();
let count = 0;

const somecalculations = (count) => console.log('do some calculations with ', count);

console.log('Start');
signal.pipe(take(1600))
  .subscribe(() => {
    somecalculations(count);
    signal.next(count++);
  });

signal.next(count++);
console.log('Stop');

You can try this code in a codepen as well.

Here we use a Subject named signal to recursively make some calculations.

After the calculations, the next recursive call is initiated with signal.next.

Since the Subject.next method works synchronously, what we have here is recursive calls of the subscriber handler.

And if the number of calls is small, it works fine. But if we make many calls recursively, we will get a stack overflow.

Can we somehow fix it?

Yes, with queueScheduler! We will apply it to the signal Subject with the observeOn operator.

console.log('Start');
signal.pipe(take(1600), observeOn(queueScheduler))
  .subscribe(() => {
    somecalculations(count);
    signal.next(count++);
  });

queueScheduler will replace the recursive calls with iterative ones. All the calculations will be done during the same macrotask.
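The recursion-to-iteration effect can be measured with a small stand-alone model. The code below is a sketch under assumptions: createToySubject and createToyQueue are invented helpers that only mimic the shape of a Subject and of a queue that defers work scheduled during a flush, and measureMaxDepth is a hypothetical probe that reports how deeply the handler calls nest.

```javascript
// Toy model with invented names; it only mimics the shape of Subject + observeOn.
function createToySubject() {
  const handlers = [];
  return {
    subscribe: (handler) => handlers.push(handler),
    next: (value) => handlers.forEach((handler) => handler(value)), // synchronous
  };
}

function createToyQueue() {
  const tasks = [];
  let flushing = false;
  return function schedule(task) {
    tasks.push(task);
    if (flushing) return; // re-entrant call: enqueue and return at once
    flushing = true;
    try {
      while (tasks.length > 0) tasks.shift()();
    } finally {
      flushing = false;
    }
  };
}

// Runs `limit` self-triggered notifications and reports the deepest nesting seen.
function measureMaxDepth(limit, useQueue) {
  const signal = createToySubject();
  const schedule = createToyQueue();
  let depth = 0;
  let maxDepth = 0;
  let count = 0;

  const handler = () => {
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    if (++count < limit) signal.next(count); // re-entry, as in the example above
    depth--;
  };

  // With the queue, delivery is deferred until the current handler returns.
  signal.subscribe(useQueue ? (v) => schedule(() => handler(v)) : handler);
  signal.next(0);
  return maxDepth;
}

console.log(measureMaxDepth(100, false)); // prints 100: every call nests inside the previous one
console.log(measureMaxDepth(100, true));  // prints 1: calls run one after another
```

Without the queue the nesting depth grows with the number of notifications, which is what eventually exhausts the call stack. With the queue each handler returns before the next one starts, so the depth stays at 1 however many notifications are sent.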

If you are interested in how queueScheduler does that under the hood, here is my answer on stackoverflow.com with more debugging details. I will also provide an animated gif with a recorded debugging session just below the article.

So time to wrap up:

When do we need queueScheduler? When you need to replace recursive subscription calls with iterative runs, and when you need some combining operators (like combineLatest) to work in a breadth-first manner.

Hope you enjoyed the article. Please leave comments with your personal use-cases of rx.js schedulers!

Packtpub.com and I prepared a whole rx.js review course with many other details on how you can solve your everyday development tasks with this amazing library. Take a look!

APPENDIX 1. combineLatest with queueScheduler debugging session:

APPENDIX 2. Subject.next with queueScheduler debugging session:

Discover and read more posts from Oleksandr Poshtaruk