Dynamic subscriptions with proxy observables in RxJs

Jamie Perkins
2 min read · Oct 7, 2019
Photo by Dewang Gupta on Unsplash

Have you ever wished there was an RxJs method that would let you have one observable for multiple observable streams that are constantly starting and completing? In marble syntax, they might look like this:

^-a---b--c---|  ^---d---e-f---|  ^g--h----i-|

Here, an observable stream starts, emits some values, and then completes. And then it happens again. And again. This requires your app to maintain separate subscriptions for each one. But you want to perform the same logic when they emit, and you need it to be done in one observable. What operator can dynamically subscribe and unsubscribe to multiple, perhaps simultaneous streams? Answer: none of them.

Spoiler alert: there is no GitHub repo at the end of this article. The solution already ships with RxJs…

Another example: let’s say you have an array of observables. You need to subscribe to all of them in one subscription, and then perform some logic on all the values in the array whenever just one emits. But here’s the catch: the array adds and removes observables at runtime.

streams: Array<Observable<any>> = [x$, y$, z$];
// which becomes [x$, z$]
// and later [y$, a$, b$]

These might seem like contrived cases, but they’re actual cases I’ve run into.

The solution I landed on is a “proxy observable”. The proxy is just a Subject or BehaviorSubject that you call next() on from other subscriptions. Use the BehaviorSubject if you want the last emitted value available to new subscriptions. Otherwise just use Subject.
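To make the difference between the two concrete, here is a minimal sketch that assumes nothing beyond the Subject and BehaviorSubject classes that ship with rxjs. The names plainProxy$ and replayingProxy$ are made up for illustration. Note that a BehaviorSubject has to be created with an initial value (0 here).

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

// Hypothetical proxies, named only for this illustration.
const plainProxy$ = new Subject<number>();
const replayingProxy$ = new BehaviorSubject<number>(0); // initial value is required

// A value is pushed before anyone has subscribed.
plainProxy$.next(1);
replayingProxy$.next(1);

// Late subscribers collect whatever they receive.
const fromPlain: number[] = [];
const fromReplaying: number[] = [];
plainProxy$.subscribe(value => fromPlain.push(value));
replayingProxy$.subscribe(value => fromReplaying.push(value));

// A value is pushed after subscribing.
plainProxy$.next(2);
replayingProxy$.next(2);

console.log(fromPlain);     // [2]
console.log(fromReplaying); // [1, 2]
```

The late subscriber to the Subject misses the value emitted before it subscribed, while the late subscriber to the BehaviorSubject immediately receives the most recent one.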

For example, in an Angular service for web socket connections, you could use a proxy observable to expose a single observable for multiple connections that are opening and closing.

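import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

// class property on the service
proxy$ = new Subject<any>();

// subscribe to the proxy (for example, in the constructor)
this.proxy$.pipe(
  takeUntil(this.destroyed$)
).subscribe(value => {
  // perform logic on any emitted value
});

// open a connection and forward its messages to the proxy
newConnection(url: string) {
  const connection = webSocket(url);
  connection.pipe(
    takeUntil(this.destroyed$)
  ).subscribe(message => this.proxy$.next(message));
}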

Your app can keep calling newConnection(). Connections can close and open, and emitted values will all be handled by the proxy subscription.
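If you want to try the idea without a real socket, the following is a rough, self-contained sketch. It assumes that short-lived of() observables can stand in for connections that open, emit and complete; the forward() helper and the received array are hypothetical names used only here.

```typescript
import { Observable, Subject, Subscription, of } from 'rxjs';

// The proxy that the rest of the app would subscribe to.
const proxy$ = new Subject<string>();

// Collect everything the proxy emits so the result can be inspected.
const received: string[] = [];
proxy$.subscribe(value => received.push(value));

// Hypothetical helper: forward only next() notifications to the proxy.
// The source's completion is deliberately not passed on, so the proxy
// stays open after each source stream finishes.
function forward(source$: Observable<string>): Subscription {
  return source$.subscribe(value => proxy$.next(value));
}

// Three stand-in "connections" that each emit and then complete.
forward(of('a', 'b', 'c'));
forward(of('d', 'e', 'f'));
forward(of('g', 'h', 'i'));

console.log(received.join('')); // abcdefghi
```

The design choice that matters is forwarding with a next-only callback. Passing proxy$ itself as the observer would also forward the first source's completion, after which the proxy would stop emitting.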

In the example of the array holding a varying number of observables, your code might look something like this:

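import { Observable, Subject, Subscription, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

proxy$ = new Subject<any>();
subscription: Subscription;
multipleStreams: Array<Observable<any>>;

// subscribe to the proxy
this.proxy$.pipe(
  takeUntil(this.destroyed$)
).subscribe(values => {
  // perform logic on latest array values
});

// call this whenever the contents of multipleStreams change
renewArraySubscription() {
  if (this.subscription) this.subscription.unsubscribe();
  // combineLatest emits only once every stream has emitted at least once
  this.subscription = combineLatest(this.multipleStreams).pipe(
    takeUntil(this.destroyed$)
  ).subscribe(values => {
    this.proxy$.next(values);
  });
}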

Here, the renewArraySubscription() method can be called every time the array’s contents change. You can perform logic inside the proxy$ subscription, or put it in a tap() and expose proxy$ itself to your app’s components. This allows all the subscription logic to be abstracted away in your service.
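As one possible shape for the tap() variant, here is a framework-free sketch. StreamsService, setStreams(), values$ and log are hypothetical names, and of() observables stand in for the real streams; treat it as an illustration under those assumptions rather than a finished service.

```typescript
import { Observable, Subject, Subscription, combineLatest, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical service: the names below are illustrative only.
class StreamsService {
  private readonly proxy$ = new Subject<number[]>();
  private subscription?: Subscription;

  // Stand-in for "the logic": record each combined value.
  readonly log: number[][] = [];

  // Components subscribe to this; the shared logic lives in tap().
  readonly values$: Observable<number[]> = this.proxy$.pipe(
    tap(values => this.log.push(values))
  );

  // Swap in a new set of streams and renew the inner subscription.
  setStreams(streams: Array<Observable<number>>): void {
    this.subscription?.unsubscribe();
    this.subscription = combineLatest(streams).subscribe(values =>
      this.proxy$.next(values)
    );
  }
}

const service = new StreamsService();
const seen: number[][] = [];
service.values$.subscribe(values => seen.push(values));

service.setStreams([of(1), of(2), of(3)]);
service.setStreams([of(1), of(3)]);

console.log(JSON.stringify(seen)); // [[1,2,3],[1,3]]
```

The component only ever holds one subscription to values$, no matter how often the underlying array of streams is replaced. Keep in mind that tap() runs once per subscriber, so with several subscribers the logic inside it would run several times.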

That’s it. Got any other use-cases this solves? I’d love to hear about them in the comments.
