[BUG] Race condition in Bookmark Management
Description
An activity creates a bookmark to be suspended and later resumed. The activity then publishes an event. A service waits for incoming events that are potential responses to previously published events. If an event arrives too early, the service will not find the bookmark that has already been created. As a result, the activity is never resumed.
Steps to Reproduce
Detailed Steps:
- The activity creates a bookmark (line 97).
- The activity asynchronously publishes an event (line 98).
- The activity is suspended to be resumed later.
- The workflow service receives the published event (line 135).
- The workflow service retrieves all bookmarks (line 137 ff.).
- The workflow service filters the list of bookmarks for the bookmark created by the activity (line 141).
- The workflow service creates a DispatchWorkflowInstanceRequest for each filtered bookmark (line 142).
- The workflow service dispatches all instances of DispatchWorkflowInstanceRequest.
If the workflow service retrieves all bookmarks immediately after receiving the incoming event, no bookmarks are found. If the workflow service calls Thread.Sleep(2000); (line 136) after receiving the incoming event, the previously created bookmark is found.
Code Snippets:
namespace WebApplication8;

using Elsa.EntityFrameworkCore.Modules.Management;
using Elsa.EntityFrameworkCore.Modules.Runtime;
using Elsa.Extensions;
using Elsa.Workflows;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Filters;
using Elsa.Workflows.Runtime.Requests;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Concurrent;
using System.Threading;

public class Program
{
    public static void Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);
        builder.Services.AddElsa(elsa =>
        {
            // Configure Management layer to use EF Core.
            elsa.UseWorkflowManagement(management => management.UseEntityFrameworkCore());
            // Configure Runtime layer to use EF Core.
            elsa.UseWorkflowRuntime(runtime => runtime.UseEntityFrameworkCore());
            // Default Identity features for authentication/authorization.
            elsa.UseIdentity(identity =>
            {
                identity.TokenOptions = options => options.SigningKey = "sufficiently-large-secret-signing-key"; // This key needs to be at least 256 bits long.
                identity.UseAdminUserProvider();
            });
            // Configure ASP.NET authentication/authorization.
            elsa.UseDefaultAuthentication(auth => auth.UseAdminApiKey());
            // Expose Elsa API endpoints.
            elsa.UseWorkflowsApi();
            // Setup a SignalR hub for real-time updates from the server.
            elsa.UseRealTimeWorkflows();
            // Enable HTTP activities.
            elsa.UseHttp();
            // Register custom activities from the application, if any.
            elsa.AddActivitiesFrom<Program>();
        });
        // Configure CORS to allow designer app hosted on a different origin to invoke the APIs.
        builder.Services.AddCors(cors => cors
            .AddDefaultPolicy(policy => policy
                .AllowAnyOrigin() // For demo purposes only. Use a specific origin instead.
                .AllowAnyHeader()
                .AllowAnyMethod()
                .WithExposedHeaders("x-elsa-workflow-instance-id"))); // Required for Elsa Studio in order to support running workflows from the designer. Alternatively, you can use the `*` wildcard to expose all headers.
        // Add Health Checks.
        builder.Services.AddHealthChecks();
        var queue = new BlockingCollection<Guid>();
        builder.Services.AddSingleton(new Publish(queue.Add));
        builder.Services.AddSingleton(new Receive(queue.Take));
        builder.Services.AddHostedService<WorkflowService>();
        // Build the web application.
        var app = builder.Build();
        // Configure web application's middleware pipeline.
        app.UseCors();
        app.UseRouting(); // Required for SignalR.
        app.UseAuthentication();
        app.UseAuthorization();
        app.UseWorkflowsApi(); // Use Elsa API endpoints.
        app.UseWorkflows(); // Use Elsa middleware to handle HTTP requests mapped to HTTP Endpoint activities.
        app.UseWorkflowsSignalRHubs(); // Optional SignalR integration. Elsa Studio uses SignalR to receive real-time updates from the server.
        app.Run();
    }

    public record Publish(Action<Guid> Action);
    public record Receive(Func<Guid> Func);
    public record BookmarkPayload(Guid guid);

    public sealed class MyActivity : Activity
    {
        protected override async ValueTask ExecuteAsync(
            ActivityExecutionContext context)
        {
            var publish = context.GetRequiredService<Publish>();
            var guid = Guid.NewGuid();
            var payload = new BookmarkPayload(guid);
            context.CreateBookmark(payload, ResumeAsync);
            publish.Action(guid);
            await Task.Run(() => { });
        }

        private async ValueTask ResumeAsync(
            ActivityExecutionContext context) =>
            await context.CompleteActivityAsync();
    }

    public sealed class WorkflowService(
        IServiceScopeFactory factory,
        Receive receive) : BackgroundService
    {
        protected override Task ExecuteAsync(
            CancellationToken stoppingToken)
        {
            var serviceProvider = factory.CreateScope().ServiceProvider;
            var bookmarkStore = serviceProvider.GetRequiredService<IBookmarkStore>();
            var workflowDispatcher = serviceProvider.GetRequiredService<IWorkflowDispatcher>();
            var result = new Task(() => Execute(
                receive.Func,
                bookmarkStore,
                workflowDispatcher,
                stoppingToken));
            result.Start();
            return result;
        }

        private static async void Execute(
            Func<Guid> receive,
            IBookmarkStore bookmarkStore,
            IWorkflowDispatcher workflowDispatcher,
            CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var guid = receive();
                // Thread.Sleep(2000);
                var allBookmarks = await bookmarkStore.FindManyAsync(
                    new BookmarkFilter(),
                    stoppingToken);
                var requests = allBookmarks
                    .Where(b => b.Payload is BookmarkPayload)
                    .Select(MakeDispatchWorkflowInstanceRequest);
                foreach (var request in requests)
                {
                    await workflowDispatcher.DispatchAsync(request, stoppingToken);
                }
            }
        }

        private static DispatchWorkflowInstanceRequest MakeDispatchWorkflowInstanceRequest(
            StoredBookmark bookmark) =>
            new DispatchWorkflowInstanceRequest(bookmark.WorkflowInstanceId)
            {
                BookmarkId = bookmark.BookmarkId,
            };
    }
}
Attachments:
- Workflow JSON: workflow.json
- Sample Project: Program.zip
Reproduction Rate: 100 %
Video/Screenshots: see below
Additional Configuration: none
Expected Behavior
A bookmark can be found as soon as it has been created.
Actual Behavior
Whether a bookmark is found is time-dependent (race condition).
Screenshots
Environment
- Elsa Package Version: 3.2.0-rc1
- Operating System: Microsoft Windows 10 Enterprise, 10.0.19045 Build 19045
- Browser and Version: Microsoft Edge for Business, 125.0.2535.92
Log Output
none
Troubleshooting Attempts
Thread.Sleep(2000);
Additional Context
none
Related Issues
none
Based on the provided context, it appears that there is a race condition between the creation of a bookmark and the retrieval of that bookmark by the workflow service. The Thread.Sleep(2000); is used as a workaround to ensure that the bookmark is created before the workflow service attempts to retrieve it.
To address this issue, you should ensure that the event publishing and bookmark creation are completed before the workflow service starts looking for bookmarks to resume. This could involve using synchronization mechanisms such as semaphores, mutexes, or other signaling constructs to coordinate between the activity that creates the bookmark and the service that processes the event and resumes the workflow.
Additionally, you may want to review the workflow's design to ensure that the event is not published before the bookmark is created, or to implement a retry mechanism in the workflow service that will keep checking for the bookmark for a certain period before giving up.
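As a rough illustration of the retry idea, here is a minimal sketch of a polling helper that could stand in for the fixed Thread.Sleep(2000). `BookmarkPolling` and `WaitForAnyAsync` are hypothetical names made up for this example and are not part of Elsa. The helper only bounds the wait; it does not remove the underlying race.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not an Elsa API.
public static class BookmarkPolling
{
    // Runs `query` repeatedly until it returns at least one item or
    // `timeout` has elapsed. The last result is returned either way,
    // so the caller must handle an empty list.
    public static async Task<IReadOnlyList<T>> WaitForAnyAsync<T>(
        Func<CancellationToken, Task<IReadOnlyList<T>>> query,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            var items = await query(cancellationToken);
            if (items.Count > 0 || stopwatch.Elapsed >= timeout)
                return items;

            // Back off before asking the store again.
            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}
```

In the sample's service loop, the `query` delegate would presumably wrap the bookmarkStore.FindManyAsync call together with the payload filter, materialized into a list.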
If the issue persists, you may need to delve deeper into the Elsa Workflow's internal mechanisms for creating and managing bookmarks, or reach out to the Elsa Workflow community for support.
Since the provided context does not include specific methods or properties to address this race condition directly, the above suggestions are based on general best practices for dealing with concurrency and timing issues in asynchronous systems.
This comment was generated by Glime.
The activity creates the bookmark before publishing the event. The activity asynchronously publishes the event after creating the bookmark. The service asynchronously receives the event published by the activity. The BlockingCollection used in the example is already a sufficient synchronization mechanism.
Your analysis of the issue is spot on. There are several ways to go about this:
- Use the IWorkflowInbox to deliver events, which will keep a copy of the event and automatically resume the bookmark as soon as it is persisted. However, there are issues with this architecture. Specifically, the event now has to persist in the system for a while, which leads to issues where events are processed multiple times. For this reason, this service will be deprecated as of 3.3.
- Use context.Defer(=>) in your activity. This method accepts a lambda that gets called after bookmarks are actually persisted. The downside of this is that the "real" work to be done now happens during the first commit state only, and not directly as the activity executes. This may or may not be an issue in practice. The Defer method is introduced with 3.3.
- Instead of persisting bookmarks during a commit state, we might consider storing bookmarks immediately as soon as the activity creates a bookmark. The reason we're not already doing so is to try and batch multiple bookmarks in the workflow execution context before hitting the database. However, this has the disadvantage of race conditions that you described in this issue.
Given that in typical scenarios, only one, or perhaps a handful, of activities create a bookmark, batching their persistence may be considered a premature optimization that is unnecessary and in fact can invite race conditions under certain circumstances as demonstrated here.
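To make the timing concrete, here is a small single-threaded sketch of the batching behavior and of the deferral idea. All types below (`InMemoryBookmarkStore`, `ExecutionContextSketch`, `RaceDemo`) are hypothetical stand-ins written for this illustration. They are not Elsa types and do not reflect how Elsa implements commits or Defer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a persistent bookmark store.
public sealed class InMemoryBookmarkStore
{
    private readonly List<Guid> _bookmarks = new();
    public void Save(IEnumerable<Guid> bookmarks) => _bookmarks.AddRange(bookmarks);
    public bool Contains(Guid bookmark) => _bookmarks.Contains(bookmark);
}

// Hypothetical stand-in for an execution context that batches bookmarks.
public sealed class ExecutionContextSketch
{
    private readonly InMemoryBookmarkStore _store;
    private readonly List<Guid> _pendingBookmarks = new();
    private readonly List<Action> _deferred = new();

    public ExecutionContextSketch(InMemoryBookmarkStore store) => _store = store;

    // Only buffers the bookmark; nothing reaches the store yet.
    public void CreateBookmark(Guid bookmark) => _pendingBookmarks.Add(bookmark);

    // Registers work to run once the buffered bookmarks are persisted.
    public void Defer(Action action) => _deferred.Add(action);

    // Persists the buffered bookmarks in one batch, then runs deferred work.
    public void Commit()
    {
        _store.Save(_pendingBookmarks);
        _pendingBookmarks.Clear();
        foreach (var action in _deferred) action();
        _deferred.Clear();
    }
}

public static class RaceDemo
{
    // The "event" is handled before the commit, so the lookup misses.
    public static bool PublishImmediately()
    {
        var store = new InMemoryBookmarkStore();
        var context = new ExecutionContextSketch(store);
        var bookmark = Guid.NewGuid();

        context.CreateBookmark(bookmark);
        var found = store.Contains(bookmark); // Subscriber looks too early.
        context.Commit();
        return found;
    }

    // The "event" is handled after the commit, so the lookup succeeds.
    public static bool PublishDeferred()
    {
        var store = new InMemoryBookmarkStore();
        var context = new ExecutionContextSketch(store);
        var bookmark = Guid.NewGuid();
        var found = false;

        context.CreateBookmark(bookmark);
        context.Defer(() => found = store.Contains(bookmark));
        context.Commit();
        return found;
    }
}
```

In this sketch `RaceDemo.PublishImmediately()` returns false and `RaceDemo.PublishDeferred()` returns true, which mirrors the difference between publishing inside the activity and publishing from a deferred callback.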
This issue should no longer exist with the introduction of the BookmarkQueue, but let's verify this.
I would like to modify the above example to use IBookmarkQueueStore. Unfortunately, elements of type BookmarkQueueItem do not contain a reference to the bookmark itself or its payload. However, the payload is required to filter out the desired bookmarks. How would I have to change the example above to filter out the desired bookmarks?
Pretty sure my coworker and I also ran into this in #6523. Our codebase is using a BookmarkQueue and is still having this issue, so I think it's worth modifying the sample to repro it fully.