Simple Single Instance .NET Job Manager

Typically, for mission-critical jobs in .NET, you would use either Hangfire or Quartz.NET. But what about less important jobs that don’t even require a schedule?

For personal, “for-fun” projects, I like to have as few dependencies as possible. It’s a nice change from the professional world where you always go with whatever is battle-tested.

Recently, I needed to:

  • Run single instances of a job (only one instance of a job with the same parameters can run at a time)
  • Run in the background

Job contracts

To accomplish this, I first started with some contracts:

// Input for a job
public interface IJobInput
{
}
// Output for a job
public interface IJobOutput
{
}

// A job
public interface IJob
{
    Task<IJobOutput> Execute(IJobInput input, CancellationToken cancellationToken = default);
}

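// Typed job. Inheriting from IJob is what lets any IJob<TIn, TOut> be treated as a plain IJob;
// TIn can be contravariant because it is only ever consumed as a parameter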
public interface IJob<in TIn, TOut> : IJob
    where TIn : IJobInput
    where TOut : IJobOutput
{
    Task<TOut> Execute(TIn input, CancellationToken cancellationToken = default);
}

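// Implements the non-generic IJob.Execute by casting the IJobInput to TIn and delegating to the typed overload,
// so a job can be invoked with a plain IJobInput (the cast throws InvalidCastException if the input is the wrong type)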
public abstract class BaseJob<TIn, TOut> : IJob<TIn, TOut>
    where TIn : IJobInput
    where TOut : IJobOutput
{
    public abstract Task<TOut> Execute(TIn input, CancellationToken cancellationToken = default);

    public async Task<IJobOutput> Execute(IJobInput input, CancellationToken cancellationToken = default)
    {
        return await Execute((TIn)input, cancellationToken);
    }
}

The important parts are the contravariant IJob<in TIn, TOut> and the abstract BaseJob. Defining IJob<in TIn, TOut> as an extension of IJob means we can make a generic job manager that takes any IJob, without worrying about its input or output types.

BaseJob makes this work by implementing IJob.Execute() once: it casts the input to the job's actual input type (TIn) and delegates to the typed Execute(), so concrete jobs only have to implement the typed method.
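
To make the contracts concrete, here is a minimal sketch of a job built on them and invoked through the non-generic IJob. The contracts are repeated so the snippet compiles on its own; WordCountInput, WordCountOutput, WordCountJob and ContractsDemo are hypothetical names made up for illustration and are not part of the job manager.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Contracts from above, repeated so the sketch is self-contained
public interface IJobInput { }
public interface IJobOutput { }

public interface IJob
{
    Task<IJobOutput> Execute(IJobInput input, CancellationToken cancellationToken = default);
}

public interface IJob<in TIn, TOut> : IJob
    where TIn : IJobInput
    where TOut : IJobOutput
{
    Task<TOut> Execute(TIn input, CancellationToken cancellationToken = default);
}

public abstract class BaseJob<TIn, TOut> : IJob<TIn, TOut>
    where TIn : IJobInput
    where TOut : IJobOutput
{
    public abstract Task<TOut> Execute(TIn input, CancellationToken cancellationToken = default);

    public async Task<IJobOutput> Execute(IJobInput input, CancellationToken cancellationToken = default)
    {
        return await Execute((TIn)input, cancellationToken);
    }
}

// Hypothetical input and output types for the example
public sealed record WordCountInput(string Text) : IJobInput;
public sealed record WordCountOutput(int Words) : IJobOutput;

// Hypothetical job: only the typed Execute has to be implemented
public sealed class WordCountJob : BaseJob<WordCountInput, WordCountOutput>
{
    public override Task<WordCountOutput> Execute(WordCountInput input, CancellationToken cancellationToken = default)
    {
        var words = input.Text.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
        return Task.FromResult(new WordCountOutput(words));
    }
}

public static class ContractsDemo
{
    // Calls the job the way a generic manager would: knowing only IJob and IJobInput
    public static async Task<int> CountThroughNonGenericInterface(string text)
    {
        IJob job = new WordCountJob();
        IJobOutput output = await job.Execute(new WordCountInput(text));
        return ((WordCountOutput)output).Words;
    }
}
```

The caller never mentions WordCountInput in the call signature, which is the property a generic job manager relies on.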

Job manager

This leaves the job manager:

public interface IJobManager
{
    Task<TOut> Run<T, TOut>(IJobInput input, CancellationToken cancellationToken = default)
        where T : IJob
        where TOut : IJobOutput;
}

The Run implementation starts off by keeping track of the type of job and its input to make sure the same job doesn’t run concurrently. It does this by checking whether the current job type/input combination (JobExecution) is already active. The TaskCompletionSource allows us to manually track the state of the running job’s Task, so that we know when it has completed or failed.

// Body of JobManager implementation

/// <summary>
/// <see cref="IServiceScopeFactory"/> that will resolve jobs within their own scope
/// </summary>
private readonly IServiceScopeFactory _serviceScopeFactory;

/// <summary>
/// Represents an execution of a job
/// </summary>
private sealed record JobExecution(Type JobType, IJobInput JobInput);

/// <summary>
/// Status of the jobs, where the value is a <see cref="TaskCompletionSource{TResult}"/> that will track the result
/// and state of the underlying task
///
/// This class should be injected as a singleton and there can be multiple points of entry for triggering jobs,
/// so have a thread safe way of tracking active jobs
/// </summary>
private static readonly ConcurrentDictionary<JobExecution, TaskCompletionSource<IJobOutput>> ActiveJobs = new();

// Body of Run() implementation

/// <inheritdoc />
public async Task<TOut> Run<T, TOut>(IJobInput input, CancellationToken cancellationToken = default)
    where T : IJob
    where TOut : IJobOutput
{
    var jobType = typeof(T);
    // Only one instance of a job type + input can run at a time
    var jobExecution = new JobExecution(jobType, input);

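    // Register this run as in progress. RunContinuationsAsynchronously keeps waiting callers
    // from resuming inline inside SetResult/SetException
    var tcs = new TaskCompletionSource<IJobOutput>(TaskCreationOptions.RunContinuationsAsynchronously);

    // GetOrAdd checks and inserts in one atomic step: it returns our tcs if we registered first,
    // otherwise the one that is already tracking the running job. (Reading ActiveJobs[jobExecution]
    // after a failed TryAdd could throw if the other run finished and was removed in between.)
    var activeTcs = ActiveJobs.GetOrAdd(jobExecution, tcs);
    if (!ReferenceEquals(activeTcs, tcs))
    {
        // The job is already running, so await its result instead of starting it again
        return (TOut)await activeTcs.Task;
    }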

...

Since we are tracking each TaskCompletionSource, if the caller invokes a job that is already running, they can simply await the task and still get the result of the run that is in progress.
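
Whether a call counts as "the same job" depends on how JobExecution keys compare. JobExecution is a record, so two keys are equal when their job types and inputs are equal, and the input is compared with its own Equals. The sketch below illustrates the consequence: an input declared as a record is matched by value, while a plain class input is matched by reference only. SyncUserJob, RecordInput, ClassInput and KeyEqualityDemo are hypothetical names, and JobExecution is declared as a public top-level type here only so that the snippet stands alone.

```csharp
using System;

public interface IJobInput { }

// Hypothetical job type, used only as a key component
public sealed class SyncUserJob { }

// Hypothetical input with value equality (records generate Equals/GetHashCode)
public sealed record RecordInput(int UserId) : IJobInput;

// Hypothetical input with default reference equality
public sealed class ClassInput : IJobInput
{
    public ClassInput(int userId) => UserId = userId;
    public int UserId { get; }
}

// Same shape as the manager's key
public sealed record JobExecution(Type JobType, IJobInput JobInput);

public static class KeyEqualityDemo
{
    // Two separately constructed record inputs with the same data produce equal keys
    public static bool RecordInputsMatch() =>
        new JobExecution(typeof(SyncUserJob), new RecordInput(1))
            .Equals(new JobExecution(typeof(SyncUserJob), new RecordInput(1)));

    // Two separately constructed class inputs are different objects, so the keys differ
    public static bool ClassInputsMatch() =>
        new JobExecution(typeof(SyncUserJob), new ClassInput(1))
            .Equals(new JobExecution(typeof(SyncUserJob), new ClassInput(1)));
}
```

In practice this suggests declaring inputs as records (or overriding Equals and GetHashCode); otherwise every call would look like a new job and nothing would be deduplicated.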

If the manager is called to start a job and it is not already running, it will immediately start the job. In my case, I resolved the jobs into their own service scope before running but this is not required and depends on the scope of your dependencies.

...

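    try
    {
        // Resolve the job in its own scope; disposing the scope releases the job's scoped dependencies
        using var scope = _serviceScopeFactory.CreateScope();
        var job = (IJob)scope.ServiceProvider.GetRequiredService(jobType);

        // Run the job
        var result = await job.Execute(input, cancellationToken);

        // The job is done and can be started again. Remove it before publishing the result so that
        // callers arriving after completion start a fresh run instead of receiving this result
        ActiveJobs.TryRemove(jobExecution, out _);
        tcs.SetResult(result);
    }
    catch (Exception e)
    {
        // A failed job must also be removed, otherwise it could never be started again
        ActiveJobs.TryRemove(jobExecution, out _);
        tcs.SetException(e);
    }

    // Completed at this point: returns the result or rethrows the job's exception
    return (TOut)await tcs.Task;
}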

This awaits the job. Once it finishes, its result is set on the TaskCompletionSource with tcs.SetResult(result), or its exception with tcs.SetException(e), so any other callers that invoked the same job while it was running receive the same result or exception. The job is also removed from ActiveJobs so that it can be invoked again.
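
The sharing behaviour can be seen in isolation with a stripped-down sketch. It is not the job manager itself: SharedResultDemo and RunOnce are hypothetical names, the key is a plain string and the result an int for brevity, and the check-and-insert step uses ConcurrentDictionary.GetOrAdd. Two callers request the same key while the work is still pending; the work runs once and both receive its result.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SharedResultDemo
{
    private static readonly ConcurrentDictionary<string, TaskCompletionSource<int>> Active = new();
    private static int _executions;

    // Number of times the underlying work was actually started
    public static int Executions => Volatile.Read(ref _executions);

    public static async Task<int> RunOnce(string key, Func<Task<int>> work)
    {
        var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Atomic check-and-insert: whoever registers first owns the run
        var active = Active.GetOrAdd(key, tcs);
        if (!ReferenceEquals(active, tcs))
        {
            // Someone else is already running this key; share their result
            return await active.Task;
        }

        try
        {
            Interlocked.Increment(ref _executions);
            var result = await work();
            Active.TryRemove(key, out _);
            tcs.SetResult(result);
        }
        catch (Exception e)
        {
            Active.TryRemove(key, out _);
            tcs.SetException(e);
        }

        return await tcs.Task;
    }

    // Starts two calls for the same key before the work is allowed to finish
    public static async Task<int[]> TwoCallersOneRun()
    {
        var gate = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        Func<Task<int>> work = () => gate.Task;

        var first = RunOnce("report", work);   // registers the key and starts the work
        var second = RunOnce("report", work);  // finds the key active and waits

        gate.SetResult(42);                    // let the pending work complete
        return await Task.WhenAll(first, second);
    }
}
```

Awaiting TwoCallersOneRun() yields the same value for both callers, and Executions stays at 1 because the second call never starts the work.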

This job manager should be registered as a singleton. The full implementation is below.

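using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

/// <summary>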
/// <see cref="IJobManager"/> that allows only one combination of <see cref="IJob"/> and <see cref="IJobInput"/> to be
/// executing at any given time. Each job execution is in its own service scope
/// </summary>
public sealed class ScopedSingleInstanceJobManager : IJobManager
{
    /// <summary>
    /// <see cref="IServiceScopeFactory"/> that will resolve jobs within their own scope
    /// </summary>
    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>
    /// Status of the jobs, where the value is a <see cref="TaskCompletionSource{TResult}"/> that will track the result
    /// and state of the underlying task
    ///
    /// This class should be injected as a singleton and there can be multiple points of entry for triggering jobs,
    /// so have a thread safe way of tracking active jobs
    /// </summary>
    private static readonly ConcurrentDictionary<JobExecution, TaskCompletionSource<IJobOutput>> ActiveJobs = new();

    public ScopedSingleInstanceJobManager(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    /// <inheritdoc />
    public async Task<TOut> Run<T, TOut>(IJobInput input, CancellationToken cancellationToken = default)
        where T : IJob
        where TOut : IJobOutput
    {
        var jobType = typeof(T);
        // Only one instance of a job type + input can run at a time
        var jobExecution = new JobExecution(jobType, input);

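        // Register this run as in progress. RunContinuationsAsynchronously keeps waiting callers
        // from resuming inline inside SetResult/SetException
        var tcs = new TaskCompletionSource<IJobOutput>(TaskCreationOptions.RunContinuationsAsynchronously);

        // GetOrAdd checks and inserts in one atomic step: it returns our tcs if we registered first,
        // otherwise the one that is already tracking the running job. (Reading ActiveJobs[jobExecution]
        // after a failed TryAdd could throw if the other run finished and was removed in between.)
        var activeTcs = ActiveJobs.GetOrAdd(jobExecution, tcs);
        if (!ReferenceEquals(activeTcs, tcs))
        {
            // The job is already running, so await its result instead of starting it again
            return (TOut)await activeTcs.Task;
        }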

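        try
        {
            // Resolve the job in its own scope; disposing the scope releases the job's scoped dependencies
            using var scope = _serviceScopeFactory.CreateScope();
            var job = (IJob)scope.ServiceProvider.GetRequiredService(jobType);

            // Run the job
            var result = await job.Execute(input, cancellationToken);

            // The job is done and can be started again. Remove it before publishing the result so that
            // callers arriving after completion start a fresh run instead of receiving this result
            ActiveJobs.TryRemove(jobExecution, out _);
            tcs.SetResult(result);
        }
        catch (Exception e)
        {
            // A failed job must also be removed, otherwise it could never be started again
            ActiveJobs.TryRemove(jobExecution, out _);
            tcs.SetException(e);
        }

        // Completed at this point: returns the result or rethrows the job's exception
        return (TOut)await tcs.Task;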
    }

    /// <summary>
    /// Represents an execution of a job
    /// </summary>
    private sealed record JobExecution(Type JobType, IJobInput JobInput);
}