Microsoft Orleans: Managing Distributed State Without Losing Your Mind

Microsoft Orleans is a framework for building scalable, distributed applications using a virtual actor model that simplifies state management, concurrency, and fault tolerance. In this post, we’ll dive into Microsoft Orleans, exploring its core concepts such as Grains, Silos, and Cluster Membership. We’ll then bring these concepts to life by guiding you through building a functional chat application. The next sections are intended to provide both foundational knowledge and practical steps needed to leverage Orleans for building scalable distributed systems.
There is an accompanying GitHub repository that you can follow along with here: https://github.com/Rushdown-Studios/OrleansChatSample.
Core Concepts
Grains
Grains are virtual actors, meaning they always exist even when not in memory. Grains are different from traditional actors because they are defined not just by an ID, but also by their behavior and state (although state is optional). Every grain activation is unique in a cluster.
This definition of a Grain has implications for uniqueness within a cluster as two grains with different behavior could share the same ID.
For example, two distinct activations of ITruckGrain and ICarGrain, both with the ID of 14, could be running and executing code concurrently.
For communication, Grains follow the same behavior as traditional actors in that they communicate through asynchronous messaging. Because Grain code execution is managed by the Orleans runtime, each Grain is single-threaded and supports reentancy.
Anatomy of a Grain Interface
[GenerateSerializer]
[Alias("RushdownGrainInterfaces.GameSessionStats")]
public struct GameSessionStats
{
[Id(0)]
public uint Kills;
[Id(1)]
public uint Deaths;
}
[Alias("RushdownGrainInterfaces.IUserGrain")]
public interface IUserGrain : IGrainWithGuidKey
{
[Alias("Login")]
Task<string> Login();
[Alias("JoinGameSession")]
Task JoinGameSession(Guid sessionId);
[Alias("LeaveGameSession")]
Task LeaveGameSession(Guid sessionId, GameSessionStats gameSessionStats);
}
Your eyes are not deceiving you, there are a lot of annotations. These are to support source generation. To support Grain communication that may be running on disparate hardware, RPC's are being generated behind the scenes.
Most primitive data types and data structures are serializable by Orleans automatically. Implementing a more complicated object like the GameSessionStats
struct seen above requires the [GenerateSerializer]
annotation along with the [Id()]
associated with each field. The [Alias()]
annotations are optional, but highly recommended as the serialized field name can remain consistent even if the struct or class is renamed during a code refactor.
Implementing the Grain
internal class UserGrain : Grain, IUserGrain
{
public Task<string> Login()
{
// .. add your logic here
return Task.FromResult("<some-token-here>");
}
public Task JoinGameSession(Guid sessionId)
{
// .. add your logic here
return Task.CompletedTask;
}
public Task LeaveGameSession(Guid sessionId, GameSessionStats gameSessionStats)
{
// .. add your logic here
return Task.CompletedTask;
}
}
To implement the grain you simply inherit from the Grain
class defined in the Orleans SDK, and implement the interface previously defined. If you use Visual Studio as your IDE, it will prompt you to auto-generate the required methods for you.
Grains support C# generics. There are things to consider when doing this though. Two grains that implement ICarGrain<TDriver>
will both be different Grain behaviors (eg. CarGrain<David>
and CarGrain<Sally>
). They can share the same ID and execute code at the same time. In some systems this might not be an issue, however it might be undesirable behavior in others.
Grain methods are required to return a Task
or Task<T>
. This is because there is no guarantee that grain->grain communication or client->grain communication happens within the same process. It might, but remember that Orleans is for building distributed systems, and with that context design decisions like this start to make more sense.
Silos
A Silo is just a part of your .NET application that you configure at startup. It provides the Orleans runtime that hosts your Grains. Silos usually run in a cluster of two or more. They can also run standalone. Orleans is intended to scale elastically from one to hundreds of silos and back down. Silos coordinate with each other to distribute work, detect and recover from failures. The Orleans runtime enables Grains in the cluster to communicate with each other as if they were in a single process.
Activating a silo is as easy as configuring it at startup.
var builder = WebApplication.CreateBuilder(args);
builder.UseOrleans(siloBuilder =>
{
siloBuilder.UseDynamoDBClustering(options => { options.Service = dynamoDb; })
.AddDynamoDBGrainStorage("GrainState", options => { options.Service = dynamoDb; })
.ConfigureEndpoints(siloPort, gatewayPort, AddressFamily.InterNetwork);
});
//.. register services here
var app = builder.Build();
//.. configure app here
app.Run()
Then, inject the provided dependencies into your classes for use at runtime.
internal class WebSocketService(
ILocalSiloDetails localSiloDetails,
IClusterClient clusterClient,
ILogger<IWebSocketService> logger)
: IWebSocketService, IClientCallback
{
private readonly ILocalSiloDetails _localSiloDetails = localSiloDetails;
private readonly IClusterClient _clusterClient = clusterClient;
private readonly ConcurrentDictionary<Guid, WebSocket> _connections = new();
private readonly ILogger<IWebSocketService> _logger = logger;
// .. truncated for brevity
}
Cluster And Membership
At the heart of Orleans clustering and membership is the IMembershipTable
. This is an abstraction that can sit on top of an in-memory data structure, a collection in MongoDB, a row in some SQL table, or just about any other durable store. You can even implement IMembershipTable
with a Redis or Kubernetes backing. It's very flexible.
Orleans already has out-of-the-box support for many popular databases, but implementing one is not very complex. There are many examples in the Orleans GitHub repo.
public interface IMembershipTable
{
Task InitializeMembershipTable(bool tryInitTableVersion);
Task DeleteMembershipTableEntries(string clusterId);
Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate);
Task<MembershipTableData> ReadRow(SiloAddress key);
Task<MembershipTableData> ReadAll();
Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion);
Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion);
Task UpdateIAmAlive(MembershipEntry entry);
}
Cluster Membership Protocol
Below is a simplified version of the protocol, but it gives you an idea of what happens when a Silo starts up.
- Adds itself to the membership table (InsertRow)
- Begin monitoring a subset of other Silos via probes (heartbeats)
- If Silo
S
does not getY
probe replies from a monitored serverP
, it suspects it by writing its timestamp suspicion into P’s row in the IMembershipTable - If
P
has more thanZ
suspicions withinK
seconds,P
is marked as dead - If
P
is still operational, it will see that it has been marked dead and will shut itself down.
Below is an example of what a membership table in a NoSQL database might look like.
Lets Build A Chat System
What are our goals?
- Design and implement a Chat System using Microsoft Orleans.
- Client-Server communication will be done via WebSockets.
Considerations
Why Websockets?
They introduce state into a process and highlight the strengths of the Orleans framework. A real world implementation should probably use something with less overhead like HTTP/3 QUIC or another RUDP protocol.
All asynchronous operations will be blocking in this design (i.e. using Async/Await). However, in a real system we would want to utilize a durable store like queues for backpressure and persistence allowing for a more event driven architecture.
What is a common approach to scaling stateful applications?
Usual distributed state management would require external services such as Redis which can add complexity when scaling (requiring extra infra, compute resources, and replication).
What will our approach look like?
Our implementation will not require an external service for this. It technically does not require a database either as a Cluster can manage an in-memory membership snapshot. However, our system will still use one.
Now lets architect our cluster
- A Player will need to know which Silo is hosting its stateful websocket connection. Thankfully, in Orleans, Silos can serialize references to themselves and other Silos.
- A Player will need to track which Channel it is currently using.
- A Channel will need to map a Player to the Silo hosting its websocket connection.
- A Silo will need to be notified when to broadcast a message and which connections to broadcast to.
Silo Startup
Connecting With A Client
Client Sends A Message To A Channel
So now we have an idea of what we need to implement
- IPlayerGrain
- IChannelGrain
- IPublisherGrain
- IClientCallback
- IWebSocketService
Project Setup
There are several nuget dependencies that are required by our project(s). It is recommended that you split this application into 3 projects:
- The main ASPNETCORE application (RushdownRPC)
- Grains (RushdownGrains)
- Grain Interfaces (RushdownGrainInterfaces)
RushdownRPC
Microsoft.Orleans.Server
is required for an application to host a silo.
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Clustering.DynamoDB" Version="8.2.0" />
<PackageReference Include="Microsoft.Orleans.Persistence.DynamoDB" Version="8.2.0" />
<PackageReference Include="Microsoft.Orleans.Server" Version="8.2.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.0.0" />
<PackageReference Include="System.Text.Json" Version="9.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\RushdownGrainInterfaces\RushdownGrainInterfaces.csproj" />
<ProjectReference Include="..\RushdownGrains\RushdownGrains.csproj" />
</ItemGroup>
</Project>
Grains
Microsoft.Orleans.Runtime
will be required because one of our grains will require persistent state.
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Runtime" Version="8.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\RushdownGrainInterfaces\RushdownGrainInterfaces.csproj" />
</ItemGroup>
</Project>
Grain Interfaces
Defining Grain interfaces requires Microsoft.NET.Sdk
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Sdk" Version="8.2.0" />
<PackageReference Include="System.Text.Json" Version="9.0.0" />
</ItemGroup>
</Project>
IPlayerGrain
Every player action will route their IPlayerGrain.
NOTE: it is keyed off of a GUID. The value of which will be the players ID.
[Alias("RushdownGrainInterfaces.IPlayerGrain")]
public interface IPlayerGrain : IGrainWithGuidKey
{
[Alias("SendMessage")]
Task SendMessage(string message);
[Alias("JoinChannel")]
Task JoinChannel(string channelId, SiloAddress siloAddress);
[Alias("LeaveChannel")]
Task LeaveChannel(SiloAddress siloAddress);
}
In this design, a player can only be in one channel at a time and that player's Grain will cache it in-memory. As you can see in this definition the interface IGrainFactory
can be provided at runtime via dependency injection. This object allows a Grain to get create references for other Grains and then use that to invoke methods using that reference. In the method IPlayerGrain.JoinChannel
, the IGrainFactory
is being used to create a reference to a Channel and then invoke the IChannelGrain.JoinChannel
method.
NOTE: this.GetPrimaryKey()
returns the Grain Key, in this case it is a GUID that represents the players ID.
internal class PlayerGrain(IGrainFactory grainFactory) : Grain, IPlayerGrain
{
private readonly IGrainFactory _grainFactory = grainFactory;
private string _currentChannel = string.Empty;
public async Task JoinChannel(string channelId, SiloAddress siloAddress)
{
if (string.IsNullOrWhiteSpace(channelId)) return;
var channelGrain = _grainFactory.GetGrain<IChannelGrain>(channelId);
await channelGrain.JoinChannel(this.GetPrimaryKey(), siloAddress);
_currentChannel = channelId;
}
public async Task LeaveChannel(SiloAddress siloAddress)
{
if (string.IsNullOrWhiteSpace(_currentChannel)) return;
var channelGrain = _grainFactory.GetGrain<IChannelGrain>(_currentChannel);
await channelGrain.LeaveChannel(this.GetPrimaryKey(), siloAddress);
_currentChannel = string.Empty;
}
public async Task SendMessage(string message)
{
if (string.IsNullOrWhiteSpace(_currentChannel) || string.IsNullOrWhiteSpace(message)) return;
var channelGrain = _grainFactory.GetGrain<IChannelGrain>(_currentChannel);
await channelGrain.SendMessage(new ChatMessage(this.GetPrimaryKey(), message));
}
}
IChannelGrain
The IChannelGrain
manages the state of a channel. It is intended to be invoked by an IPlayerGrain
directly.
[GenerateSerializer]
[Alias("RushdownGrainInterfaces.ChatMessage")]
public readonly struct ChatMessage(Guid senderId, string message)
{
[Id(0)]
[JsonProperty("sender")]
public readonly Guid SenderId = senderId;
[Id(1)]
[JsonProperty("message")]
public readonly string Message = message;
}
[Alias("RushdownGrainInterfaces.IChannelGrain")]
public interface IChannelGrain : IGrainWithStringKey
{
[Alias("JoinChannel")]
Task JoinChannel(Guid UserId, SiloAddress siloAddress);
[Alias("LeaveChannel")]
Task LeaveChannel(Guid userId, SiloAddress siloAddress);
[Alias("SendMessage")]
Task SendMessage(ChatMessage message);
}
The ChannelGrain
maintains a mapping of silo reference to a set of userIds. If a particular userId is mapped to a silo reference, that users websocket connection is hosted on the same process as that silo.
Example silo_1234 => [user1234, user3456]
NOTE: Performance of SendMessage could be improved with Parallel.ForEach
, but for readability and simplicity we will simply use ForEach
.
internal class ChannelGrain(IGrainFactory grainFactory) : Grain, IChannelGrain
{
private readonly IGrainFactory _grainFactory = grainFactory;
private readonly Dictionary<SiloAddress, HashSet<Guid>> _memberSiloMap = [];
public Task JoinChannel(Guid userId, SiloAddress siloAddress)
{
if (userId == Guid.Empty) return Task.CompletedTask;
if (_memberSiloMap.TryGetValue(siloAddress, out var members))
{
members.Add(userId);
}
else
{
_memberSiloMap.Add(siloAddress, [userId]);
}
return Task.CompletedTask;
}
public Task LeaveChannel(Guid userId, SiloAddress siloAddress)
{
if (userId == Guid.Empty) return Task.CompletedTask;
if (_memberSiloMap.TryGetValue(siloAddress, out var members))
{
members.Remove(userId);
}
return Task.CompletedTask;
}
public async Task SendMessage(ChatMessage message)
{
foreach (var memberSilo in _memberSiloMap)
{
var publisherGrain = _grainFactory.GetGrain<IPublisherGrain>(memberSilo.Key.ToParsableString());
await publisherGrain.PublishMessage([.. memberSilo.Value], message);
}
}
}
IPublisherGrain (And IClientCallback Declaration)
The IPublisherGrain
is 1:1 with a Silo. A Silo will subscribe to their IPublisherGrain
on startup. This Grain is intended to be keyed on the Silos address. Notice the PublisherGrainState
definition. More on that in the next paragraph.
[Alias("RushdownGrainInterfaces.IClientCallback")]
public interface IClientCallback : IGrainObserver
{
[Alias("PublishMessage")]
Task PublishMessage(Guid[] userIds, ChatMessage message);
}
[GenerateSerializer]
[Alias("RushdownGrainInterfaces.PublisherGrainState")]
public class PublisherGrainState
{
[Id(0)]
public IClientCallback? Subscriber { get; set; } = null;
}
[Alias("RushdownGrainInterfaces.ISiloBroadcastGrain")]
public interface IPublisherGrain : IGrainWithStringKey
{
[Alias("Subscribe")]
Task Subscribe(IClientCallback observer);
[Alias("PublishMessage")]
Task PublishMessage(Guid[] userIds, ChatMessage message);
}
NOTE: PublisherGrain
has persistent state!
Each PublisherGrain
is responsible for invoking the IClientCallback.PublishMesage
for its respective Silo. Notice that in the Subscribe
method, after the observer (IWebSocketService/IClientCallback) is registered as the sole Subscriber, it writes the state to the backing datastore immediately. This is not always required as a grain will write its state before deactivation. In this case, however, if a Silos were to crash and the IPublisherGrain
would lose it's in-memory subsriber. This would result in a totally broken state, and the only way to recover would be by killing and restarting the process.
More on the Subscriber in the next section!
internal class PublisherGrain(
[PersistentState("Publisher", "GrainState")] IPersistentState<PublisherGrainState> persistentState)
: Grain, IPublisherGrain
{
private readonly IPersistentState<PublisherGrainState> _persistentState = persistentState;
public async Task Subscribe(IClientCallback observer)
{
_persistentState.State.Subscriber = observer;
await _persistentState.WriteStateAsync();
}
public async Task PublishMessage(Guid[] userIds, ChatMessage message)
{
if (_persistentState.State.Subscriber != null)
{
await _persistentState.State.Subscriber.PublishMessage(userIds, message);
}
}
}
{
"$id": "1",
"$type": "RushdownGrainInterfaces.PublisherGrainState, RushdownGrainInterfaces",
"Subscriber": {
"Id": {
"Type": "sys.client",
"Key": "hosted-172.19.0.4:11111@100186465+c931280fe36a4f8fa6de7177f2ae7bdc"
},
"Interface": "RushdownGrainInterfaces.IClientCallback"
}
}
IWebSocketService And IClientCallback
IWebSocketService
is the interface that will be used by the websocket server to manage websocket state and manage input/output.
public readonly struct CommandType
{
public const string Channel = "channel";
public const string ChatMessage = "chat_message";
}
public struct Command
{
[JsonProperty("type")]
public string Type;
[JsonProperty("data")]
public string Data;
}
/// <summary>
/// Interface for the internal service that manages websocket connections and lifetime
/// </summary>
public interface IWebSocketService
{
/// <summary>
/// Initialize the web socket service
/// </summary>
/// <returns></returns>
Task Init();
/// <summary>
/// Add a new websocket connection
/// </summary>
/// <param name="playerId"></param>
/// <param name="webSocket"></param>
/// <param name="cancellationTokenSource"></param>
/// <returns></returns>
Task AddConnection(Guid playerId, WebSocket webSocket, CancellationToken cancellationToken = default);
/// <summary>
/// Start an async task to receive websocket messages
/// </summary>
/// <param name="playerId"></param>
/// <param name="webSocket"></param>
/// <param name="cancellationTokenSource"></param>
/// <returns></returns>
Task ReceiveLoopAsync(Guid playerId, WebSocket webSocket, CancellationToken cancellationToken = default);
}
WebSocketService
has three main responsibilities:
- Maintain websocket state for all connections to a silo's host application.
- React to websocket connection input/output.
- Act as the IGrainObserver (IClientCallback) for reacting to
IPublisherGrain
events.
WebSocketService
also implements our previously defined IClientCallback
.
A lot of what this class does is not very relevant to what we are using Orleans for. The important thing to understand is that this calls our IPlayerGrain
methods when the player sends a command/message and it owns the callback for broadcasting messages to player's websocket connections that is invoked by the Silo's IPublisherGrain
internal class WebSocketService(
ILocalSiloDetails localSiloDetails,
IClusterClient clusterClient,
ILogger<IWebSocketService> logger)
: IWebSocketService, IClientCallback
{
private readonly ILocalSiloDetails _localSiloDetails = localSiloDetails;
private readonly IClusterClient _clusterClient = clusterClient;
private readonly ConcurrentDictionary<Guid, WebSocket> _connections = new();
private readonly ILogger<IWebSocketService> _logger = logger;
private const string _defaultChannelId = "global";
private const int _bufferSizeKb = 1024 * 4;
public async Task Init()
{
var publisherGrain = _clusterClient.GetGrain<IPublisherGrain>(_localSiloDetails.SiloAddress.ToParsableString());
var observerReference = _clusterClient.CreateObjectReference<IClientCallback>(this);
await publisherGrain.Subscribe(observerReference);
}
public async Task AddConnection(Guid playerId, WebSocket webSocket, CancellationToken cancellationToken = default)
{
await AddOrUpdateWebSocket(playerId, webSocket);
var playerGrain = _clusterClient.GetGrain<IPlayerGrain>(playerId);
await playerGrain.JoinChannel(_defaultChannelId, _localSiloDetails.SiloAddress);
}
private async Task RemoveConnection(Guid playerId)
{
if (_connections.TryRemove(playerId, out var _))
{
var playerGrain = _clusterClient.GetGrain<IPlayerGrain>(playerId);
await playerGrain.LeaveChannel(_localSiloDetails.SiloAddress);
}
}
public async Task ReceiveLoopAsync(Guid playerId, WebSocket webSocket, CancellationToken cancellationToken = default)
{
try
{
_logger.LogInformation("User connected {userId}", playerId.ToString());
var playerGrain = _clusterClient.GetGrain<IPlayerGrain>(playerId);
var buffer = new byte[_bufferSizeKb];
var receiveResult = await webSocket.ReceiveAsync(
buffer: new ArraySegment<byte>(buffer),
cancellationToken: CancellationToken.None);
while (!receiveResult.CloseStatus.HasValue)
{
if (receiveResult.MessageType == WebSocketMessageType.Text)
{
string message = Encoding.UTF8.GetString(buffer, 0, receiveResult.Count);
var command = JsonConvert.DeserializeObject<Command>(message);
if (command.Type == CommandType.Channel)
{
await playerGrain.LeaveChannel(_localSiloDetails.SiloAddress);
await playerGrain.JoinChannel(command.Data, _localSiloDetails.SiloAddress);
}
else if (command.Type == CommandType.ChatMessage)
{
await playerGrain.SendMessage(command.Data);
}
else
{
_logger.LogError("Invalid command type = {commandType}", command.Type);
}
}
receiveResult = await webSocket.ReceiveAsync(
buffer: new ArraySegment<byte>(buffer),
cancellationToken: CancellationToken.None);
}
await webSocket.CloseAsync(
closeStatus: receiveResult.CloseStatus.Value,
statusDescription: receiveResult.CloseStatusDescription,
cancellationToken: CancellationToken.None);
}
catch (WebSocketException)
{
// The remote party closed the WebSocket connection without completing the close handshake
}
catch (Exception ex)
{
_logger.LogError("{ex}", ex);
}
finally
{
_logger.LogInformation("User disconnected {userId}", playerId.ToString());
await RemoveConnection(playerId);
}
}
public async Task PublishMessage(Guid[] playerIds, ChatMessage message)
{
var semaphore = new SemaphoreSlim(10);
var messageText = JsonConvert.SerializeObject(message);
var messageBytes = Encoding.UTF8.GetBytes(messageText);
var tasks = playerIds.Select(SendMessageAsync).ToList();
await Task.WhenAll(tasks);
async Task SendMessageAsync(Guid playerId)
{
if (!_connections.TryGetValue(playerId, out var webSocket))
{
return;
}
await semaphore.WaitAsync();
try
{
await webSocket.SendAsync(
buffer: new ArraySegment<byte>(messageBytes),
messageType: WebSocketMessageType.Text,
endOfMessage: true,
cancellationToken: CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError("Failed to send message to {playerId}: {exMessage}", playerId, ex.Message);
}
finally
{
semaphore.Release();
}
}
}
private async Task AddOrUpdateWebSocket(Guid playerIds, WebSocket newWebSocket)
{
if (_connections.TryGetValue(playerIds, out var existingWebSocket))
{
if (existingWebSocket.State == WebSocketState.Open)
{
await existingWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Replaced by new connection", CancellationToken.None);
}
}
_connections[playerIds] = newWebSocket;
}
}
Other
Websocket Endpoint
An example of how the IWebSocketService
is used by the websocket connection endpoint.
public class WebSocketController(IWebSocketService webSocketService) : ControllerBase
{
private readonly IWebSocketService _webSocketService = webSocketService;
[Route("/ws")]
public async Task WebSocket([FromQuery] Guid userId)
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
using var cts = new CancellationTokenSource();
using var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
await _webSocketService.AddConnection(userId, webSocket);
await _webSocketService.ReceiveLoopAsync(userId, webSocket);
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
}
WebsocketService Startup Utility (Call Init)
This is used at startup to register an IClientCallback
to a Silo's IPublisherGrain
.
internal class SiloSetupWorker(IWebSocketService webSocketService) : IHostedService
{
private readonly IWebSocketService _webSocketService = webSocketService;
public async Task StartAsync(CancellationToken cancellationToken) => await _webSocketService.Init();
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
Registering the Services in Program.cs
Not alot to say here. There are examples of setting up an orleans silo and registering some services for DI.
The line builder.Services.AddHostedService<SiloSetupWorker>();
is important to note, this is the service that subsribes this Silo's IClientCallback
to its IPublisherGrain
.
var siloPort = int.Parse(Environment.GetEnvironmentVariable(Names.SiloPort)
?? throw new Exception($"{Names.SiloPort} missing!"));
var gatewayPort = int.Parse(Environment.GetEnvironmentVariable(Names.GatewayPort)
?? throw new Exception($"{Names.GatewayPort} missing!"));
var dynamoDb = Environment.GetEnvironmentVariable(Names.DynamoDbConnectionString)
?? throw new Exception($"{Names.DynamoDbConnectionString} missing!");
var builder = WebApplication.CreateBuilder(args);
builder.UseOrleans(siloBuilder =>
{
siloBuilder.UseDynamoDBClustering(options => { options.Service = dynamoDb; })
.AddDynamoDBGrainStorage("GrainState", options => { options.Service = dynamoDb; })
.ConfigureEndpoints(siloPort, gatewayPort, AddressFamily.InterNetwork);
});
builder.Logging.AddSimpleConsole(options =>
{
options.IncludeScopes = false;
options.SingleLine = true;
});
builder.Services.AddControllers();
builder.Services.AddSingleton<IWebSocketService, WebSocketService>();
builder.Services.AddHostedService<SiloSetupWorker>();
var app = builder.Build();
app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromMinutes(2) });
app.MapControllers();
app.UseRouting();
app.Run();
public partial class Program;