How to convert promises to an observable

A promise is a future value. An observable is a flow of past and future values. So it makes sense to convert a list of promises into an observable. Then we can do nice things on it, like .every(...), which will result in a single boolean observable according to whether all the promises satisfy a given condition or not.

Understanding observables requires some time, plus the latest RxJs library, plus an interactive dev tool like JS Bin.

I’m now going to share what I just learned. But first, let me introduce some snippets I’ll be using later on.

Creating promises that resolve.

function resolvePromise(value, delay) {
  return new Promise(function(resolve, reject) {
    setTimeout(function(){
      console.log(value);
      resolve(value);
    }, delay);
  });
}

resolvePromise(true, 500).then(
  function(val){console.log('resolved: ' + val);}
, function(err){console.log('rejected: ' + err);}
);
/*

[object Promise] { ... }
true
"resolved: true"

*/

Subscribing to a source.

source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    }
);

Getting to the right observable

Starting with no promises at all

Let’s see how the source and subscription work for the simplest case, when no promises at all are involved.

var booleans = [true, false, true, true, true];
var source = Rx.Observable.of.apply(null, booleans);
/*

"Next: true"
"Next: false"
"Next: true"
"Next: true"
"Next: true"
"Completed"

*/

Notice that above and in the next snippets I’m going to show the console output corresponding to the subscription defined earlier and using the last source.

Passing from values to promises of values

With a simple .map(...) we can convert our booleans to promises.

var promises = booleans.map(function (x, i) {
  return resolvePromise(x, 1000*(5-i));
});
/*

true
true
true
false
true

*/

Notice that we are now getting reversed logs because, to get a better sense of promises, we are creating them so that they will resolve in reversed order.

Passing from promises to observable: 1/3

Unfortunately the .from(...) applied to a list of promises doesn’t really do much:

var source = Rx.Observable.from(promises);
/*

"Next: [object Promise]"
"Next: [object Promise]"
"Next: [object Promise]"
"Next: [object Promise]"
"Next: [object Promise]"
"Completed"
true
true
true
false
true

*/

Notice that the subscription is notified all at once with the pending promises, without waiting for them to (slowly) resolve.

Passing from promises to observable: 2/3

We could transform each to an observable, applying.from(...) to each.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
}));
/*

"Next: [object Object]"
"Next: [object Object]"
"Next: [object Object]"
"Next: [object Object]"
"Next: [object Object]"
"Completed"
true
true
true
false
true

*/

Notice that the only difference is that now the subscription is notified with the pending observables, still without waiting.

Passing from promises to observable: 3/3

Wait a moment… that is an observable of observables… a higher-order observable then (by definition)!

Yes, and there are many ways to bring a higher order back to the first one.

combineAll

Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
})).combineAll();
/*

true
true
true
false
true
"Next: true,false,true,true,true"
"Completed"

*/

Notice how the subscription is notified only once, after all the promises have resolved, with a combination that respects the order of the booleans.

concatAll

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
})).concatAll();
/*

true
true
true
false
true
"Next: true"
"Next: false"
"Next: true"
"Next: true"
"Next: true"
"Completed"

*/

Notice how the subscription is notified once per resolved promise, but only after all the promises have resolved. Also notice that the notification order respects the order of the booleans.

exhaust

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
})).exhaust();
/*

true
true
true
false
true
"Next: true"
"Completed"

*/

Notice how the subscription is notified only once, with the resolved value of the first promise (i.e. the first boolean here, not the first promise to resolve, which would be the last boolean). That’s because the first boolean is associated to the longer running promise, which exhausts the processing capacity of the resulting observable until it gets resolved (and completed). Also notice that the fact that the notification happens only after all the promises have resolved, is coincidental (because we made it happen by forcing each promise to complete sooner than the previous one).

mergeAll

mergeAll(concurrent: number): Observable

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
})).mergeAll();
/*

true
"Next: true"
true
"Next: true"
true
"Next: true"
false
"Next: false"
true
"Next: true"
"Completed"

*/

Notice how the subscription is notified once per resolved promise, as soon as each promise is resolved.

switch

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

var source = Rx.Observable.from(promises.map(function (promise) {
  return Rx.Observable.from(promise);
})).switch();
/*

true
"Next: true"
"Completed"
true
true
false
true

*/

Notice how the subscription is notified only once, as soon as the first promise is resolved.

References

One Reply to “How to convert promises to an observable”

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.