Reactive Exensions: Auto-unsubscribing observables

Reactive Exensions: Auto-unsubscribing observables
💡
Disposing subscription is mostly an antipattern. RX have completion of observables.

In .NET we are used to manually dispose anything IDisposable. IObservable<T>.Subscribe(..) method returns IDisposable, so we should dispose it to unsubscribe from underlying source and avoid possible memory leak, right? Wrong.

class MyComponent(IObservable<Message> longLivingObservable): IDisposable {
   List<IDisposable> _subscriptions = new();

   public void Initialize() {
      _subscriptions.Add(
         longLivingObservable.Subscribe(Console.WriteLine)
      );
   }
   
   public void Dispose() {
      foreach(var subscription in _subscriptions) {
          subscription.Dispose();
      }
   }
}

Sounds this pattern familiar? Don't do this!

class MyComponent(IObservable<Message> longLivingObservable) {
   _subscriptions = new Array<Subscription>();

   void init() {
      _subscriptions.push(
         longLivingObservable.Subscribe(console.log)
      );
   }
   
   public void destroy() {
      foreach(var subscription in _subscriptions) {
          subscription.Dispose();
      }
   }
}

RxJS calls it .unsubscribe() instead Dispose()

One of great RX features is that it supports completions. If an Observable completes, it removes all its subscribers. This eliminates the need for disposing/unsubscribing - thus the name auto-unsubscribing/

var sourceObservable = new Subject<int>();
sourceObservable.Subscribe(...); //Subscriber 1
sourceObservable.Subscribe(...); //Subscriber 1
//at some point later:
sourceObservable.Complete(); //removes all subscribers

You should ask:

"what if I don't have control over lifetime of the sourceObservable"?

In that case you can use some of the operators, that transforms it to observable that completes when it suits you

// unsubscribes when first value in emited
sourceObservable.FirstOrDefault().Subscribe(...);

// automatically unsubscribes after 1 second
sourceObservable.TakeUntil(DateTimeOffset.Now.AddSeconds(1));

// automatically unsubscribes when somethinElseHappenedObservable emits a value
sourceObservable.TakeUntil(somethingElseHappenedObservable).Subscribe();

TakeUntil(disposed) pattern

Let's refactor our original code:

class MyComponent(IObservable<Message> longLivingObservable) : IDisposable {
   private Subject<Unit> _disposed = new();

   public void Initialize() {
      longLivingObservable
        .TakeUntil(_disposed)
        .Subscribe(Console.WriteLine);
   }
   
   public void Dispose() {
      _disposed.OnNext(Unit.Value);
   }
}

Advantages:

  1. No need to keep reference to the subscription
  2. Easier to read - code reviewers see the subscription and it's lifetime in one place.
  3. Eliminating possible memory leaks by creating short lived observable as soon as possible - in ctor, in a base class or even at IoC container level (that's how Angular does it with ActivatedRoute.params):
class ChatClient: ComponentBase {
   private IObservable<Message> _messages = new();
  
   public MyComponent(IObservable<Message> longLivingObservable)
   {
      //now it's safe to subscribe to _messages without worrying about memory leaks
      _messages = longLivingObservable.TakeUntil(this.Disposed);
   }   

   public void Initialize() {
      _messages.OfType<ParticipantJoined>.Subscribe(Console.WriteLine);
      _messages.OfType<MessagePosted>.Subscribe(Console.WriteLine);
      _messages.OfType<ParticipantLeft>.Subscribe(Console.WriteLine);
   }
}

For example, if you have Singleton service, that keeps an long lived observable, you might turn it or wrap it into Scoped service, that keeps short lived observable.