From 88f34836f277748c8085cb043ce902232701afaa Mon Sep 17 00:00:00 2001 From: Matt Bishop Date: Tue, 2 Apr 2024 15:45:18 -0400 Subject: [PATCH] Event processor tuning (#3945) --- .../AzureQueueHostedService.cs | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/EventsProcessor/AzureQueueHostedService.cs b/src/EventsProcessor/AzureQueueHostedService.cs index 03c003453..b1b309b50 100644 --- a/src/EventsProcessor/AzureQueueHostedService.cs +++ b/src/EventsProcessor/AzureQueueHostedService.cs @@ -30,6 +30,7 @@ public class AzureQueueHostedService : IHostedService, IDisposable _logger.LogInformation(Constants.BypassFiltersEventId, "Starting service."); _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _executingTask = ExecuteAsync(_cts.Token); + return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask; } @@ -39,8 +40,10 @@ public class AzureQueueHostedService : IHostedService, IDisposable { return; } + _logger.LogWarning("Stopping service."); - _cts.Cancel(); + + await _cts.CancelAsync(); await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken)); cancellationToken.ThrowIfCancellationRequested(); } @@ -64,13 +67,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable { try { - var messages = await _queueClient.ReceiveMessagesAsync(32); + var messages = await _queueClient.ReceiveMessagesAsync(32, + cancellationToken: cancellationToken); if (messages.Value?.Any() ?? false) { foreach (var message in messages.Value) { await ProcessQueueMessageAsync(message.DecodeMessageText(), cancellationToken); - await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); + await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, + cancellationToken); } } else @@ -78,14 +83,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); } } - catch (Exception e) + catch (Exception ex) { - _logger.LogError(e, "Exception occurred: " + e.Message); + _logger.LogError(ex, "Error occurred processing message block."); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); } } - _logger.LogWarning("Done processing."); + _logger.LogWarning("Done processing messages."); } public async Task ProcessQueueMessageAsync(string message, CancellationToken cancellationToken) @@ -98,14 +104,14 @@ public class AzureQueueHostedService : IHostedService, IDisposable try { _logger.LogInformation("Processing message."); - var events = new List(); + var events = new List(); using var jsonDocument = JsonDocument.Parse(message); var root = jsonDocument.RootElement; if (root.ValueKind == JsonValueKind.Array) { var indexedEntities = root.Deserialize>() - .SelectMany(e => EventTableEntity.IndexEvent(e)); + .SelectMany(EventTableEntity.IndexEvent); events.AddRange(indexedEntities); } else if (root.ValueKind == JsonValueKind.Object) @@ -114,12 +120,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable events.AddRange(EventTableEntity.IndexEvent(eventMessage)); } + cancellationToken.ThrowIfCancellationRequested(); + await _eventWriteService.CreateManyAsync(events); + _logger.LogInformation("Processed message."); } - catch (JsonException) + catch (JsonException ex) { - _logger.LogError("JsonReaderException: Unable to parse message."); + _logger.LogError(ex, "Unable to parse message."); } } }