Saturday 20 October 2018

Understanding RxJS Observables

What is Reactive Programming?
Reactive programming is programming with asynchronous data streams.

Reactive Extensions for JavaScript (RxJS) is a reactive streams library that allows you to work with asynchronous data streams. RxJS can be used both in the browser and on the server side using Node.js.

When version 2 of Angular came out, it introduced us to observables. The Observable isn’t an Angular-specific feature: it has been proposed as a standard for managing async data in JavaScript, but the proposal did not ship in ES7 (ES2016) and is not part of the language today. Angular uses observables extensively in its event system and its HTTP service.

What is an Observable?
An observable can be defined as a stream of data whose items arrive asynchronously over time. You can think of it as an array whose items arrive asynchronously over time. Observables help you manage asynchronous data, such as data coming from a backend service. Observables are used within Angular itself, including Angular’s event system and its HTTP client service. To use observables, Angular uses a third-party library called Reactive Extensions (RxJS).

An Observable by default is unicast. Unicasting means that each subscribed observer owns an independent execution of the Observable. To demonstrate this:

const observable = Rx.Observable.create((observer) => {
    // Runs once per subscriber, so each subscriber gets its own random number
    observer.next(Math.random());
});

// subscription 1
observable.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

// subscription 2
observable.subscribe((data) => {
    console.log(data); // 0.004617340049055896 (random number)
});

Subject:
Observables are unicast by design, which can be pretty annoying if you expect each subscriber to receive the same values. Subjects can help us overcome this issue.
Subjects can multicast. Multicasting basically means that one Observable execution is shared among multiple subscribers.

Subjects are like Event Emitters: they maintain a registry of many listeners. Calling subscribe on a Subject does not invoke a new execution that delivers data. It simply registers the given Observer in a list of Observers.
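
To make the "registry of listeners" idea concrete, here is a minimal sketch in plain JavaScript. It is not the RxJS implementation: ToySubject is a hypothetical class written only for this illustration, and it leaves out error and completion handling.

```javascript
// Toy model of a Subject (an assumption for illustration, NOT RxJS code).
class ToySubject {
  constructor() {
    this.observers = []; // the "registry" of listeners
  }

  // subscribe() only records the observer; it does not start a new execution.
  subscribe(next) {
    this.observers.push(next);
    // Return a handle that removes the observer from the registry again.
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== next);
      },
    };
  }

  // next() pushes the same value to every registered observer.
  next(value) {
    this.observers.slice().forEach((observer) => observer(value));
  }
}

const received = [];
const toy = new ToySubject();
toy.subscribe((v) => received.push(['A', v]));
const subB = toy.subscribe((v) => received.push(['B', v]));

toy.next(1);        // both A and B receive 1
subB.unsubscribe();
toy.next(2);        // only A receives 2

console.log(received); // [['A', 1], ['B', 1], ['A', 2]]
```

Because subscribing only adds an entry to the list, every listener sees the very same value, which is the multicast behaviour described above.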

So how do we use Subjects to multicast?
Multicasting is a characteristic of a Subject. You don’t have to do anything special to achieve this behavior. This is a small multicast demonstration:

const subject = new Rx.Subject();

// subscriber 1
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

// subscriber 2
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

subject.next(Math.random());

Nice! Now I have two subscriptions getting the same data. However, this is not all that Subjects can do.

Whereas Observables are solely data producers, Subjects can be used as both a data producer and a data consumer. By using a Subject as a data consumer you can convert an Observable from unicast to multicast. Here’s a demonstration of that:

const observable = Rx.Observable.create((observer) => {
    observer.next(Math.random());
});

const subject = new Rx.Subject();

// subscriber 1
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

// subscriber 2
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

observable.subscribe(subject);

We pass our Subject to the subscribe function and let it take the values that come out of the Observable (data consuming). All the subscribers to that Subject will then immediately receive that value.

merge: Turn multiple observables into a single observable.
var source1 = Rx.Observable.interval(100)
    .timeInterval()
    .pluck('interval');
var source2 = Rx.Observable.interval(150)
    .timeInterval()
    .pluck('interval');

var source = Rx.Observable.merge(
    source1,
    source2)
    .take(5);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 100
// => Next: 150
// => Next: 100
// => Next: 150
// => Next: 100
// => Completed

When do we apply forkJoin?
We use it when API requests are independent, meaning they do not depend on each other to complete and can execute in parallel.
When all observables complete, forkJoin emits the last emitted value from each. This operator is best used when you have a group of observables and only care about the final emitted value of each.
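
The "wait for all, then emit the last value of each" rule can be sketched in plain JavaScript. This is a toy model under stated assumptions, not the RxJS source: toyOf and toyForkJoin are hypothetical helpers, a "source" here is just a function that takes an observer, and error handling is left out.

```javascript
// Toy source: emits the given values synchronously, then completes.
// (Hypothetical helper for this illustration.)
const toyOf = (...values) => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};

// Toy forkJoin: remember the last value of every source and emit them
// together, as one array, once ALL sources have completed.
function toyForkJoin(sources) {
  return (observer) => {
    const last = new Array(sources.length);
    let remaining = sources.length;
    sources.forEach((source, i) => {
      source({
        next: (v) => { last[i] = v; }, // overwrite: only the final value survives
        complete: () => {
          remaining -= 1;
          if (remaining === 0) {
            observer.next(last);
            observer.complete();
          }
        },
      });
    });
  };
}

const forkJoinLog = [];
toyForkJoin([toyOf(1, 2, 3), toyOf('a', 'b')])({
  next: (v) => forkJoinLog.push(v),
  complete: () => forkJoinLog.push('done'),
});

console.log(forkJoinLog); // [[3, 'b'], 'done']
```

Note that the earlier values (1, 2 and 'a') never reach the subscriber, which is why forkJoin fits request-style observables that emit once and complete.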

When do we apply mergeMap/flatMap?
flatMap is an alias for mergeMap. We use it when we need data from the first API request to make a request to the second API.
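
Here is a rough plain JavaScript sketch of that dependent-request pattern. It is a toy model, not RxJS: toyOf, toyMergeMap, getUser and getOrders are all hypothetical names made up for this example, and the "API calls" just return fixed data synchronously.

```javascript
// Toy source: emits the given values synchronously, then completes.
const toyOf = (...values) => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};

// Toy mergeMap: for each outer value, call project(value) to get an inner
// source and forward everything that inner source emits.
function toyMergeMap(source, project) {
  return (observer) => {
    let active = 1; // the outer source counts as one active subscription
    const finishOne = () => {
      active -= 1;
      if (active === 0) observer.complete(); // complete when outer and all inners are done
    };
    source({
      next: (outerValue) => {
        active += 1;
        project(outerValue)({
          next: (innerValue) => observer.next(innerValue),
          complete: finishOne,
        });
      },
      complete: finishOne,
    });
  };
}

// Hypothetical stand-ins for two dependent API calls.
const getUser = () => toyOf({ id: 7, name: 'Ada' });
const getOrders = (userId) => toyOf([`order-1-for-${userId}`, `order-2-for-${userId}`]);

const mergeMapLog = [];
toyMergeMap(getUser(), (user) => getOrders(user.id))({
  next: (orders) => mergeMapLog.push(orders),
  complete: () => mergeMapLog.push('done'),
});

console.log(mergeMapLog); // [['order-1-for-7', 'order-2-for-7'], 'done']
```

The second request cannot start until the first one has emitted, because its argument (the user id) comes from the first response.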

concat: Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.

/* Using Observable sequences */
var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.return(56);

var source = Rx.Observable.concat(source1, source2);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 42
// => Next: 56
// => Completed

empty: Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.

var source = Rx.Observable.empty();

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
  
// => Completed

catch: Continues an observable sequence that is terminated by an exception with the next observable sequence.

var obs1 = Rx.Observable.throw(new Error('error'));
var obs2 = Rx.Observable.return(42);

var source = Rx.Observable.catch(obs1, obs2);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => Next: 42
// => Completed

zip: Merges the specified observable sequences or Promises into one observable sequence by using the selector function. A result is emitted only once all observables have emitted, and without a selector it is emitted as an array. The values from the source observables are paired one by one, in order.

/* Without a result selector */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zip(
  range,
  range.skip(1),
  range.skip(2)
);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => Next: 0,1,2
// => Next: 1,2,3
// => Next: 2,3,4
// => Completed

map: Applies a projection to each value from the source. map is an operator that transforms data by applying a function.
// RxJS v6+
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//add 10 to each value
const example = source.pipe(map(val => val + 10));
//output: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

pipe:
A pipeable operator is a function that takes an Observable as its input and returns another Observable. The previous Observable stays unmodified.
You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
pipe was introduced to RxJS in v5.5 to take code that looked like this:

of(1,2,3).map(x => x + 1).filter(x => x > 2);

and turn it into this:
of(1,2,3).pipe(
    map(x => x + 1),
    filter(x => x > 2)
  );

Example:
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';
const nums = of(1, 2, 3, 4, 5);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));
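
To see why pipe() "returns a new function that runs the composed functions in sequence", here is a small sketch in plain JavaScript. It is only an analogy, not how RxJS is written: toyPipe, toyFilter and toyMap are hypothetical helpers that work on arrays instead of Observables.

```javascript
// Toy pipe: compose single-argument functions left to right.
const toyPipe = (...fns) => (input) => fns.reduce((acc, fn) => fn(acc), input);

// Array-based stand-ins for the filter and map operators. Each one returns a
// function that takes an array and returns a NEW array, leaving the input as is.
const toyFilter = (predicate) => (values) => values.filter(predicate);
const toyMap = (project) => (values) => values.map(project);

// Build the combined function once...
const squareOddValues = toyPipe(
  toyFilter((n) => n % 2 !== 0),
  toyMap((n) => n * n)
);

// ...then run it on some input.
const input = [1, 2, 3, 4, 5];
const squared = squareOddValues(input);

console.log(squared); // [1, 9, 25]
console.log(input);   // [1, 2, 3, 4, 5] (unchanged)
```

Nothing runs when toyPipe is called; the work happens only when the returned function is applied to an input, much like an Observable pipeline only runs once you subscribe.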

filter: Emit values that pass the provided condition
// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//filter out non-even numbers
const example = source.pipe(filter(num => num % 2 === 0));
//output: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));

takeWhile: Emit values until provided expression is false.
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

//emit 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
//allow values until value from source is greater than 4, then complete
const example = source.pipe(takeWhile(val => val <= 4));
//output: 1,2,3,4
const subscribe = example.subscribe(val => console.log(val));

Difference between takeWhile() and filter()
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile, filter } from 'rxjs/operators';

// emit 3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3
const source = of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);

// allow values while value from source equals 3, then complete at the first other value
// output: [3, 3, 3]
source
  .pipe(takeWhile(it => it === 3))
  .subscribe(val => console.log('takeWhile', val));

// output: [3, 3, 3, 3, 3, 3, 3]
source
  .pipe(filter(it => it === 3))
  .subscribe(val => console.log('filter', val));

from: Converts various other objects and data types into Observables.
// RxJS v6+
import { from } from 'rxjs';

//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
//output: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));

of: Converts the arguments to an observable sequence.
// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

switchMap
switchMap is very similar to flatMap, but with a very important distinction.

In short, every time an event comes down the stream, flatMap will subscribe to (and invoke) a new observable without unsubscribing from any other observable created by a previous event. switchMap, on the other hand, will automatically unsubscribe from any previous observable when a new event comes down the stream.
https://angular-2-training-book.rangle.io/
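
The difference is easier to see with a late response. The sketch below is a toy model in plain JavaScript, not RxJS code: toySource, toyMergeMap, toySwitchMap and run are hypothetical names, and values are pushed by hand so the order of events is explicit.

```javascript
// Toy hot source: subscribe(fn) returns an unsubscribe function,
// emit(value) pushes a value to every current listener.
function toySource() {
  let listeners = [];
  return {
    subscribe(fn) {
      listeners.push(fn);
      return () => { listeners = listeners.filter((l) => l !== fn); };
    },
    emit(value) { listeners.slice().forEach((l) => l(value)); },
  };
}

// mergeMap-like: every inner subscription stays alive.
function toyMergeMap(outer, project, sink) {
  outer.subscribe((value) => { project(value).subscribe(sink); });
}

// switchMap-like: drop the previous inner subscription before starting the next.
function toySwitchMap(outer, project, sink) {
  let unsubscribePrevious = null;
  outer.subscribe((value) => {
    if (unsubscribePrevious) unsubscribePrevious();
    unsubscribePrevious = project(value).subscribe(sink);
  });
}

function run(operator) {
  const searches = toySource();                         // outer stream of search terms
  const results = { a: toySource(), ab: toySource() };  // one inner stream per term
  const seen = [];
  operator(searches, (term) => results[term], (v) => seen.push(v));
  searches.emit('a');
  searches.emit('ab');
  results.a.emit('results for a');   // late response to the first search
  results.ab.emit('results for ab');
  return seen;
}

const merged = run(toyMergeMap);
const switched = run(toySwitchMap);

console.log(merged);   // ['results for a', 'results for ab']
console.log(switched); // ['results for ab']
```

With the switchMap-like version, the stale response for 'a' is ignored because its subscription was dropped as soon as 'ab' arrived, which is usually what you want for things like type-ahead search.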

catchError: Gracefully handle errors in an observable sequence.
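
As a rough idea of what "gracefully handle" means, here is a toy sketch in plain JavaScript. It is not RxJS code: toyOf, toyThrow and toyCatchError are hypothetical helpers, and the fallback value is made up for the example.

```javascript
// Toy sources (hypothetical helpers): one emits values and completes,
// the other fails immediately with the given error.
const toyOf = (...values) => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};
const toyThrow = (error) => (observer) => observer.error(error);

// Toy catchError: pass values through; on error, ask selector(err) for a
// replacement source and continue with that one instead of failing.
function toyCatchError(source, selector) {
  return (observer) => {
    source({
      next: (v) => observer.next(v),
      error: (err) => selector(err)(observer),
      complete: () => observer.complete(),
    });
  };
}

const catchLog = [];
toyCatchError(
  toyThrow(new Error('boom')),
  (err) => toyOf(`fallback (after: ${err.message})`)
)({
  next: (v) => catchLog.push(v),
  error: (err) => catchLog.push('error: ' + err.message),
  complete: () => catchLog.push('done'),
});

console.log(catchLog); // ['fallback (after: boom)', 'done']
```

The subscriber's error callback is never called here: the error is swapped for a fallback stream, so the sequence ends with a normal completion.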

combineLatest: Combines the values from all the Observables passed as arguments. It does this by subscribing to each Observable in order and, whenever any Observable emits, collecting an array of the most recent values from each Observable. So if we pass n Observables to the combineLatest operator, the returned Observable will always emit an array of n values.
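
The "array of the most recent values" behaviour can be sketched in plain JavaScript. This is a toy model, not RxJS code: toySource and toyCombineLatest are hypothetical helpers, and the width/height streams are invented for the example. Like the real operator, the sketch emits nothing until every source has produced at least one value.

```javascript
// Toy hot source: subscribe(fn) registers a listener, emit(value) notifies all.
function toySource() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    emit(value) { listeners.slice().forEach((l) => l(value)); },
  };
}

// Toy combineLatest: keep the latest value per source and, on every emission,
// hand the sink a fresh array with one entry per source.
function toyCombineLatest(sources, sink) {
  const latest = new Array(sources.length);
  const hasValue = sources.map(() => false);
  sources.forEach((source, i) => {
    source.subscribe((value) => {
      latest[i] = value;
      hasValue[i] = true;
      // Emit only once every source has produced at least one value.
      if (hasValue.every(Boolean)) sink(latest.slice());
    });
  });
}

const width = toySource();
const height = toySource();
const combined = [];
toyCombineLatest([width, height], (pair) => combined.push(pair));

width.emit(10);  // nothing yet: height has no value
height.emit(5);  // emits [10, 5]
width.emit(20);  // emits [20, 5]

console.log(combined); // [[10, 5], [20, 5]]
```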

dispatchEvent:

Read more https://github.com/Reactive-Extensions/operators



