CQRS abstraction with a MediatR implementation

dotnet cqrs domain-driven design dependency injection

MediatR Abstraction Command Flow

I have been prototyping a new project recently and wanted to reiterate on a pattern I worked on in the past: CQRS. One slight modification is that the CQRS contracts will be fully abstracted in the Application layer and implemented with MediatR in the separate Infrastructure layer, so it can be replaced if needed.

Zero-dependency application layer

My rule of thumb will be zero dependencies for the Domain and Application layer projects - that means no nuget packages, only the core .NET library. The reasoning is that it becomes easier to accidentally blend domain/business logic with the other layers, something to avoid in a DDD oriented project.

What does this mean for the CQRS abstraction? It means that in the Application layer, we define the interface contracts and implementations whose logic only depend on Application layer plumbing. In other words, those implementations only reference other Domain or Application layer contracts and are unaware of Infrastructure layer logic and implementations. As a result, nothing leaks from the Infrastructure layer and the Application-level implementations need no third party libraries.

The contracts are defined below:

/// A command that will mutate the system
public interface ICommand
{
}

/// A read-only query that retrieves something from the system
/// <typeparam name="TResponse">The query response type</typeparam>
public interface IQuery<out TResponse>
{
}

/// Sends a command
public interface ICommandSender
{
    Task Send<TCommand>(TCommand command, CancellationToken cancellationToken = default) where TCommand : ICommand;
}

/// Query sender
public interface IQuerySender
{
    Task<TResponse?> Send<TResponse>(IQuery<TResponse> query, CancellationToken cancellationToken = default);
}

/// Handles a command
/// <typeparam name="TCommand">The type of command</typeparam>
public interface ICommandHandler<in TCommand> where TCommand : ICommand
{
    Task Handle(TCommand command, CancellationToken cancellationToken = default);
}

/// Query handler
/// <typeparam name="TQuery">The type of the query that implements <see cref="IQuery{TResponse}"/></typeparam>
/// <typeparam name="TResponse">The query response type</typeparam>
public interface IQueryHandler<in TQuery, TResponse> where TQuery : IQuery<TResponse>
{
    Task<TResponse?> Handle(TQuery query, CancellationToken cancellationToken);
}

And for Application-level implementation examples, say we have a command that creates a player and a query that retrieves a player. Notice that only Domain/Application interface contracts are referenced by the command and query.

internal readonly record struct CreatePlayerCommand(PlayerData Data) : ICommand;

internal sealed class CreatePlayerCommandHandler : ICommandHandler<CreatePlayerCommand>
{
    // Defined in the Domain layer
    private readonly IUnitOfWork _unitOfWork;
    // Defined in the Domain layer
    private readonly IPlayerRepository _playerRepository;

    public async Task Handle(CreatePlayerCommand command, CancellationToken cancellationToken = default)
    {
        var player = CreatePlayer(command.Data);

        await _playerRepository.Add(player);

        await _unitOfWork.CommitAsync(cancellationToken);
    }
    ...
}


internal readonly record struct GetPlayerByIdQuery(int Id) : IQuery<Player>;

internal sealed class GetPlayerByIdQueryHandler : IQueryHandler<GetPlayerByIdQuery, Player>
{
    // Defined in the Domain layer
    private readonly IPlayerRepository _playerRepository;

    public async Task<Player?> Handle(GetPlayerByIdQuery query, CancellationToken cancellationToken)
    {
        return await _playerRepository.GetById(query.Id);
    }
}

MediatR infrastructure

The Infrastructure layer is where we have a dependency on the MediatR library and use it to implement the interface contracts. For ICommand and IQuery implementations, these were already defined in the Application layer because conceptually they represent application plumbing - or how the Domain and other components interact with each other independent of third party libraries:

  • CreatePlayerCommand
  • GetPlayerByIdQuery

With the implementations already defined, we instead we create MediatR wrappers that will be used to send the underlying types (TCommand and TQuery) through MediatR.

public readonly record struct MediatRCommand<TCommand>(TCommand Command) : IRequest where TCommand : ICommand;

public readonly record struct MediatRQuery<TQuery, TResponse>(TQuery Query) : IRequest<TResponse>
    where TQuery : IQuery<TResponse>;

The ICommandSender and IQuerySender implementations use MediatR to send the commands/queries to the proper handlers based on their type. The Send methods are generic, so they can wrap the underlying command/query types in MediatRCommand/MediatRQuery and dispatch them.

public sealed class MediatRCommandSender : ICommandSender
{
    private readonly IMediator _mediator;

    public MediatRCommandSender(IMediator mediator)
    {
        _mediator = mediator;
    }

    public async Task Send<TCommand>(TCommand command, CancellationToken cancellationToken = default)
        where TCommand : ICommand
    {
        // Wrap the command in MediatRCommand
        var request = new MediatRCommand<TCommand>(command);
        // Dispatch
        await _mediator.Send(request, cancellationToken);
    }
}

public sealed class MediatRQuerySender : IQuerySender
{
    private readonly IMediator _mediator;

    public MediatRQuerySender(IMediator mediator)
    {
        _mediator = mediator;
    }

    public async Task<TResponse?> Send<TResponse>(IQuery<TResponse> query,
        CancellationToken cancellationToken = default)
    {
        // Wrap the query in MediatRQuery
        var queryWrapperType = typeof(MediatRQuery<,>).MakeGenericType(query.GetType(), typeof(TResponse));
        var queryWrapper = Activator.CreateInstance(queryWrapperType, query) as IRequest<TResponse>;
         // Dispatch
        return await _mediator.Send(queryWrapper!, cancellationToken);
    }
}

The handler implementations are again just wrappers. They implement the MediatR IRequestHandler interface and will resolve the underlying handlers ICommandHandler<T> and IQueryHandler<TQuery, TResponse>, so they can delegate the actual handling to them (CreatePlayerCommandHandler and GetPlayerByIdQueryHandler defined above):

public sealed class MediatRCommandHandler<T> : IRequestHandler<MediatRCommand<T>> where T : ICommand
{
    // The underlying handler that will be resolved
    private readonly ICommandHandler<T> _handler;

    public MediatRCommandHandler(ICommandHandler<T> handler)
    {
        _handler = handler;
    }

    public async Task Handle(MediatRCommand<T> request, CancellationToken cancellationToken)
    {
        // Delegate the handling
        await _handler.Handle(request.Command, cancellationToken);
    }
}

public sealed class MediatRQueryHandler<TQuery, TResponse> : IRequestHandler<MediatRQuery<TQuery, TResponse>, TResponse>
    where TQuery : IQuery<TResponse>
{
    // The underlying handler that will be resolved
    private readonly IQueryHandler<TQuery, TResponse> _queryHandler;

    public MediatRQueryHandler(IQueryHandler<TQuery, TResponse> queryHandler)
    {
        _queryHandler = queryHandler;
    }

    public async Task<TResponse> Handle(MediatRQuery<TQuery, TResponse> request, CancellationToken cancellationToken)
    {
        // Delegate the handling
        return (await _queryHandler.Handle(request.Query, cancellationToken))!;
    }
}

Resolving the dependencies

You might be wondering how dependency injection will work with these classes as some of them need to implement MediatR interfaces. For example, say we have a concrete command CreatePlayerCommand and a handler CreatePlayerCommandHandler. How do we register these with the DI container? Since MediatRCommandHandler<T> resolves ICommandHandler<T>, that means you just need to register the following:

  • services.AddTransient<ICommandHandler<CreatePlayerCommand>, CreatePlayerCommandHandler>();
  • services.AddTransient<IRequestHandler<MediatRCommand<CreatePlayerCommand>>, MediatRCommandHandler<CreatePlayerCommand>>();
  • services.AddTransient<ICommandSender, MediatRCommandSender>();

The injection of the MediatRCommandSender just needs to be done once since its Send method is the only generic part. For other commands and queries, you will need to register them manually like CreatePlayerCommand.

This basically boils down to:

  1. Send MediatR command of type CreatePlayerCommand using MediatRCommandSender
  2. The wrapper MediatRCommandHandler<CreatePlayerCommand> receives the command
  3. It resolves ICommandHandler<CreatePlayerCommand> as CreatePlayerCommandHandler where the actual handling logic lives
  4. MediatRCommandHandler<CreatePlayerCommand> forwards the command to CreatePlayerCommandHandler

MediatR Abstraction Command Flow Traced

This exact same pattern applies to the queries.

Registering many commands with reflection

Now if you have many commands you need to register with your DI container, you can use reflection. This will be done in your app startup, which should have access to all of your projects. First, get a collection of the assemblies where your commands and queries live:

var assembliesToScan = new List<Assembly>()
{
    typeof(SomeCommand).Assembly,
};

For each assembly, you can then do the following:

  1. Get any types that implement ICommandHandler<T> like CreatePlayerCommandHandler in the Application layer
  2. Get the command type for each implementation
    • For example, CreatePlayerCommandHandler implements ICommandHandler<CreatePlayerCommand>, so you can get the command type CreatePlayerCommand
  3. Register: services.AddTransient<ICommandHandler<CreatePlayerCommand>, CreatePlayerCommandHandler>();
  4. Then you register the wrapper: services.AddTransient<IRequestHandler<MediatRCommand<CreatePlayerCommand>>, MediatRCommandHandler<CreatePlayerCommand>>();
// Get any implementations of ICommandHandler
assembliesToScan.SelectMany(a => a.GetTypes())
    .Where(t => t.GetInterfaces() // Only concrete ICommandHandlers
        .Where(i => i.IsGenericType)
        .Any(i => i.GetGenericTypeDefinition() == typeof(ICommandHandler<>)) && !t.IsAbstract && !t.IsInterface)
    .ToList()
    .ForEach(handlerType => // For each concrete implementation of ICommandHandler
    {
        // The type as an interface, eg. ICommandHandler<TCommand>
        var handlerInterfaceType = handlerType.GetInterfaces()
            .First(x => x.GetGenericTypeDefinition() == typeof(ICommandHandler<>));

        // Get the command type (implementation of ICommand) for the ICommandHandler
        var commandType = handlerInterfaceType.GetGenericArguments().First();

        // Register the command handler
        services.AddTransient(handlerInterfaceType, handlerType);

        // Register the command handler wrapper
        var commandWrapperType = typeof(MediatRCommand<>).MakeGenericType(commandType);
        var handlerWrapperType = typeof(MediatRCommandHandler<>).MakeGenericType(commandType);
        var mediatRHandlerType = typeof(IRequestHandler<>).MakeGenericType(commandWrapperType);
        services.AddTransient(mediatRHandlerType, handlerWrapperType);
    });

Again, the same pattern applies to queries.