Codemotion Magazine

The Magic behind RxJS

RxJS is the Reactive Extensions library for JavaScript, which lets you write reactive programs with a functional approach.

Last update December 10, 2019 by Lorenzo Franceschini

Reinventing the wheel is often necessary in order to understand how a library works under the hood and to better learn something complex.

Contrary to popular belief, it is a very good option for developers who want to improve their knowledge.

At Codemotion Rome 2019, Max Gallo performed a live coding session in which he showed us how to reverse engineer RxJS, the Reactive Extensions library for JavaScript, which lets you write reactive programs with a functional approach.

Max Gallo is an engineer working at DAZN, and he is experienced as a Full Stack Developer, iOS Developer and UI/UX Designer.

RxJS is a library with a clear API for working with both asynchronous and synchronous code, built on pipeable operators and the concept of the Observable.

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and automatically notifies them of any state changes (usually by calling one of their methods). (Wikipedia)
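
The definition above can be sketched in a few lines of plain JavaScript. This is only an illustration of the pattern, not RxJS code: the names createSubject, attach, notify and update are assumptions made up for this example.

```javascript
// Minimal observer-pattern sketch (all names here are illustrative).
function createSubject() {
    const observers = []; // the subject's list of dependents

    return {
        // Register an observer that wants to be told about state changes.
        attach(observer) {
            observers.push(observer);
        },
        // Notify every observer by calling one of its methods.
        notify(state) {
            observers.forEach(observer => observer.update(state));
        }
    };
}

const received = [];
const subject = createSubject();
subject.attach({ update: state => received.push(`A saw ${state}`) });
subject.attach({ update: state => received.push(`B saw ${state}`) });
subject.notify('ready');

console.log(received); // [ 'A saw ready', 'B saw ready' ]
```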

What is the goal of this talk? A lot of people are afraid of Observables and the RxJS library, so Max's intention was to show how we can build something like RxJS ourselves: not the whole standard library, of course, just a few lines of working demo code, which he wrote live in the vi editor. Cool! We can now create custom operators.

An operator is simply a function. Operators are the horsepower behind Observables, providing an elegant, declarative and common API for complex tasks in both the async and sync worlds.

Max showing RxJS Library

If you’ve used RxJS before and want to better understand some internal aspects of the library and the magic behind Observables, as well as the operators, then this post is for you.

First, Max showed us some RxJS code: how to create an Observable from an Array and apply some operators through the pipe method, which replaced dot-chained operators in RxJS 6 (it was actually already added in RxJS 5.5). Then, going a step further, he rewrote a couple of the most common functions, from and map, in a way that is compatible with RxJS. So we can now write our custom operators! Wow!

Let’s explore this. The purpose of the library is to offer a wide range of operators to create, transform and manipulate a stream of data from a source such as a UI event, an XHR response or some other data (an array, a collection or whatever you want to stream and observe).

A stream is a collection of future values. In RxJS it is represented by an Observable (a Subject is a special kind of Observable). It can emit three different kinds of events: a value (an object, an array or a primitive value), an error, or a 'completed' event when the full sequence of values is finished.

An Observable is an object with a subscribe method on it. This subscribe method takes the observer as an argument and is used to start the execution of the Observable. In the RxJS library, the subscribe method returns a Subscription with an unsubscribe method, which allows us to stop listening to the values emitted by the Observable.

Observables can be either asynchronous or synchronous, depending on the underlying implementation. An example of an asynchronous Observable is one that wraps an XHR call or a timer function, while a stream of values that comes from an array can be synchronous.
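
As a rough sketch of the subscribe/unsubscribe relationship described above, here is a hand-written source in plain JavaScript. It is not how RxJS implements it: createEmitterObservable and its emit method are hypothetical names for this example, and the object behaves more like a shared event source than a full Observable.

```javascript
// Hypothetical event source whose subscribe method returns an object
// with an unsubscribe method, mirroring the shape described above.
function createEmitterObservable() {
    const listeners = new Set();

    return {
        // Push a value to everyone who is currently subscribed.
        emit(value) {
            listeners.forEach(listener => listener(value));
        },
        subscribe(next) {
            listeners.add(next);
            return {
                // Stop delivering values to this particular listener.
                unsubscribe() {
                    listeners.delete(next);
                }
            };
        }
    };
}

const seen = [];
const clicks = createEmitterObservable();
const subscription = clicks.subscribe(value => seen.push(value));

clicks.emit('a');
clicks.emit('b');
subscription.unsubscribe();
clicks.emit('c'); // not delivered: we already unsubscribed

console.log(seen); // [ 'a', 'b' ]
```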

In either case, we capture these emitted events with an Observer, by defining three callbacks:

  1. a function called when a value is emitted;
  2. a function called when an error is emitted;
  3. a function called when ‘completed’ is emitted.

An Observer is a collection of callback functions that react to the values arriving via the Observable.

You can recap the relationship between Observable, Observer, subscribe and Subscription with a type definition in TypeScript:
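
To make the three callbacks concrete, here is a small sketch in plain JavaScript. The helper emitAll is a hypothetical name for this example, and treating an Error instance in the array as a failure is an assumption made only to trigger the error callback.

```javascript
// Hypothetical producer: sends each value to observer.next, reports a
// failure through observer.error, and signals the end with observer.complete.
function emitAll(values, observer) {
    try {
        values.forEach(value => {
            if (value instanceof Error) throw value; // simulate a failing source
            observer.next(value);
        });
    } catch (err) {
        observer.error(err);
        return; // after an error nothing else is emitted, not even 'complete'
    }
    observer.complete();
}

const log = [];
const observer = {
    next: value => log.push(`next ${value}`),
    error: err => log.push(`error ${err.message}`),
    complete: () => log.push('complete')
};

emitAll([1, 2], observer);
emitAll([3, new Error('boom'), 4], observer);

console.log(log);
// [ 'next 1', 'next 2', 'complete', 'next 3', 'error boom' ]
```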

  Observable.subscribe(observer:Observer):Subscription;

So far, we have seen what an Observable is and how to subscribe to one. With RxJS we can program in a functional way with asynchronous values: we can use operators such as map and filter, modeled on the Array methods of the same name, to derive new Observables from the original one.

Those operators are chainable and composable and to work with them we have a method on the Observable object named pipe. This pipe method takes single or multiple operators as an input and returns a new Observable.
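
One way to picture what pipe does is as a left-to-right function composition. The sketch below is an assumption about the idea, not the RxJS source: pipeAll, double and plusOne are hypothetical names, and the "observables" are bare objects with only a subscribe method.

```javascript
// Hypothetical helper: feed the source through each operator in turn.
// Every operator receives an observable and returns a new one.
function pipeAll(source, ...operators) {
    return operators.reduce((current, operator) => operator(current), source);
}

// A tiny synchronous source and two illustrative operators.
const source = { subscribe: next => [1, 2, 3].forEach(value => next(value)) };
const double = src => ({ subscribe: next => src.subscribe(v => next(v * 2)) });
const plusOne = src => ({ subscribe: next => src.subscribe(v => next(v + 1)) });

const result = pipeAll(source, double, plusOne);

const out = [];
result.subscribe(value => out.push(value));
console.log(out); // [ 3, 5, 7 ]
```

Note that source itself is never changed: each operator wraps the previous observable in a new object.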

Here is an example of using a filter and map operator in RxJS:

const { from } = require('rxjs');
const { map, filter } = require('rxjs/operators');

const observable = from([1, 2, 3, 4, 5])
    .pipe(
        map(x => x + 1),
        filter(x => x % 2 === 0),
        map(x => x - 1)
    );

observable.subscribe(
    val => console.log('odd:', val),
    error => console.error(error),
    () => console.log('complete')
);
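
To check which values the pipeline above lets through, the same three steps can be run on a plain Array with its built-in methods. This is only a synchronous analogy for tracing the values, not a replacement for the RxJS code.

```javascript
// Same transformation steps, applied to a plain Array.
const result = [1, 2, 3, 4, 5]
    .map(x => x + 1)           // [2, 3, 4, 5, 6]
    .filter(x => x % 2 === 0)  // [2, 4, 6]
    .map(x => x - 1);          // [1, 3, 5]

console.log(result); // [ 1, 3, 5 ]
```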

The syntax looks very familiar to anyone coming from the functional world. We have:

  1. Explicit subscription with Observable.subscribe;
  2. A set of operators;
  3. Observable and pipeline operators, which are library-specific today but may become language features in the future.

Reinventing RxJS

Let’s explore RxJS by writing our own operators. We will start with the from operator, which generates a stream of values.

To stay compatible with the RxJS API, from takes some initial data and returns an object with a pipe function. The pipe method accepts a set of functions as parameters. We use the rest parameter syntax from ES6 because we don’t know how many functions we will receive. The output of pipe is an Observable object with a subscribe method used for consuming it:

function from(initialData) {
    return {
        pipe(...pipeFunctions) {
            return {
                subscribe(onNext, onError, onComplete) {
                }
            };
        }
    };
}
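
As a quick reminder of how the rest parameter syntax used in pipe behaves, here is a minimal sketch. The function name describeOperators is hypothetical and exists only for this example.

```javascript
// A rest parameter collects every argument into a real Array,
// however many arguments the caller passes.
function describeOperators(...pipeFunctions) {
    return {
        isArray: Array.isArray(pipeFunctions),
        count: pipeFunctions.length
    };
}

const none = describeOperators();
const three = describeOperators(x => x, x => x + 1, x => x * 2);

console.log(none);  // { isArray: true, count: 0 }
console.log(three); // { isArray: true, count: 3 }
```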

Inside subscribe we will create an object called dataObservable, which has a subscribe method of its own. That inner subscribe takes the function we passed to the outer subscribe (in this case a simple console.log) and emits each value to it with forEach. For now we don’t apply any pipe function; we will do that later.

Before we move on, to make our code more reusable, we define a function called createObservable:

function createObservable(operator) {
    return {
        subscribe(next) {
            operator(next);
        }
    };
}
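
A short usage sketch may help here. It repeats createObservable so that it runs on its own; the letters observable and the received array are illustrative names, not part of the talk.

```javascript
function createObservable(operator) {
    return {
        subscribe(next) {
            operator(next);
        }
    };
}

// The function passed to createObservable decides how values are produced.
const letters = createObservable(next => ['a', 'b', 'c'].forEach(v => next(v)));

const received = [];
// Nothing runs until subscribe is called, and each call runs the producer again.
letters.subscribe(value => received.push(value));
letters.subscribe(value => received.push(value.toUpperCase()));

console.log(received); // [ 'a', 'b', 'c', 'A', 'B', 'C' ]
```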

How Operators Work in RxJS

As the picture illustrates, RxJS can chain more than one operator: each operator takes a source observable and produces another observable for the next operator. So we have the previous, the current and the next observable. To produce the next value we need to apply the pipe functions, so we apply them inside our from function:

function from(initialData) {
    return {
        pipe: (...pipeFunctions) => {
            return {
                subscribe: (onNext, onError, onComplete) => {
                    // Source observable: pass each item, and only the item, to next
                    const dataObservable = createObservable(next =>
                        initialData.forEach(value => next(value)));

                    let currentObservable = dataObservable;

                    // Each pipe function wraps the previous observable in a new one
                    pipeFunctions.forEach(pipeFunc => {
                        currentObservable = pipeFunc(currentObservable);
                    });

                    currentObservable.subscribe(onNext);

                    // Emission above is synchronous, so we can complete right away
                    if (onComplete) {
                        onComplete();
                    }
                }
            };
        }
    };
}

Now we can define our custom map and filter:

function map(mapFunction) {
    // An operator returns a function from source observable to new observable
    return sourceObservable => {
        const currentObservable = createObservable(destinationNext => {
            sourceObservable.subscribe(value => {
                const newValue = mapFunction(value);
                destinationNext(newValue);
            });
        });
        return currentObservable;
    };
}

function filter(filterFunction) {
    return sourceObservable => {
        const currentObservable = createObservable(destinationNext => {
            sourceObservable.subscribe(value => {
                // Forward only the values that pass the predicate
                if (filterFunction(value)) {
                    destinationNext(value);
                }
            });
        });
        return currentObservable;
    };
}

The filter function takes a predicate and returns a function that receives the source observable. Inside it we construct a new observable and subscribe to the source; for each value we check whether the filter function returns true and, if it does, we pass the value on to the next destination.

So to recap, I create an observable from my initial data, I pipe all my functions one into another, and at the end I subscribe to the final observable.
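
Putting the pieces together, the sketch below runs the earlier filter and map example against the hand-written functions instead of RxJS. It is a compact restatement of the code in this article, with two assumptions added for safety: from passes only the value to next, and onComplete is called only if it was provided. The log array is just a way to capture the output.

```javascript
function createObservable(operator) {
    return { subscribe(next) { operator(next); } };
}

function from(initialData) {
    return {
        pipe: (...pipeFunctions) => ({
            subscribe: (onNext, onError, onComplete) => {
                let currentObservable = createObservable(next =>
                    initialData.forEach(value => next(value)));
                // Chain the operators: each one wraps the previous observable.
                pipeFunctions.forEach(pipeFunc => {
                    currentObservable = pipeFunc(currentObservable);
                });
                currentObservable.subscribe(onNext);
                if (onComplete) onComplete();
            }
        })
    };
}

function map(mapFunction) {
    return sourceObservable => createObservable(destinationNext => {
        sourceObservable.subscribe(value => destinationNext(mapFunction(value)));
    });
}

function filter(filterFunction) {
    return sourceObservable => createObservable(destinationNext => {
        sourceObservable.subscribe(value => {
            if (filterFunction(value)) destinationNext(value);
        });
    });
}

const log = [];
from([1, 2, 3, 4, 5])
    .pipe(
        map(x => x + 1),
        filter(x => x % 2 === 0),
        map(x => x - 1)
    )
    .subscribe(
        val => log.push(`odd: ${val}`),
        error => log.push(`error: ${error}`),
        () => log.push('complete')
    );

console.log(log); // [ 'odd: 1', 'odd: 3', 'odd: 5', 'complete' ]
```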

So far, everything we have seen is synchronous, and we haven’t talked about order and speed. How can I control the order and the speed at which my values are emitted?

A Scheduler in RxJS controls the order of event emissions and the speed of those event emissions.

RxJS provides five kinds of schedulers; three of them are especially useful and common: asyncScheduler, queueScheduler and asapScheduler.

We can use a virtual-time scheduler to control the speed of our Observable, especially in a test environment. For example, the following Observable emits one value per second and would take an hour of real time to complete:
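
The difference in ordering can be felt without RxJS by using the JavaScript primitives that these schedulers roughly correspond to. This is an analogy under stated assumptions, not the RxJS implementation: synchronous code stands in for queueScheduler, a Promise microtask for asapScheduler, and a timer for asyncScheduler.

```javascript
const order = [];

// Timer-based work runs last (similar in spirit to asyncScheduler).
setTimeout(() => order.push('timer'), 0);

// Microtask work runs right after the current synchronous code
// (similar in spirit to asapScheduler).
Promise.resolve().then(() => order.push('microtask'));

// Synchronous work runs immediately (similar in spirit to queueScheduler).
order.push('sync');

// Inspect the result once everything above has had time to run.
setTimeout(() => console.log(order), 10);
// [ 'sync', 'microtask', 'timer' ]
```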

const { interval } = require('rxjs');
const { take } = require('rxjs/operators');

const source = interval(1000)
    .pipe(take(3600));

source.subscribe(
    console.log,
    console.error,
    () => console.log('--> Completed!'),
);

console.log('--> Subscribed!')
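
In the spirit of reinventing the wheel, the idea of virtual time can be sketched without RxJS. Everything below is a hypothetical, simplified model: createVirtualScheduler and virtualInterval are names invented for this example, and RxJS ships its own VirtualTimeScheduler and TestScheduler with a different API. The point is only that when "time" is a number the scheduler controls, an hour of emissions can be flushed instantly.

```javascript
// Hypothetical virtual-time scheduler: time is a counter that flush() advances.
function createVirtualScheduler() {
    let now = 0;
    const queue = [];

    return {
        now: () => now,
        // Queue work to run `delay` virtual milliseconds from the current time.
        schedule(work, delay) {
            queue.push({ due: now + delay, work });
        },
        // Run all queued work in time order without waiting for real time.
        flush() {
            while (queue.length > 0) {
                queue.sort((a, b) => a.due - b.due);
                const task = queue.shift();
                now = task.due;
                task.work();
            }
        }
    };
}

// Hypothetical stand-in for interval(period) combined with take(count).
function virtualInterval(period, count, scheduler) {
    return {
        subscribe(next, error, complete) {
            let i = 0;
            const tick = () => {
                next(i++);
                if (i < count) {
                    scheduler.schedule(tick, period);
                } else if (complete) {
                    complete();
                }
            };
            scheduler.schedule(tick, period);
        }
    };
}

const scheduler = createVirtualScheduler();
const values = [];
let completed = false;

virtualInterval(1000, 3600, scheduler).subscribe(
    value => values.push(value),
    undefined,
    () => { completed = true; }
);

scheduler.flush(); // one virtual hour passes immediately

console.log(values.length, scheduler.now(), completed); // 3600 3600000 true
```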

So, to better understand how something works under the hood, try to disassemble, simplify and challenge it by reinventing the wheel to learn – and please share it with us. Good luck!
