It is a paradox that Reactive Extensions were originally developed for .NET, yet it was Angular that popularized their JavaScript port (RxJS), and now they will surely find their way back to .NET through Blazor.

In this blog post I will show some best practices from Angular applied to Blazor.

If you are not yet familiar with the concepts of Observables and Subjects, you should definitely learn the basics first. It will change your life. There are many online resources, for example this YouTube video.

The problem

Let's start with a naïve example where we load data based on a parameter, and let me explain what is wrong with it:

[Parameter]
public int Id { get; set; }

public Customer Customer { get; set; }

protected override async Task OnInitializedAsync()
{
    Customer = await _customerService.GetCustomerAsync(Id);
}
This won't reflect changes to the Id parameter

At first sight this works, but as soon as the Id parameter changes, the customer is not reloaded. We could detect changes to the Id parameter in the SetParametersAsync method, or perhaps with a custom setter on the Id property.

public override async Task SetParametersAsync(ParameterView parameters)
{
    bool customerIdSet = parameters.TryGetValue(nameof(Id), out int customerId);
    await base.SetParametersAsync(parameters);
    if (customerIdSet)
    {
        await LoadCustomerAsync(customerId);
    }
}
This does not handle concurrency well

This is much better, but there are still two flaws:

  1. If the parameter changes quickly, LoadCustomerAsync will run multiple times concurrently and the results might arrive in the wrong order. This means that we could show the wrong data!
  2. Ideally, we should cancel LoadCustomerAsync when the component is disposed.
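
To make the first flaw concrete, here is a minimal sketch without Blazor or Rx. StaleResultDemo, Shown and the latencyMs argument are hypothetical names made up for this illustration; the delays simulate a slow response for the old Id and a fast response for the new one.

```csharp
using System;
using System.Threading.Tasks;

public static class StaleResultDemo
{
    // Stands in for the component's Customer property.
    public static string Shown = "";

    // Hypothetical loader: the caller decides how long the "request" takes.
    static async Task LoadCustomerAsync(int id, int latencyMs)
    {
        await Task.Delay(latencyMs);
        Shown = $"Customer {id}";
    }

    public static async Task<string> RunAsync()
    {
        Task first = LoadCustomerAsync(1, latencyMs: 300); // Id = 1, slow response
        Task second = LoadCustomerAsync(2, latencyMs: 50); // Id changes to 2, fast response
        await Task.WhenAll(first, second);
        return Shown; // the slow load finished last and overwrote the newer result
    }
}
```

Awaiting StaleResultDemo.RunAsync() returns "Customer 1" even though the current Id is 2, which is exactly the wrong-data situation described above.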

This is where Reactive Extensions come in very handy. We turn parameter changes into an Observable and then let the Rx operators handle the concurrency and cancellation.

Turning a parameter into an Observable

private BehaviorSubject<int> _ids = new BehaviorSubject<int>(0);

[Parameter]
public int Id
{
    get => _ids.Value;
    set => _ids.OnNext(value);
}
Alternative 1: Wrap a BehaviorSubject in the parameter property

private Subject<int> _ids = new Subject<int>();

[Parameter]
public int Id { get; set; }

public override async Task SetParametersAsync(ParameterView parameters)
{
    bool customerIdSet = parameters.TryGetValue(nameof(Id), out int customerId);
    await base.SetParametersAsync(parameters);
    if (customerIdSet)
    {
        _ids.OnNext(customerId);
    }
}
Alternative 2: Using SetParametersAsync
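
The two alternatives use different subject types, and the timing is probably the reason. My understanding is that ComponentBase assigns the parameter properties before it calls OnInitialized for the first time, so in Alternative 1 the first Id arrives before anyone has subscribed. The following sketch, assuming the System.Reactive package and a made-up SubjectReplayDemo class, shows how a BehaviorSubject replays its latest value to a late subscriber while a plain Subject does not.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectReplayDemo
{
    public static (List<int> FromBehavior, List<int> FromPlain) Run()
    {
        var behavior = new BehaviorSubject<int>(0);
        var plain = new Subject<int>();

        // The value is pushed before anyone subscribes.
        behavior.OnNext(42);
        plain.OnNext(42);

        var fromBehavior = new List<int>();
        var fromPlain = new List<int>();
        using var s1 = behavior.Subscribe(x => fromBehavior.Add(x)); // receives 42 immediately
        using var s2 = plain.Subscribe(x => fromPlain.Add(x));       // receives nothing yet

        // Later values reach both subscribers.
        behavior.OnNext(43);
        plain.OnNext(43);

        return (fromBehavior, fromPlain);
    }
}
```

Run() returns [42, 43] for the BehaviorSubject and only [43] for the Subject. In Alternative 2 this is not a problem, because OnNext is called after base.SetParametersAsync, by which time OnInitialized has already subscribed.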

Handling the concurrency with Rx

protected override void OnInitialized()
{
    _ids.DistinctUntilChanged()
        .Select(id => Observable.FromAsync(cancellationToken => LoadCustomerAsync(id, cancellationToken)))
        .Switch()
        .Subscribe();
}
Cancelling previous tasks using Switch operator

The Switch operator unsubscribes from the previous inner observable as soon as a new one arrives, which cancels the previous LoadCustomerAsync call. We will not get mixed results anymore! Beautiful.
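
Here is a small sketch of that behaviour outside of Blazor, assuming the System.Reactive package. SwitchDemo, LoadAsync and the log list are hypothetical stand-ins for the component and LoadCustomerAsync; id 1 is made artificially slow so that it is still running when id 2 arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SwitchDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var ids = new Subject<int>();
        var done = new TaskCompletionSource<bool>();

        // Hypothetical loader: id 1 is slow, every other id is fast.
        async Task LoadAsync(int id, CancellationToken ct)
        {
            try
            {
                await Task.Delay(id == 1 ? 500 : 50, ct);
                lock (log) log.Add($"loaded {id}");
            }
            catch (OperationCanceledException)
            {
                lock (log) log.Add($"cancelled {id}");
                throw;
            }
        }

        using var subscription = ids
            .DistinctUntilChanged()
            .Select(id => Observable.FromAsync(ct => LoadAsync(id, ct)))
            .Switch()
            .Subscribe(_ => { }, () => done.TrySetResult(true));

        ids.OnNext(1);     // starts the slow load
        ids.OnNext(2);     // Switch drops load 1, which cancels its token
        ids.OnCompleted(); // the pipeline completes once the last load has finished
        await done.Task;
        return log;
    }
}
```

The returned log contains "cancelled 1" and "loaded 2" but never "loaded 1", so the stale result has no chance to overwrite the current one.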

An additional benefit is that we can use the CancellationToken to actually cancel the underlying loading logic and save unnecessary database calls:

async Task LoadCustomerAsync(int customerId, CancellationToken cancellationToken)
{
   using var dbContext = new ApplicationDbContext();
   
   Customer = await dbContext.Customers
       .FirstOrDefaultAsync(c => c.Id == customerId, cancellationToken);

   Orders = await dbContext.Orders
       .Where(o => o.CustomerId == customerId)
       .ToListAsync(cancellationToken);
}
Example of cancelling using CancellationToken

Cancelling the loading when the component is disposed

Obviously, when the user navigates away from the component, we are not interested in the LoadCustomerAsync results anymore and we should cancel the call. Fortunately, Rx comes with the handy TakeUntil operator:

public class MyComponent : ComponentBase, IDisposable
{
    // ...
    protected override void OnInitialized()
    {
        _ids.DistinctUntilChanged()
            .Select(id => Observable.FromAsync(cancellationToken => LoadCustomerAsync(id, cancellationToken)))
            .Switch()
            .TakeUntil(_disposed)
            .Subscribe();
    }

    private Subject<bool> _disposed = new Subject<bool>();
    public void Dispose()
    {
        _disposed.OnNext(true);
    }
}
Cancelling running asynchronous tasks when component is disposed using TakeUntil

The TakeUntil operator will cancel the LoadCustomerAsync task if it is running and dispose the subscription created by calling Subscribe on the observable.
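
The cancellation part can be checked in isolation. This is a sketch under the assumption that System.Reactive is referenced; DisposeCancelsLoadDemo and LoadAsync are hypothetical names, and the load simply waits forever so that only cancellation can end it.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class DisposeCancelsLoadDemo
{
    public static (bool Before, bool After) Run()
    {
        var disposed = new Subject<bool>();
        CancellationToken captured = default;

        // Hypothetical loader that never finishes on its own.
        Task LoadAsync(CancellationToken ct)
        {
            captured = ct; // remember the token that FromAsync hands us
            return Task.Delay(Timeout.Infinite, ct);
        }

        Observable.FromAsync(ct => LoadAsync(ct))
            .TakeUntil(disposed)
            .Subscribe();

        bool before = captured.IsCancellationRequested;
        disposed.OnNext(true); // what Dispose() does in the component
        bool after = captured.IsCancellationRequested;

        return (before, after);
    }
}
```

Run() returns (false, true): the token is untouched while the component is alive and is cancelled as soon as the disposed subject emits.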

TakeUntil(Disposed) pattern

In the previous example, the lifespan of the _ids observable is bound to the component instance, so subscribing to it in OnInitialized won't cause a memory leak.

However, if the observable belonged to a Singleton or Scoped service, we would need to unsubscribe from it. Otherwise it would keep a reference to our component instance long after the component has been disposed, causing a memory leak.

IDisposable _subscription;

protected override void OnInitialized()
{
    _subscription = someService.SomeLongLivedObservable
        .Subscribe(...);
}

public void Dispose()
{
    _subscription?.Dispose();
}
Don't do this. Avoid unsubscribing manually

However, this can quickly become messy when there are multiple subscriptions, or when subscriptions are created each time the user clicks a button, and it almost always leads to memory leaks.

The TakeUntil operator allows you to create auto-unsubscribing subscriptions. In this case, they will automatically be disposed when the component is disposed:

protected override void OnInitialized()
{
    someService.SomeLongLivedObservable
        .TakeUntil(_disposed)
        .Subscribe(...);
}

private Subject<bool> _disposed = new Subject<bool>();
public void Dispose()
{
    _disposed.OnNext(true);
}
Become familiar with the TakeUntil operator
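
To see that the long-lived observable really lets go of the subscriber, here is a minimal sketch assuming the System.Reactive package. TakeUntilDemo is a made-up name, and a Subject stands in for someService.SomeLongLivedObservable so that its HasObservers property can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilDemo
{
    public static (List<int> Received, bool StillSubscribed) Run()
    {
        var received = new List<int>();
        var longLived = new Subject<int>(); // stands in for the service's observable
        var disposed = new Subject<bool>(); // emits from Dispose() in the component

        longLived
            .TakeUntil(disposed)
            .Subscribe(x => received.Add(x));

        longLived.OnNext(1);   // delivered
        disposed.OnNext(true); // ends the subscription
        longLived.OnNext(2);   // nobody is listening anymore

        // HasObservers tells us whether the source still references a subscriber.
        return (received, longLived.HasObservers);
    }
}
```

Run() returns the list [1] and false: the value pushed after disposal is dropped, and the source no longer holds on to the subscriber.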

Making the code reusable

We have written a bunch of boilerplate code. Fortunately, we can easily make it reusable so that we don't need to write it over and over again in every component.

You can create a base type from which your components inherit. It takes care of turning the parameters into observables and exposes an observable that emits when the component is disposed:

public class MyComponentBase : ComponentBase, IDisposable
{
    Subject<bool> _disposed = new Subject<bool>();
    ObservableParameters _observableParameters = new ObservableParameters();
    
    public IObservable<bool> Disposed => _disposed.AsObservable();

    public IObservable<T> ObserveParameter<T>(Expression<Func<T>> parameterSelector)
    {
        MemberInfo parameterInfo = ((MemberExpression)parameterSelector.Body).Member;
        if (parameterInfo.GetCustomAttribute<ParameterAttribute>() == null)
        {
            throw new ArgumentException("Member is not a parameter. It must be a public property annotated with ParameterAttribute", nameof(parameterSelector));
        }
        return _observableParameters.Observe<T>(parameterInfo.Name);
    }
    public override async Task SetParametersAsync(ParameterView parameters)
    {
        var paramsDict = parameters.ToDictionary();
        await base.SetParametersAsync(parameters);
        _observableParameters.OnNext(paramsDict);
    }

    public void Dispose()
    {
        _disposed.OnNext(true);
    }
}

/// <summary>
/// Turns component parameter properties into observables.
/// You can observe values of a specific property using <see cref="Observe"/>("MyProperty");
/// Values are emitted when <see cref="OnNext"/>() is called, which typically happens in SetParametersAsync()
/// </summary>
/// <example>
/// </example>
public class ObservableParameters
{
    private Dictionary<string, Subject<object>>? _paramsObservables;

    public IObservable<TValue> Observe<TValue>(string parameterName)
    {
        _paramsObservables ??= new Dictionary<string, Subject<object>>();

        // Dictionary<TKey, TValue> has no built-in GetOrAdd, so create the subject on first use.
        if (!_paramsObservables.TryGetValue(parameterName, out var subject))
        {
            subject = new Subject<object>();
            _paramsObservables.Add(parameterName, subject);
        }
        return subject.Cast<TValue>();
    }

    /// <summary>
    /// This is supposed to be called from SetParametersAsync();
    /// </summary>
    public void OnNext(IReadOnlyDictionary<string, object> parameters)
    {
        if (_paramsObservables != null)
        {
            foreach (var param in parameters)
            {
                if (_paramsObservables.TryGetValue(param.Key, out var observable))
                {
                    observable.OnNext(param.Value);
                }
            }
        }
    }
}

The final component code will then look as simple as:

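protected override void OnInitialized()
{
    ObserveParameter(() => Id)
        .DistinctUntilChanged()
        .Select(id => Observable.FromAsync(cancellationToken => LoadCustomerAsync(id, cancellationToken)))
        .Switch()
        .TakeUntil(Disposed) // you could even move this to the base class
        .Subscribe();
}
This is all you need to write in order to handle concurrency and cancellation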

Homework exercise

  1. Expand the initial example with "Reload Customer" button functionality. When the user clicks the button, any ongoing load task should be cancelled.
  2. Add a loading indicator to the component.