Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,78 +136,11 @@ public async Task UpsertStatuses(IReadOnlyCollection<ResourceSearchParameterStat
return;
}

await UpsertStatusesWithRetry(statuses, 3, cancellationToken);
}

private async Task UpsertStatusesWithRetry(IReadOnlyCollection<ResourceSearchParameterStatus> statuses, int maxRetries, CancellationToken cancellationToken)
{
var currentStatuses = statuses.ToList();
int retryCount = 0;

while (retryCount <= maxRetries)
{
try
{
await UpsertStatusesInternal(currentStatuses, cancellationToken);
return; // Success
}
catch (SqlException sqlEx) when (sqlEx.Number == 50001 && retryCount < maxRetries) // Our custom concurrency error
{
// Optimistic concurrency conflict detected - refresh and retry
retryCount++;
_logger.LogWarning("Optimistic concurrency conflict detected on attempt {RetryCount}. Retrying...", retryCount);

// Refresh the statuses with current LastUpdated values
var refreshedStatuses = await GetSearchParameterStatuses(cancellationToken);
var refreshedDict = refreshedStatuses.ToDictionary(s => s.Uri.OriginalString, s => s);

// Update our statuses with fresh LastUpdated values
foreach (var status in currentStatuses)
{
if (refreshedDict.TryGetValue(status.Uri.OriginalString, out var refreshed))
{
status.LastUpdated = refreshed.LastUpdated;
}
}

// Wait before retry to reduce contention
await Task.Delay(TimeSpan.FromMilliseconds(100.0 * retryCount), cancellationToken);
}
catch (SqlException sqlEx) when (sqlEx.Number == 50001)
{
// Max retries exceeded
throw new SearchParameterConcurrencyException("Maximum retry attempts exceeded due to concurrency conflicts", sqlEx);
}
}
}

private async Task UpsertStatusesInternal(IReadOnlyCollection<ResourceSearchParameterStatus> statuses, CancellationToken cancellationToken)
{
using var cmd = new SqlCommand();
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandText = "dbo.MergeSearchParams";
new SearchParamListTableValuedParameterDefinition("@SearchParams").AddParameter(cmd.Parameters, new SearchParamListRowGenerator().GenerateRows(statuses.ToList()));

var results = await cmd.ExecuteReaderAsync(
_sqlRetryService,
(reader) => { return reader.ReadRow(VLatest.SearchParam.SearchParamId, VLatest.SearchParam.Uri, VLatest.SearchParam.LastUpdated); },
_logger,
cancellationToken);

foreach (var result in results)
{
(short searchParamId, string searchParamUri, DateTimeOffset lastUpdated) = result;

// Add the new search parameters to the FHIR model dictionary.
_fhirModel.TryAddSearchParamIdToUriMapping(searchParamUri, searchParamId);

// Update the LastUpdated in our original collection for future operations
var matchingStatus = statuses.FirstOrDefault(s => s.Uri.OriginalString == searchParamUri);
if (matchingStatus != null)
{
matchingStatus.LastUpdated = lastUpdated;
}
}
await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken);
}

// Synchronize the FHIR model dictionary with the data in SQL search parameter status table
Expand Down
Loading