System.NullReferenceException: Object reference not set to an instance of an object in CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError function.
Currently, we are using this library to listen to push topics whenever there is a change in salesforce. We are using this library as follows.
Code for registering an event and configuring the Salesforce Streaming Client in Program.cs is:
public static void Main(string[] args) { var builder = WebApplication.CreateBuilder(args); builder.Host.ConfigureServices((hostContext, services) => { var salesforceConfiguration = services.BuildServiceProvider().GetRequiredService<IOptions<SalesforceConfiguration>>().Value; services.AddResilientStreamingClient("", "", (conf) => { conf.ClientId = salesforceConfiguration.ClientId; conf.ClientSecret = salesforceConfiguration.ClientSecret; conf.RefreshToken = salesforceConfiguration.RefreshToken; conf.LoginUrl = salesforceConfiguration.BaseUrl; conf.OAuthUri = Constants.OAuthUriConfig; conf.EventOrTopicUri = Constants.EventOrTopicConfig; conf.CometDUri = salesforceConfiguration.CometdUrl; }); services.AddSingleton<IEventBus, EventBus>(); services.AddHostedService<SalesforceEventBusHostedService>(); services.AddTransient<IMessageListener, UpdatedListener>(); services.AddTransient<IMessageListener, DeletedListener>(); });
var app = builder.Build();
app.Run();
}
Code for subscribing to events is:
public class SalesforceEventBusHostedService:IHostedService { private readonly ILogger<SalesforceEventBusHostedService> _logger; private readonly IEventBus _eventBus; private readonly ICacheService<ReplayIdDto> _cacheService;
//Make a list of push topics that we created in salesforce private readonly List<Tuple<string,int,Type>> _eventsMapping = new() { new Tuple<string,int,Type>("topic/Updated",-1,typeof(UpdatedListener)), new Tuple<string, int, Type>("topic/Deleted", -1, typeof(DeletedListener)) };
public SalesforceEventBusHostedService(ILogger<SalesforceEventBusHostedService> logger,IEventBus eventBus, ICacheService<ReplayIdDto> cacheService)
{
_logger = logger;
_eventBus = eventBus;
_cacheService = cacheService;
_cacheService.CacheRegion = $"{Constants.MessageListenerCacheRegion}";
}
private static object[] GetPlatformEventObject(string eventName,int replayId,Type eventType)
{
var platformEvent = Activator.CreateInstance(typeof(PlatformEvent<>).MakeGenericType(eventType));
platformEvent?.GetType().GetProperty("Name")?.SetValue(platformEvent, eventName);
platformEvent?.GetType().GetProperty("ReplayId")?.SetValue(platformEvent, replayId);
return new[] {platformEvent};
}
private async Task SubscribeToEvent(string eventName,int replayId,Type eventType)
{
var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
var subscribeMethod = typeof(IEventBus).GetMethod("Subscribe")?.MakeGenericMethod(eventType);
await ((Task) subscribeMethod?.Invoke(_eventBus, platformEvent))!;
}
private async Task UnSubscribeToEvent(string eventName,int replayId,Type eventType)
{
var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
var subscribeMethod = typeof(IEventBus).GetMethod("Unsubscribe")?.MakeGenericMethod(eventType);
await ((Task) subscribeMethod?.Invoke(_eventBus, platformEvent))!;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} starting.");
for (var i = 0; i < _eventsMapping.ToList().Count; i++)
{
var (originalEventName, originalReplayId, originalEventType) = _eventsMapping[i];
var replayIdFromCache = (await _cacheService.GetItemAsync(StreamingApiHelper.RemoveEventOrTopicPrefix(originalEventName)))?.ReplayId??originalReplayId;
var (eventName, replayId, eventType) = _eventsMapping[i] = new Tuple<string, int, Type>(originalEventName, replayIdFromCache, originalEventType);
await SubscribeToEvent(eventName, replayId, eventType);
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} stopped.");
foreach (var (eventName,replayId, eventType) in _eventsMapping)
{
await UnSubscribeToEvent(eventName, replayId, eventType);
}
}
}
But I face an issue like:
System.NullReferenceException: Object reference not set to an instance of an object. at CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError(Object sender, String e) in C:\projects\cometd-netcore-salesforce\src\CometD.NetCore.Salesforce\ResilientStreamingClient.cs:line 210 at CometD.NetCore.Client.Extension.ErrorExtension.ReceiveMeta(IClientSession session, IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Extension\ErrorExtension.cs:line 74 at CometD.NetCore.Common.AbstractClientSession.ExtendReceive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 188 at CometD.NetCore.Common.AbstractClientSession.Receive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 155 at CometD.NetCore.Client.BayeuxClient.PublishTransportListener.OnMessages(IList`1 messages) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\BayeuxClient.cs:line 769 at CometD.NetCore.Client.Transport.LongPollingTransport.GetResponseCallback(IAsyncResult asynchronousResult) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Transport\LongPollingTransport.cs:line 310
Can anyone please guide me on why this error is received and if there is something I should be updating on our end?
We're encountering similar issues with our Salesforce subscriber implementation.
Specifically, we have three active subscribers for push topics that track replay IDs, and about once a week, one subscriber (always same) encounters an error during reconnection. Our temporary fix has been to reset the replay ID to -2, but the issue tends to resurface after a few days.