Messaging API

ServiceStack provides a high-level Messaging API exposing a number of essential messaging features in order to publish and receive messages as well as registering and processing handlers for different message types. A class diagram of the core interfaces is below:

Messaging API

There are currently 5 supported MQ Server options:

Like other ServiceStack providers, all MQ Servers are interchangeable, visible in the shared MQ Server tests below:

Mix in MQ Server

In ASP.NET Core Apps we recommend using mix to configure your preferred MQ Service, other than being quicker to add, it proposes adopting a naming convention in app settings and file names that other mix features can also make use of:

x mix [mq]

Currently available list of MQ Services:

Results matching tag [mq]:

   1. backgroundmq  Use Memory Background MQ                    to: $HOST  by @ServiceStack  [mq]
   2. rabbitmq      Use RabbitMQ                                to: $HOST  by @ServiceStack  [mq]
   3. sqs           Use AWS SQS MQ                              to: $HOST  by @ServiceStack  [mq]
   4. servicebus    Use Azure Service Bus MQ                    to: $HOST  by @ServiceStack  [mq]
   5. redismq       Use Redis MQ                                to: $HOST  by @ServiceStack  [mq]
   6. feature-mq    Simple MQ Feature to test sending Messages  to: $HOST  by @ServiceStack  [feature,mq,sharp]

Worker Service Templates

If you'd instead prefer to develop stand-alone MQ Servers (i.e. without HTTP access) then it can be beneficial to run the MQ Server in an ASP.NET Core Worker Service by starting from a pre-configured project template:

.NET Core C# Worker Service Templates
worker-rabbitmq .NET 6.0 Rabbit MQ Worker Service
worker-redismq .NET 6.0 Redis MQ Worker Service
worker-servicebus .NET 6.0 Azure Service Bus MQ Worker Service
worker-sqs .NET 6.0 AWS SQS MQ Worker Service

MQ Stats on Admin UI Dashboard

The health and activity of your MQ Services can be monitored from the Admin UI Dashboard which maintains aggregate stats for how many messages have been received, processed or failed as well as detailed stats on each MQ Worker:

Benefits of Message Queues

One of the benefits of using ServiceStack is its integrated support for hosting MQ Servers allowing your Services to be invoked via a MQ Broker. There are a number of reasons why you'd want to use a MQ as an alternative to HTTP including:

  • Sender is decoupled from Receiver, eliminating point-to-point coupling and configuration
  • Allows no-touch deploy of new clients and servers without updating any configuration
  • Removes time-coupling allowing clients and servers to be deployed independently without downtime
  • Better reliability, consumers can still send messages when servers are down and vice-versa
  • Durable, messages can be persisted and survive application or server reboots
  • Allows for CPU Intensive or long operations without disrupting message workflow
  • Instant response times by queuing slow operations and executing them in the background
  • Allows for natural load-balancing where throughput can be increased by simply adding more processors or servers
  • Message-based design allows for easier parallelization and introspection of computations
  • Greater throttling and control of message throughput, message execution can be determined by server
  • Reduces request contention and can defer execution of high load spikes over time
  • Better recovery, messages generating server exceptions can be retried and maintained in a dead-letter-queue
  • DLQ messages can be introspected, fixed and later replayed after server updates and rejoin normal message workflow

More details of these and other advantages can be found in the definitive Enterprise Integration Patterns.

Likewise the HTTP Service Clients implement the Messaging API IMessageProducer:

void Publish<T>(T requestDto);
void Publish<T>(IMessage<T> message);

When publishing a IMessage<T> the message metadata are sent as HTTP Headers with an X- prefix.

MQ Client Architecture

By promoting clean (endpoint-ignorant and dependency-free) Service and DTO classes, your web services are instantly re-usable from non-http contexts like MQ Services.

Within MQ Services, Request DTO's get wrapped and sent within the IMessage body, which looks like:

ServiceStack MQ Client Architecture

MQ Server Architecture

The Server Architecture shows where MQ Hosts (in green) fits within the context of ServiceStack's Request Pipeline. Effectively MQ Services are treated as "internal Services" bypassing HTTP Web Services Content Negotiation and Global Request/Response Filters:

ServiceStack Logical Architecture View

MQ Services also have their own distinct GlobalMessageRequestFilters and GlobalMessageResponseFilters for registering custom logic that only applies to MQ Requests. Plugins like Validation Feature can execute custom behavior for both HTTP and MQ Services by registering it in both HTTP GlobalRequestFilters and MQ GlobalMessageRequestFilters, e.g:

appHost.GlobalRequestFilters.Add(ValidationFilters.RequestFilter);
appHost.GlobalMessageRequestFilters.Add(ValidationFilters.RequestFilter);

Message Workflow

By default, MQ Servers will send a response message after it's processed each message, what the response is and which Queue (or HTTP url) the response is published to is dependent on the outcome of the message handler, i.e:

MQ Flowchart

Messages with no responses are sent to '.outq' Topic

When a handler returns a null response, the incoming message is re-published as a "transient" message to the out queue, e.g: mq:Hello.outq having the effect of notifying any subscribers to mq:Hello.outq each time a message is processed.

We can use this behavior to block until a message gets processed with:

IMessage<Hello> msgCopy = mqClient.Get<Hello>(QueueNames<Hello>.Out);
mqClient.Ack(msgCopy);
msgCopy.GetBody().Name //= World

Also shown in this example is an explicit Ack (which should be done for each message you receive) to tell the MQ Broker that you've taken responsibility of the message so it can safely remove it off the queue.

Messages with Responses are published to the Response .inq

Often message handlers will just return a POCO response after it processes a message, e.g:

mqServer.RegisterHandler<Hello>(m =>
    new HelloResponse { Result = $"Hello, {m.GetBody().Name}!" });

Whenever there's a response, then instead of the .outq the response message is sent to the .inq of the response message type, which for a HelloResponse type is just mq:HelloResponse.inq, e.g:

mqClient.Publish(new Hello { Name = "World" });

var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

INFO

this behavior can be limited to only publish responses for types in the mqServer.PublishResponsesWhitelist, otherwise all response messages can be disabled entirely by setting mqServer.DisablePublishingResponses = true.

Responses from Messages with ReplyTo are published to that address

Whilst for the most part you'll only need to publish POCO messages, you can also alter the default behavior by providing a customized IMessage<T> wrapper which ServiceStack will send instead, e.g. you can specify your own ReplyTo address to change the queue where the response gets published, e.g:

const string replyToMq = mqClient.GetTempQueueName();
mqClient.Publish(new Message<Hello>(new Hello { Name = "World" }) {
    ReplyTo = replyToMq
});

IMessage<HelloResponse> responseMsg = mqClient.Get<HelloResponse>(replyToMq);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

ReplyTo addresses can be URLs

A nice feature unique in ServiceStack is that the ReplyTo address can even be a HTTP Uri, in which case ServiceStack will attempt to POST the raw response at that address. This works nicely with ServiceStack Services which excel at accepting serialized DTO's.

mqClient.Publish(new Message<Hello>(new Hello { Name = "World" }) {
    ReplyTo = "http://example.org/hello/callback"
});

Messages with exceptions are re-tried then published to the dead-letter-queue (.dlq)

By default Mq Servers lets you specify whether or not you want messages that cause an exception to be retried by specifying a RetryCount of 1 (default), or if you don't want any messages re-tried, specify a value of 0, e.g:

var mqServer = new RabbitMqServer { RetryCount = 1 };

To illustrate how this works we'll keep a counter of how many times a message handler is invoked, then throw an exception to force an error condition, e.g:

var called = 0;
mqServer.RegisterHandler<Hello>(m => {
    called++;
    throw new ArgumentException("Name");
});

Now when we publish a message the response instead gets published to the messages .dlq, after it's first transparently retried. We can verify this behavior by checking called=2:

mqClient.Publish(new Hello { Name = "World" });

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);
mqClient.Ack(dlqMsg);

Assert.That(called, Is.EqualTo(2));

DLQ Messages retains the original message in their body as well as the last exception serialized in the IMessage.Error ResponseStatus metadata property, e.g:

dlqMsg.GetBody().Name   //= World
dlqMsg.Error.ErrorCode  //= typeof(ArgumentException).Name
dlqMsg.Error.Message    //= Name

Since the body of the original message is left in-tact, you're able to retry failed messages by removing them from the dead-letter-queue then re-publishing the original message, e.g:

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);

mqClient.Publish(dlqMsg.GetBody());

mqClient.Ack(dlqMsg);

This is useful for recovering failed messages after identifying and fixing bugs that were previously causing exceptions, where you can replay and re-process DLQ messages and continue processing them as normal.

Priority Queues

MQ Servers spawns 2 threads for each handler, one to listen to the Message Inbox mq:<Message>.inq and another to listen on the Priority Queue located at mq:<Message>.priorityq.

You can white-list which messages to enable Priority Queue's for with mqServer.PriorityQueuesWhitelist or disable them all by setting:

mqServer.DisablePriorityQueues = true;

OneWay HTTP Requests are published to MQ then executed

Using the SendOneWay() Service Client APIs will publish DTO's to the /oneway Pre-defined Route:

var client = new JsonServiceClient(BaseUrl);
client.SendOneWay(new RequestDto { ... }); //POST's to /json/oneway/RequestDto

Where if there is an MQ Server is registered in the AppHost it will instead publish the Request DTO to the registered MQ Server who then pulls it from the MQ Broker and executes the message in a background thread like a normal MQ Message. If no MQ Server is registered then the Request is executed in the HTTP Handler like a normal HTTP Request.

OneWay MQ and HTTP Service Clients are Substitutable

Service Clients and MQ Clients are also interoperable where all MQ Clients implement the Service Clients IOneWayClient API which enables writing code that works with both HTTP and MQ Clients:

IOneWayClient client = GetClient();
client.SendOneWay(new RequestDto { ... });

Disable 'outq' notification messages

All MQ Servers support the ability to specify a whitelist of Requests you only want to publish .outq for:

PublishToOutqWhitelist = new[]{ nameof(MyRequest) }

Alternatively all .outq messages can be disabled with:

DisablePublishingToOutq = true

Flexible Queue Name strategies

There are more flexible options for specifying the Queue Names where you can categorize queue names or avoid conflicts with other MQ services by specifying a global prefix to be used for all Queue Names, e.g:

QueueNames.SetQueuePrefix("site1.");

QueueNames<Hello>.In //= site1.mq:Hello.inq

Or to gain complete control of each queue name used, provide a custom QueueName strategy, e.g:

QueueNames.ResolveQueueNameFn = (typeName, suffix) =>
    $"SITE.{typeName.ToLower()}{suffix.ToUpper()}";

QueueNames<Hello>.In  //= SITE.hello.INQ

Note: Custom QueueNames need to be declared on both MQ Client in addition to ServiceStack Hosts.

Authenticated Requests via MQ

As MQ Requests aren't executed within the Context of a HTTP Request they don't have access to any HTTP Info like HTTP Cookies, Headers, FormData, etc. This also means the Users Session isn't typically available as it's based on the ss-id Session Ids in Cookies.

Embedding Auth Info in MQ Request DTO

The easiest way to publish Authenticated MQ Requests is using the same approach as Encrypted Messaging by embedding the Auth Info in the Request DTO by implementing IHasSessionId if using session-based Authentication or IHasBearerToken if using token-based Auth, e.g:

public class AuthSessionMq : IHasSessionId
{
    public string SessionId { get; set; }
    //...
}

public class AuthTokenMq : IHasBearerToken
{
    public string BearerToken { get; set; }
    //...
}

public class MqServices : Service
{
    public void Any(PublishSessionMq request)
    {
        var mqRequest = request.ConvertTo<AuthSessionMq>();
        mqRequest.SessionId = Request.GetSessionId();
        PublishMessage(mqRequest);
    }

    public void Any(PublishTokenMq request)
    {
        var mqRequest = request.ConvertTo<AuthTokenMq>();
        mqRequest.BearerToken = Request.GetJwtToken();
        PublishMessage(mqRequest);
    }
}

Overriding MQ Request Headers

Alternatively as ServiceStack also lets you specify a Users Session ids using HTTP Headers (with X- prefix), we can instead specify Session Ids using Request Headers. To do this we can create a custom Request Context that the MQ Service is executed within and set the SessionId in the Headers:

var req = new BasicRequest { Verb = HttpMethods.Post };
req.Headers["X-ss-id"] = sessionId;

Alternatively we can inject the Session itself. As a Users Session is just a AuthUserSession POCO persisted against the SessionId, we can access the Users Session from the CacheClient directly ourselves, e.g:

//i.e. urn:iauthsession:{sessionId}
var sessionKey = SessionFeature.GetSessionKey(sessionId); 
var usersSession  = HostContext.TryResolve<ICacheClient>()
    .Get<IAuthUserSession>(sessionKey);

And inject to use this Session with:

req.Items[Keywords.Session] = usersSession;

We can then Execute the MQ Service with this custom Request Context with:

return this.ExecuteMessage(m, req);

See the Session docs for more info about Sessions in ServiceStack

We'll walk-through a minimal example using this approach to make authenticated Requests and access the Users Session in a Service that's accessible via both HTTP and MQ transports.

To start we'll configure an Authentication-enabled ServiceStack MQ and HTTP Self-Hosted Application:

public class AppHost : AppSelfHostBase
{
    public override void Configure(Container container)
    {
        //Enable Authentication
        Plugins.Add(new AuthFeature(() => new AuthUserSession(), 
            new IAuthProvider[] { 
                // Enable Username/Password Authentication
                new CredentialsAuthProvider(AppSettings), 
            }));

        //Use an InMemory Repository to persist User Authenticaiton Info
        container.Register<IUserAuthRepository>(c => new InMemoryAuthRepository());

        //Create a new User on Startup
        var authRepo = container.Resolve<IUserAuthRepository>();
        authRepo.CreateUserAuth(new UserAuth {
            Id = 1,
            UserName = "mythz",
            FirstName = "John",
            LastName = "Doe",
            DisplayName = "John Doe",
        }, "p@55word");

        //Register to use a Rabbit MQ Server
        container.Register<IMessageService>(c => new RabbitMqServer());

        var mqServer = container.Resolve<IMessageService>();

        mqServer.RegisterHandler<AuthOnly>(m => {
            var req = new BasicRequest { Verb = HttpMethods.Post };
            req.Headers["X-ss-id"] = m.GetBody().SessionId;
            var response = ExecuteMessage(m, req);
            return response;
        });

        //Start the Rabbit MQ Server listening for incoming MQ Requests
        AfterInitCallbacks.Add(appHost => mqServer.Start());
    }
}

and now add the Service Implementation:

public class AuthOnly : IReturn<AuthOnlyResponse>
{
    public string Name { get; set; }
    public string SessionId { get; set; }
}

public class AuthOnlyResponse
{
    public string Result { get; set; }
}

public class AuthOnlyService : Service 
{
    [Authenticate] //Only allow Authenticated Requests
    public object Any(AuthOnly request)
    {
        // Get the User Session for this Request
        var session = base.SessionAs<AuthUserSession>();

        return new AuthOnlyResponse {
            Result = $"Hello, {request.Name}! Your UserName is {session.UserAuthName}";
        };
    }
}

With the ServiceStack Host and Service implemented, we can start listening to both HTTP and Rabbit MQ Requests by starting the AppHost:

new AppHost().Start("http://localhost:1337/");

Then on the Client we can authenticate using UserName/Password credentials using the HTTP Service Client, e.g:

var client = new JsonServiceClient("http://localhost:1337/");

var response = client.Post(new Authenticate {
    UserName = "mythz",
    Password = "p@55word",
    RememberMe = true,
});

var sessionId = response.SessionId;

The above Request establishes an authenticated Session with the JsonServiceClient instance which has its Cookies populated with the Users SessionIds - allowing it to make subsequent authenticated requests as that User.

To make Authenticated Requests via MQ we can pass the returned sessionId into the MQ Request:

var mqFactory = RabbitMqMessageFactory();
using (var mqClient = mqFactory.CreateMessageQueueClient())
{
    mqClient.Publish(new AuthOnly {                        
        Name = "RabbitMQ Request",
        SessionId = sessionId,
    });

    //Block until the Response is Received:
    var responseMsg = mqClient.Get<AuthOnlyResponse>(QueueNames<AuthOnlyResponse>.In);
    mqClient.Ack(responseMsg); //Acknowledge the message was received
    
    //"Hello, RabbitMQ Request! Your UserName is mythz"
    Console.WriteLine(responseMsg.GetBody().Result); 
}

This works since we're extracting the SessionId and injecting into a Custom Request Context used by the MQ Service:

mqServer.RegisterHandler<AuthOnly>(m => {
    var req = new BasicRequest { Verb = HttpMethods.Post };
    req.Headers["X-ss-id"] = m.GetBody().SessionId;
    var response = ExecuteMessage(m, req);
    return response;
});

Global Filters vs Action Filters

One important difference worth re-iterating is that Filter Attribute declared on the Service class are top-level filters executed during Global Request filters which are only executed for HTTP Requests:

[Authenticate] //Top-level Filter Attribute - only executed for HTTP Requests
public class MqAuthOnlyService : Service 
{
    public object Any(AuthOnly request)
    {
        var session = base.SessionAs<AuthUserSession>();
        return new AuthOnlyResponse {
            Result = $"Hello, {request.Name}! Your UserName is {session.UserAuthName}";
        };
    }
}

This will allow you to separate behavior for external HTTP Requests from internal MQ Requests.

To validate both HTTP and MQ Requests, place the [Authenticate] attribute on the method itself, e.g:

public class AuthOnlyService : Service 
{
    [Authenticate] //Authenticate both HTTP and MQ Requests
    public object Any(AuthOnly request)
    {
        var session = base.SessionAs<AuthUserSession>(); 
        return new AuthOnlyResponse {
            Result = $"Hello, {request.Name}! Your UserName is {session.UserAuthName}";
        };
    }
}

feature-mq

The feature-mq adds MQ support to your App, complete with UI and includes 2 different ways of calling MQ Services in ServiceStack, just like example-validation UI, feature-mq is another complete working example with both UI and Service implementation in the following files:

As Configure.Mq.cs? is optional it will only add it if doesn't already exist, so it will either use your existing MQ configuration or configure your App to use the In Memory BackgroundMqService implementation.

Add feature-mq to your project with:

x mix feature-mq

This will add the TestMq Service and make it available to your MQ endpoint. TestMq is a regular service that updates values in the App's registered ICacheClient and returns the Cache's current values as well as the internal Stats of the MQ:

public class MqServices : Service
{
    public IMessageService MqService { get; set; }

    public void Any(PublishMq request)
    {
        PublishMessage(request.ConvertTo<TestMq>());
    }

    public object Any(TestMq request)
    {
        if (!string.IsNullOrEmpty(request.Name))
            Cache.Set("mq.name", request.Name);
        
        if (request.Add > 0)
            Cache.Increment("mq.counter", (uint)request.Add);
        else if (request.Add < 0)
            Cache.Decrement("mq.counter", (uint)(request.Add * -1));

        return new TestMqResponse {
            Name = Cache.Get<string>("mq.name"),
            Counter = Cache.Get<long>("mq.counter"),
            StatsDescription = MqService.GetStatsDescription(),
        };
    }
}

The 2 ways to call a MQ Service is directly using the publish or sendOneWay APIs (available in all ServiceStack Service Clients) where it send the request to the Service's /oneway endpoint which will automatically publish it to the registered MQ.

Alternatively you can publish the Request DTO yourself from a different Service Implementation as done in PublishMq, as-is typical for Services that queues sending emails without blocking Service Execution.

The feature's UI allows you to publish TestMq messages using either approach:

client = new JsonServiceClient('/');

var oneway = document.querySelector('input[name=mq-publish]:checked').value === "OneWay";
if (oneway) {
    client.publish(new TestMq({ name: $txtName.value, add: parseInt(add) }))
} else {
    client.post(new PublishMq({ name: $txtName.value, add: parseInt(add) }))
}

Both approaches end with the same result with the TestMq Request DTO published and executed by the registered MQ Service as shown in the UI which is periodically updated with the current state in the Cache and the internal stats of the MQ Service showing how many messages it processed.