Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions SharedKernel.Demo/MassTransitExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,47 @@ public static WebApplicationBuilder AddMassTransit(this WebApplicationBuilder bu
{
x.AddConsumers(assemblies);
x.SetKebabCaseEndpointNameFormatter();

// Quorum queue for HA — apply regardless of env so staging mirrors prod.
// Degrades gracefully on single-node (still works, just no replication).
x.AddConfigureEndpointsCallback((_, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq && builder.Environment.IsProduction())
{
rmq.SetQuorumQueue(3);
}
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("RabbitMq")!);
cfg.ConfigureEndpoints(context);

// Required for UseDelayedRedelivery to bind to the RabbitMQ delayed exchange plugin.
cfg.UseDelayedMessageScheduler();

// Outer: delayed redelivery — survives 3rd-party outages.
// After exhaustion → *_error queue for manual intervention / alerting.
cfg.UseDelayedRedelivery(r => r.Intervals(
TimeSpan.FromMinutes(5),
TimeSpan.FromMinutes(30),
TimeSpan.FromHours(2),
TimeSpan.FromHours(6),
TimeSpan.FromHours(12),
TimeSpan.FromHours(24)));

// Inner: immediate in-process retries for transient faults (network blip, deadlock).
// Keep low — expensive failures should flow to delayed redelivery fast.
cfg.UseMessageRetry(r =>
r.Exponential(5, TimeSpan.FromSeconds(30), TimeSpan.FromHours(1), TimeSpan.FromSeconds(60)));

//Approximate Retry Timings:
//Retry Attempt Approximate Delay
//1st Retry 30 sec
//2nd Retry ~5 min
//3rd Retry ~15 min
//4th Retry ~30 min
//5th Retry ~1 hour(capped)
r.Exponential(4, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)));

// Tune based on consumer count and message processing time. Higher → better throughput but more messages in-flight (potentially out of order).
cfg.PrefetchCount = 32;

cfg.ConfigureEndpoints(context);
});
});

builder.AddRmqHealthCheck();

return builder;
}
}
Expand All @@ -47,6 +69,7 @@ public static WebApplicationBuilder AddRmqHealthCheck(this WebApplicationBuilder
{
Uri = new Uri(rmqConnectionString)
};

return factory.CreateConnectionAsync()
.GetAwaiter()
.GetResult();
Expand Down
4 changes: 2 additions & 2 deletions SharedKernel.Demo/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
"MinimumLevel": {
"Default": "Information",
"Override": {
"Microsoft": "Information",
"System": "Information"
"Microsoft": "Warning",
"System": "Warning"
}
}
},
Expand Down
1 change: 1 addition & 0 deletions src/SharedKernel/Logging/LogCleanupHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace SharedKernel.Logging;

public class LogCleanupHostedService(string logsDirectory, TimeSpan retentionPeriod) : BackgroundService
{
/// <inheritdoc />
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
Expand Down
21 changes: 15 additions & 6 deletions src/SharedKernel/Logging/Middleware/CappedResponseBodyStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ public override long Position
private void Capture(ReadOnlySpan<byte> source)
{
if (_bufferLength >= capBytes)
{
return;
}

var toCopy = Math.Min(capBytes - _bufferLength, source.Length);
if (toCopy <= 0)
{
return;
}

source[..toCopy]
.CopyTo(_buffer.AsSpan(_bufferLength));
Expand All @@ -43,11 +47,10 @@ public override void Write(byte[] buffer, int offset, int count)
Capture(buffer.AsSpan(offset, count));
}

// Delegates to the ValueTask overload to unify capture logic and avoid duplication.
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await inner.WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
TotalWritten += count;
Capture(buffer.AsSpan(offset, count));
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -65,21 +68,27 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
protected override void Dispose(bool disposing)
{
if (_disposed)
{
return;
}

_disposed = true;
ArrayPool<byte>.Shared.Return(_buffer);
base.Dispose(disposing);
}

public override ValueTask DisposeAsync()
// Bug fix: original skipped base.DisposeAsync(), leaving the finalizer un-suppressed
// and the inner stream without an async-dispose signal through the chain.
public override async ValueTask DisposeAsync()
{
if (_disposed)
return ValueTask.CompletedTask;
{
return;
}

_disposed = true;
ArrayPool<byte>.Shared.Return(_buffer);
return ValueTask.CompletedTask;
await base.DisposeAsync();
}

public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
Expand Down
Loading