"Collection was modified; enumeration operation may not execute." error on EventBusBase

Open zh3305 opened this issue 1 year ago • 1 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Description

The same error as in https://github.com/aspnetboilerplate/aspnetboilerplate/pull/6805 occurs at https://github.com/abpframework/abp/blob/ac792033d78a7b7644835c866acc2d6a55e5990c/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs#L139

Reproduction Steps

No response

Expected behavior

No response

Actual behavior

No response

Regression?

No response

Known Workarounds

No response

Version

6.0.3

User Interface

MVC

Database Provider

EF Core (Default)

Tiered or separate authentication server

None (Default)

Operation System

Windows (Default)

Other information

no

zh3305 (Aug 26 '24 07:08)

hi

  • Reproduction Steps

maliming (Aug 26 '24 07:08)

I am facing this as well, @maliming.

It is caused by this line:

https://github.com/abpframework/abp/blob/5d0a6e01ca22b1797e61cdf087a161ec4d4e8d25/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs#L140

This is a race condition that can happen when you subscribe to an event while another event is being published. That is a common pattern in async APIs: you publish an event and then wait for another event as the response.

e.g.

class SomeAppService(IDistributedEventBus distributedEventBus)
{
    [UnitOfWork]
    public async Task DoSomething()
    {
        // create a unique identifier for the request.
        var requestId = Guid.NewGuid();
        // create a completer to catch the reply.
        var completer = new TaskCompletionSource<ReplyEto>();
        // start listening for the reply.
        var disp = distributedEventBus.Subscribe<ReplyEto>(
            (eventData) =>
            {
                // listen for a reply with the same request id.
                if (eventData.RequestId == requestId)
                {
                    // set the result when the reply is received.
                    completer.TrySetResult(eventData);
                }
                return Task.CompletedTask;
            }
        );
        await distributedEventBus.PublishAsync(
            new RequestEto
            {
                RequestId = requestId,
                // other request data ...
            },
            // event needs to be published NOW.
            onUnitOfWorkComplete: false
        );
        // Wait for the reply.
        var replyEto = await completer.Task;
        // stop listening after getting the reply.
        disp.Dispose();
    }
}
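
The underlying failure can be reproduced without ABP at all. The sketch below is a minimal illustration, not ABP code: `EnumerationRaceDemo` and both method names are hypothetical, and the `Add` call inside the loop merely stands in for a `Subscribe` that lands while the publish loop is still enumerating the handler list.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerationRaceDemo
{
    // Returns true if adding to the list while enumerating it throws.
    public static bool ThrowsWhenModifiedDuringEnumeration()
    {
        var handlers = new List<Action> { () => { } };
        try
        {
            foreach (var handler in handlers)
            {
                handler();
                // Stand-in for a Subscribe call arriving mid-publish.
                handlers.Add(() => { });
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            // "Collection was modified; enumeration operation may not execute."
            return true;
        }
    }

    // Enumerates a copy of the list, so the Add cannot invalidate the loop.
    // Returns how many handlers were invoked.
    public static int InvokeOverSnapshot()
    {
        var handlers = new List<Action> { () => { } };
        var invoked = 0;
        foreach (var handler in handlers.ToArray())
        {
            handler();
            invoked++;
            handlers.Add(() => { });
        }
        return invoked;
    }
}
```

The first method hits the exception on the next `MoveNext` after the `Add`; the second finishes normally because the loop only ever sees the copy taken before it started.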

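Here is a workaround implementation until this is fixed. It overrides TriggerHandlersAsync and iterates over a copy (ToList()) of the handler factories instead of the live list:
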
[Dependency(ReplaceServices = true)]
[ExposeServices(
    typeof(IDistributedEventBus),
    typeof(RabbitMqDistributedEventBus),
    typeof(FixedRabbitMqDistributedEventBus)
)]
public class FixedRabbitMqDistributedEventBus(
    IOptions<AbpRabbitMqEventBusOptions> options,
    IConnectionPool connectionPool,
    IRabbitMqSerializer serializer,
    IServiceScopeFactory serviceScopeFactory,
    IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
    IRabbitMqMessageConsumerFactory messageConsumerFactory,
    ICurrentTenant currentTenant,
    IUnitOfWorkManager unitOfWorkManager,
    IGuidGenerator guidGenerator,
    IClock clock,
    IEventHandlerInvoker eventHandlerInvoker,
    ILocalEventBus localEventBus,
    ICorrelationIdProvider correlationIdProvider
)
    : RabbitMqDistributedEventBus(
        options,
        connectionPool,
        serializer,
        serviceScopeFactory,
        distributedEventBusOptions,
        messageConsumerFactory,
        currentTenant,
        unitOfWorkManager,
        guidGenerator,
        clock,
        eventHandlerInvoker,
        localEventBus,
        correlationIdProvider
    )
{
    protected override async Task TriggerHandlersAsync(
        Type eventType,
        object eventData,
        List<Exception> exceptions,
        InboxConfig? inboxConfig = null
    )
    {
        await new SynchronizationContextRemover();

        foreach (var handlerFactories in GetHandlerFactories(eventType))
        {
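            // Important part: iterate over a snapshot (ToList) so that handlers
            // subscribed or disposed while a handler is awaited do not invalidate the enumerator.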
            foreach (var handlerFactory in handlerFactories.EventHandlerFactories.ToList())
            {
                await TriggerHandlerAsync(
                    handlerFactory,
                    handlerFactories.EventType,
                    eventData,
                    exceptions,
                    inboxConfig
                );
            }
        }

        //Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument
        if (
            eventType.GetTypeInfo().IsGenericType
            && eventType.GetGenericArguments().Length == 1
            && typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType)
        )
        {
            var genericArg = eventType.GetGenericArguments()[0];
            var baseArg = genericArg.GetTypeInfo().BaseType;
            if (baseArg != null)
            {
                var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
                var constructorArgs = (
                    (IEventDataWithInheritableGenericArgument)eventData
                ).GetConstructorArgs();
                var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs)!;
                await PublishToEventBusAsync(baseEventType, baseEventData);
            }
        }
    }
}
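
For readers who want the idea in isolation, here is a rough, self-contained sketch of the same snapshot approach on a tiny in-memory bus. Everything in it (`TinyEventBus`, `_gate`, `Unsubscriber`) is hypothetical and is not how ABP's `EventBusBase` is implemented; it also takes the copy under a lock, which the workaround above does not do, so treat that part as an assumption about what a more defensive version might look like.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal bus used only to illustrate the snapshot pattern.
public sealed class TinyEventBus<TEvent>
{
    private readonly object _gate = new object();
    private readonly List<Func<TEvent, Task>> _handlers = new List<Func<TEvent, Task>>();

    public IDisposable Subscribe(Func<TEvent, Task> handler)
    {
        lock (_gate)
        {
            _handlers.Add(handler);
        }
        return new Unsubscriber(this, handler);
    }

    public async Task PublishAsync(TEvent eventData)
    {
        Func<TEvent, Task>[] snapshot;
        lock (_gate)
        {
            // Copy under the lock; the live list is never enumerated.
            snapshot = _handlers.ToArray();
        }

        foreach (var handler in snapshot)
        {
            // Handlers may subscribe or unsubscribe here without
            // affecting this loop, because it walks the copy.
            await handler(eventData);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly TinyEventBus<TEvent> _bus;
        private readonly Func<TEvent, Task> _handler;

        public Unsubscriber(TinyEventBus<TEvent> bus, Func<TEvent, Task> handler)
        {
            _bus = bus;
            _handler = handler;
        }

        public void Dispose()
        {
            lock (_bus._gate)
            {
                _bus._handlers.Remove(_handler);
            }
        }
    }
}
```

One consequence of this design is that a handler subscribed during a publish only sees later events, and a handler disposed during a publish may still be invoked once for the event already in flight.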

ahmednfwela (Jan 02 '25 02:01)

Thanks @ahmednfwela

I will add some code to prevent this case.

maliming (Jan 02 '25 03:01)