CQRS & Event Sourcing in Orleans
Command Query Responsibility Segregation (CQRS) is what I would call the Heart of the Captain Planet formation of architectural patterns, with Domain Driven Design and Event Sourcing being popular members. Individually, these patterns are powerful tools, but only together can they solve platform level problems. If you haven't familiarized yourself with these patterns individually, I highly recommend the following resources, as this post will not delve into their specifics. Rather, my intention is focusing on how one might realize these in the context of Microsoft Orleans.
These references cover a few providers, each one containing a wealth of information on all the topics discussed. It's important to have perspective, so be sure to read them all as well as differing opinions.
- An Introduction to Domain Driven Design - Microsoft
- CQRS - Martin Fowler
- Event Sourcing - microservices.io
So what are we trying to accomplish, and how will Orleans help us realize that vision?
- We want to separate Commands (enacting a change in data) from Reads (viewing the result of those changes).
- We want changes to propagate changes so we can react to them
- And we want our core model to use those events to maintain state and change history
- Finally, we'll design our model in a way that aligns with business processes, in a DDD approach.
Orleans provides a natural fit for these patterns because Grains are an excellent abstraction of an Aggregate Root. They have an ID, a Type, and can maintain State. We can design them to process commands or events, depending on whether they represent the Aggregate itself or some downstream View. And because of the runtime, we can host millions of these Aggregates and not worry so much about race conditions related to processing Commands.
Because Grains don't really come with a built-in mechanism to manage an Event Source Log other than IPersistentState
, we're going to need to build our own. This Event Log will need to maintain the state of the model, persist a historical record of events, and maintain snapshots (which may truncate the log). Additionally, this Event Log will be able to maintain an in-memory state that can drift from the persisted state. This is primarily for performance reasons, but can be also be used to relieve write pressure from storage (which may also lower costs).
A common thread in this post will be TView
and TEntry
. The former represents the model we're storing. The latter represents the events we're logging.
Please also note this article cuts out code in the name of brevity. The full source code is linked to at the end of the post.
public interface IEventLog<TView, TEntry>
where TView : class, new()
where TEntry : class
{
Task Hydrate();
void Submit(TEntry entry);
void Submit(IEnumerable<TEntry> entries);
Task Snapshot(bool truncate);
Task WaitForConfirmation();
TView TentativeView { get; }
TView ConfirmedView { get; }
int ConfirmedVersion { get; }
int TentativeVersion { get; }
}
We'll need a way to build a concrete version of this interface, so we'll use the factory pattern.
public interface IEventLogFactory
{
IEventLog<TView, TEntry> Create<TView, TEntry> (Type grainType, string viewId)
where TView : class, new()
where TEntry : class;
}
To handle snapshotting, we need a way to inject a decider on whether to snapshot, and whether that snapshot should also truncate the log. In some cases, a snapshot may just be a shortcut to hydrating state, but in others it is a way to clean up the event log to keep maximum performance.
public interface ISnapshotStrategy<TView>
where TView : class, new()
{
Task<(bool shouldSnapshot, bool shouldTruncate)> ShouldSnapshot(TView currentState, int version);
}
public interface ISnapshotStrategyFactory
{
ISnapshotStrategy<TView> Create<TView>(Type grainType, string viewId)
where TView : class, new();
}
Now that we have the necessary mechanisms, we can start to build our Grain base class. This class will be inherited by end-developers looking to get access to Event Sourcing for their Grain. I like this approach because it continues the encapsulation of model whereby Grains manage their data themselves.
public abstract class EventSourcedGrain<TGrainState, TEventBase>
: Grain, ILifecycleParticipant<IGrainLifecycle>
where TGrainState : class, new()
where TEventBase : class
{
private IDisposable? _saveTimer;
private bool _saveOnRaise = false;
private ISnapshotStrategy<TGrainState>? _snapshotStrategy;
private IEventLog<TGrainState, TEventBase> _eventLog = null!;
}
The first thing this Grain needs to do is subscribe to lifecycle events so that state can be loaded and persisted correctly. We use OnSetup
to grab references to our Factory classes and build our event log.
public virtual void Participate(IGrainLifecycle lifecycle)
{
lifecycle.Subscribe<EventSourcedGrain<TGrainState, TEventBase>>(
GrainLifecycleStage.SetupState,
OnSetup,
OnTearDown
);
lifecycle.Subscribe<EventSourcedGrain<TGrainState, TEventBase>>(
GrainLifecycleStage.Activate - 1,
OnHydrateState,
OnDestroyState
);
}
private Task OnSetup(CancellationToken token)
{
if (token.IsCancellationRequested)
{
return Task.CompletedTask;
}
// grab the event log factory and build the log service
var factory = ServiceProvider.GetRequiredService<IEventLogFactory>();
_eventLog = factory.Create<TGrainState, TEventBase>(GetType(), this.GetGrainId().ToString());
// attempt to grab a snapshot strategy
var snapshotFactory = ServiceProvider.GetService<ISnapshotStrategyFactory>();
if (snapshotFactory != null)
{
_snapshotStrategy = snapshotFactory.Create<TGrainState>(
GetType(),
this.GetGrainId().ToString()
);
}
return Task.CompletedTask;
}
We'll use OnHydrateState
and OnDestroyState
to, as you might have guessed, load state from storage and subsequently save it. Note that with OnHydrateState
we will also initialize our save timer if the Grain class has a PersistTimer
attribute.
private async Task OnHydrateState(CancellationToken token)
{
if (token.IsCancellationRequested)
{
return;
}
await _eventLog.Hydrate();
var timer = GetType().GetCustomAttribute<PersistTimerAttribute>()?.Time ??
PersistTimerAttribute.DefaultTime;
if (timer.Equals(TimeSpan.Zero))
{
_saveOnRaise = true;
}
else {
_saveTimer = this.RegisterGrainTimer(OnSaveTimerTicked, new object(), timer, timer);
}
}
private async Task OnDestroyState(CancellationToken token)
{
if (token.IsCancellationRequested)
{
return;
}
await _eventLog.WaitForConfirmation();
if (_saveTimer is not null)
{
_saveTimer.Dispose();
_saveTimer = null;
}
}
Our Event Log does the bulk of the work, allowing our Grain class to stay slim and independent of any implementation details of dealing with the log management. As a result, we'll provide some pass-through methods that call into the IEventLog
service.
Raising events will add them to the tentative model and persist them [if no timer exists]. In the case we have a timer or are otherwise delayed in persisting the event log, a WaitForConfirmation
call will immediately persist the event log to storage. Finally, calling Snapshot will save off a snapshot view of the model and truncate the log if necessary.
protected virtual Task Raise(TEventBase @event)
{
_eventLog.Submit(@event);
return _saveOnRaise ? _eventLog.WaitForConfirmation() : Task.CompletedTask;
}
protected virtual Task Raise(IEnumerable<TEventBase> events)
{
_eventLog.Submit(events);
return _saveOnRaise ? _eventLog.WaitForConfirmation() : Task.CompletedTask;
}
protected Task WaitForConfirmation()
{
return _eventLog.WaitForConfirmation();
}
protected Task Snapshot(bool truncate)
{
return _eventLog.Snapshot(truncate);
}
Now that we have the Grain, we need an implementation of the IEventLog
interface. We'll use MongoDB in this post as it's a favorite of mine, but you can choose any underlying provider you wish.
Much of the magic in my CQRS & Event Sourcing implementation comes from a nifty little trick I learned from reading Orleans source code. Casting an object to dynamic and calling a method that must exist. That is, if the target method didn't exist, then the implementation would be incomplete and the app would have a bug.
private void ApplyTentative(TEvent @event)
{
dynamic e = @event;
dynamic s = _tentativeState;
s.Apply(e);
}
To hydrate the log instance from storage we will use the Grain ID and look for a snapshot. This will allow us to "skip ahead" in the event log, but if there isn't one, that's okay. We load all the events we need and apply them to the tentative state. We then deep copy this state over to the confirmed state and apply the trailing events.
public async Task Hydrate()
{
var snapshotDb = _client.GetDatabase(SnapshotDatabaseName);
var snapshotCol = snapshotDb.GetCollection<Snapshot<TView>>(_settings.GrainType);
var db = _client.GetDatabase(EventDatabaseName);
var col = db.GetCollection<EventLogEntry<byte[]>>(_settings.GrainType);
// Look for a snapshot. If it exists, we load that directly
// If there is a snapshot, we need to do a delta from snapshot version to log tail
var snapshot = (await snapshotCol.FindAsync(m => m.GrainId == _settings.GrainId)).FirstOrDefault();
if (snapshot is not null)
{
_tentativeState = snapshot.View;
_state = DeepCopy(_tentativeState);
_confirmedVersion = _tentativeVersion = snapshot.Version;
}
_events = col.AsQueryable()
.Where(m => m.GrainId == _settings.GrainId && m.Version > _confirmedVersion)
.OrderBy(m => m.Version);
// loop through the events to hydrate the state
// these should be in a sequence ordered by version...
foreach (var ev in _events)
{
TEvent @event = _eventSerializer.Deserialize<TEvent>(ev.Data);
ApplyTentative(@event);
ApplyConfirmed(@event);
_tentativeVersion = _confirmedVersion = ev.Version;
}
}
Snapshotting is a straight forward process of serializing out the view to the persistence with version information and deleting log entries that lead up to that version, if the truncate flag is set.
public async Task Snapshot(bool truncate)
{
var db = _client.GetDatabase(EventDatabaseName);
var col = db.GetCollection<Snapshot<TView>>(_settings.GrainType);
await col.FindOneAndReplaceAsync<Snapshot<TView>>(
m => m.GrainId == _settings.GrainId,
new Snapshot<TView>
{
Id = Guid.NewGuid(),
GrainId = _settings.GrainId,
View = ConfirmedView,
Version = ConfirmedVersion
},
new FindOneAndReplaceOptions<Snapshot<TView>> { IsUpsert = true }
);
if (truncate)
{
var logDb = _client.GetDatabase(SnapshotDatabaseName);
var logCol = db.GetCollection<EventLogEntry<byte[]>>(_settings.GrainType);
// TODO: Validate if it's that easy...
await logCol.DeleteManyAsync(m => m.GrainId == _settings.GrainId && m.Version < ConfirmedVersion);
}
}
Lastly, to save events to storage, we simply serialize the data and send it out to storage. To create the lagging state system whereby timed saves occur out-of-band with event raising, we simply maintain an internal queue of events to save. There's probably some optimization here to submit multiple events at once and cut down on storage request or IO units, but for demo purposes it does well.
private async Task SaveQueueToStorage()
{
var db = _client.GetDatabase(EventDatabaseName);
var col = db.GetCollection<EventLogEntry<byte[]>>(_settings.GrainType);
while (_eventQueue.Count > 0)
{
var eventRecord = _eventQueue.Dequeue();
await col.InsertOneAsync(eventRecord);
// apply to the confirmed version
var @event = _eventSerializer.Deserialize<TEvent>(eventRecord.Data);
ApplyConfirmed(@event);
// this is fishy....
_confirmedVersion = Math.Max(eventRecord.Version, _confirmedVersion);
}
}
public Task WaitForConfirmation()
{
return SaveQueueToStorage();
}
I use this mechanism in Reach to maintain state for all aggregate roots. For instance, the way in which a Component is managed is by passing commands to the ComponentGrain
which performs validation and raises the matching event. The following example demonstrates renaming a Component. Note that while we persist the event to storage [eventually], the event is propagated to anyone who wants to know about it. Internally, the system maintains an in-memory version of the domain model which may be used by the ComponentGrain
itself or by a second grain calling this grain and requesting it.
public sealed class ComponentGrain : StreamingEventSourcedGrain<Component, BaseComponentEvent>,
IComponentGrain
{
public async Task<CommandResponse> Rename(RenameComponentCommand command)
{
await Raise(new ComponentRenamedEvent(command.AggregateId, command.OrganizationId, command.HubId)
{
Name = command.Name,
Slug = command.Slug
});
return CommandResponse.Success();
}
}
To support separating the read from the writes, a second grain, named ComponentViewGrain
, listens for events from a matching ComponentGrain
and updates a view in mongodb.
[ImplicitStreamSubscription(GrainConstants.Component_EventStream)]
public class ComponentViewGrain : SubscribedViewGrain<BaseComponentEvent>, IComponentViewGrain
{
public async Task Handle(ComponentRenamedEvent @event)
{
var result = await _componentViewReadRepository.Get(
@event.OrganizationId,
@event.HubId,
@event.AggregateId
);
result.Name = @event.Name;
result.Slug = @event.Slug;
await _componentViewWriteRepository.Upsert(result);
}
}
As updates are made via commands in the ComponentGrain
, these changes are propagated via events to any number of view stores and business workflows. We can then query those systems and scale them separately from the system which supports changing the data.
In the event we restart the Orleans silo, or in the case the Grain is migrated or otherwise deactivated and reactivated, it will automatically hydrate the internal domain model based on the event log and latest available snapshot. After which, we simply continue sending it Commands to process.
The source code that supports this article is currently available at https://github.com/jsedlak/petl. Please note that this library is being replaced by Diesel.Orleans.EventSourcing.