Microservices Foundations: Architect with gRPC & Message Brokers in ASP.NET Core
Master microservices architecture with gRPC and message brokers in ASP.NET Core. Complete guide from basics to advanced patterns with real-world examples.
ASPNETCore,Microservices,gRPC,MessageBrokers,RabbitMQ,ServiceCommunication,DistributedSystems,CloudNative,Docker,Kubernetes,Architecture
Table of Contents
1. Microservices Architecture Fundamentals
The Monolith to Microservices Journey
Real-World Scenario: Imagine an e-commerce platform that started as a monolith but now faces scaling challenges:
// Traditional Monolithic Architecture Problems public class ECommerceMonolith { // Tightly coupled components public async Task<OrderResult> ProcessOrderAsync(OrderRequest request) { // Inventory management var inventoryResult = await _inventoryService.CheckStockAsync(request.Items); if (!inventoryResult.Success) return OrderResult.Failed("Insufficient stock"); // Payment processing var paymentResult = await _paymentService.ProcessPaymentAsync(request.Payment); if (!paymentResult.Success) return OrderResult.Failed("Payment failed"); // Order creation var order = await _orderService.CreateOrderAsync(request); // Shipping calculation var shipping = await _shippingService.CalculateShippingAsync(order); // Notification await _notificationService.SendOrderConfirmationAsync(order); // Analytics await _analyticsService.TrackOrderAsync(order); return OrderResult.Success(order); } }
Microservices Core Principles
// Microservices Definition public class MicroservicesPrinciples { public List<string> KeyPrinciples = new() { "Single Responsibility: Each service focuses on one business capability", "Independent Deployment: Services can be deployed separately", "Decentralized Data Management: Each service owns its data", "Infrastructure Automation: CI/CD and containerization", "Design for Failure: Resilience and fault tolerance", "Evolutionary Design: Services can evolve independently" }; }
Benefits and Trade-offs
Benefits:
Independent scaling of services
Technology diversity across services
Faster development and deployment cycles
Improved fault isolation
Better team autonomy
Challenges:
Distributed system complexity
Data consistency challenges
Network latency and reliability
Operational overhead
Testing complexity
2. gRPC Deep Dive
What is gRPC?
gRPC is a modern, high-performance RPC (Remote Procedure Call) framework that can run in any environment.
// Protobuf definition for Product Service
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
option csharp_namespace = "ProductService.Grpc";
service ProductService {
  rpc GetProduct (GetProductRequest) returns (ProductResponse);
  rpc CreateProduct (CreateProductRequest) returns (ProductResponse);
  rpc UpdateProduct (UpdateProductRequest) returns (ProductResponse);
  rpc DeleteProduct (DeleteProductRequest) returns (google.protobuf.Empty);
  rpc ListProducts (ListProductsRequest) returns (stream ProductResponse);
  rpc BulkCreateProducts (stream CreateProductRequest) returns (BulkCreateResponse);
}
message GetProductRequest {
  int32 product_id = 1;
}
message CreateProductRequest {
  string name = 1;
  string description = 2;
  double price = 3;
  int32 stock_quantity = 4;
  string category_id = 5;
}
message UpdateProductRequest {
  int32 product_id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock_quantity = 5;
}
message DeleteProductRequest {
  int32 product_id = 1;
}
message ListProductsRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}
message ProductResponse {
  int32 product_id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock_quantity = 5;
  string category_id = 6;
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
  ProductStatus status = 9;
}
message BulkCreateResponse {
  int32 created_count = 1;
  repeated string errors = 2;
}
enum ProductStatus {
  ACTIVE = 0;
  INACTIVE = 1;
  OUT_OF_STOCK = 2;
  DISCONTINUED = 3;
}gRPC vs REST Comparison
// Communication Pattern Comparison public class CommunicationPatterns { public void CompareProtocols() { var restCharacteristics = new { Protocol = "HTTP/1.1", PayloadFormat = "JSON/XML", Communication = "Request-Response", Streaming = "Limited (SSE, WebSockets)", Performance = "Good for human-readable APIs", UseCases = "Public APIs, Browsers, Mobile Apps" }; var grpcCharacteristics = new { Protocol = "HTTP/2", PayloadFormat = "Protocol Buffers (Binary)", Communication = "Multiple patterns", Streaming = "Full support (Client, Server, Bidirectional)", Performance = "High for service-to-service", UseCases = "Microservices, Mobile, Real-time" }; } }
HTTP/2 Benefits
// HTTP/2 Features that power gRPC public class Http2Features { public List<string> KeyFeatures = new() { "Binary Framing: More efficient than text-based HTTP/1.1", "Multiplexing: Multiple requests over single connection", "Header Compression: HPACK reduces overhead", "Server Push: Server can push resources to client", "Stream Prioritization: Important requests get priority", "Flow Control: Prevents resource exhaustion" }; }
3. Building gRPC Services
Complete Product Service Implementation
Project Structure:
Microservices/ ├── src/ │ ├── ProductService/ │ │ ├── Protos/ │ │ ├── Services/ │ │ ├── Models/ │ │ └── Data/ │ ├── OrderService/ │ ├── InventoryService/ │ └── ApiGateway/ ├── contracts/ └── docker-compose.yml
Service Setup and Configuration
// ProductService/Program.cs using ProductService.Data; using ProductService.Services; using Microsoft.EntityFrameworkCore; var builder = WebApplication.CreateBuilder(args); // Add services to container builder.Services.AddGrpc(options => { options.EnableDetailedErrors = builder.Environment.IsDevelopment(); options.Interceptors.Add<ExceptionInterceptor>(); options.Interceptors.Add<LoggingInterceptor>(); }); builder.Services.AddDbContext<ProductContext>(options => { options.UseSqlServer(builder.Configuration.GetConnectionString("ProductDatabase")); }); builder.Services.AddScoped<IProductRepository, ProductRepository>(); builder.Services.AddSingleton<IServiceDiscovery, ConsulServiceDiscovery>(); // Health checks builder.Services.AddHealthChecks() .AddDbContextCheck<ProductContext>() .AddUrlGroup(new Uri("http://localhost:5000/health"), "self"); // Distributed caching builder.Services.AddStackExchangeRedisCache(options => { options.Configuration = builder.Configuration.GetConnectionString("Redis"); }); var app = builder.Build(); // Configure the HTTP request pipeline app.MapGrpcService<ProductGrpcService>(); app.MapHealthChecks("/health"); // gRPC-Web for browser compatibility app.MapGrpcService<ProductGrpcService>().EnableGrpcWeb(); app.MapGet("/", () => "Product gRPC Service is running!"); app.Run();
gRPC Service Implementation
// ProductService/Services/ProductGrpcService.cs using Grpc.Core; using ProductService.Data; using ProductService.Models; using Google.Protobuf.WellKnownTypes; namespace ProductService.Services; public class ProductGrpcService : ProductService.Grpc.ProductService.ProductServiceBase { private readonly IProductRepository _productRepository; private readonly ILogger<ProductGrpcService> _logger; private readonly ICacheService _cacheService; public ProductGrpcService( IProductRepository productRepository, ILogger<ProductGrpcService> logger, ICacheService cacheService) { _productRepository = productRepository; _logger = logger; _cacheService = cacheService; } public override async Task<ProductResponse> GetProduct( GetProductRequest request, ServerCallContext context) { _logger.LogInformation("Getting product with ID: {ProductId}", request.ProductId); // Check cache first var cacheKey = $"product_{request.ProductId}"; var cachedProduct = await _cacheService.GetAsync<ProductResponse>(cacheKey); if (cachedProduct != null) { _logger.LogDebug("Cache hit for product {ProductId}", request.ProductId); return cachedProduct; } var product = await _productRepository.GetByIdAsync(request.ProductId); if (product == null) { _logger.LogWarning("Product with ID {ProductId} not found", request.ProductId); throw new RpcException(new Status(StatusCode.NotFound, $"Product with ID {request.ProductId} not found")); } var response = MapToProductResponse(product); // Cache the response await _cacheService.SetAsync(cacheKey, response, TimeSpan.FromMinutes(30)); return response; } public override async Task<ProductResponse> CreateProduct( CreateProductRequest request, ServerCallContext context) { _logger.LogInformation("Creating new product: {ProductName}", request.Name); // Validate request if (string.IsNullOrWhiteSpace(request.Name)) { throw new RpcException(new Status(StatusCode.InvalidArgument, "Product name is required")); } if (request.Price <= 0) { throw new RpcException(new Status(StatusCode.InvalidArgument, "Product price must be positive")); } var product = new Product { Name = request.Name, Description = request.Description, Price = (decimal)request.Price, StockQuantity = request.StockQuantity, CategoryId = request.CategoryId, Status = ProductStatus.Active, CreatedAt = DateTime.UtcNow, UpdatedAt = DateTime.UtcNow }; await _productRepository.AddAsync(product); await _productRepository.SaveChangesAsync(); _logger.LogInformation("Product created with ID: {ProductId}", product.Id); // Invalidate relevant caches await _cacheService.RemoveAsync("products_list_*"); return MapToProductResponse(product); } public override async Task ListProducts( ListProductsRequest request, IServerStreamWriter<ProductResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Streaming products with page size: {PageSize}", request.PageSize); var pageSize = request.PageSize > 0 ? request.PageSize : 50; var products = _productRepository.GetAllActiveProducts(); if (!string.IsNullOrEmpty(request.Filter)) { products = products.Where(p => p.Name.Contains(request.Filter) || p.Description.Contains(request.Filter)); } var count = 0; foreach (var product in products) { if (context.CancellationToken.IsCancellationRequested) break; await responseStream.WriteAsync(MapToProductResponse(product)); count++; // Simulate some delay for demonstration if (count % 10 == 0) await Task.Delay(100, context.CancellationToken); } _logger.LogInformation("Streamed {Count} products", count); } public override async Task<BulkCreateResponse> BulkCreateProducts( IAsyncStreamReader<CreateProductRequest> requestStream, ServerCallContext context) { _logger.LogInformation("Starting bulk product creation"); var response = new BulkCreateResponse(); var errors = new List<string>(); await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken)) { try { var product = new Product { Name = request.Name, Description = request.Description, Price = (decimal)request.Price, StockQuantity = request.StockQuantity, CategoryId = request.CategoryId, Status = ProductStatus.Active, CreatedAt = DateTime.UtcNow, UpdatedAt = DateTime.UtcNow }; await _productRepository.AddAsync(product); response.CreatedCount++; } catch (Exception ex) { _logger.LogError(ex, "Failed to create product: {ProductName}", request.Name); errors.Add($"Failed to create {request.Name}: {ex.Message}"); } } if (response.CreatedCount > 0) { await _productRepository.SaveChangesAsync(); await _cacheService.RemoveAsync("products_list_*"); } response.Errors.AddRange(errors); _logger.LogInformation("Bulk creation completed: {CreatedCount} created, {ErrorCount} errors", response.CreatedCount, errors.Count); return response; } private ProductResponse MapToProductResponse(Product product) { return new ProductResponse { ProductId = product.Id, Name = product.Name, Description = product.Description ?? string.Empty, Price = (double)product.Price, StockQuantity = product.StockQuantity, CategoryId = product.CategoryId ?? string.Empty, CreatedAt = Timestamp.FromDateTime(product.CreatedAt), UpdatedAt = product.UpdatedAt.HasValue ? Timestamp.FromDateTime(product.UpdatedAt.Value) : null, Status = MapProductStatus(product.Status) }; } private ProductService.Grpc.ProductStatus MapProductStatus(ProductStatus status) { return status switch { ProductStatus.Active => ProductService.Grpc.ProductStatus.Active, ProductStatus.Inactive => ProductService.Grpc.ProductStatus.Inactive, ProductStatus.OutOfStock => ProductService.Grpc.ProductStatus.OutOfStock, ProductStatus.Discontinued => ProductService.Grpc.ProductStatus.Discontinued, _ => ProductService.Grpc.ProductStatus.Inactive }; } }
gRPC Interceptors
// ProductService/Interceptors/ExceptionInterceptor.cs public class ExceptionInterceptor : Interceptor { private readonly ILogger<ExceptionInterceptor> _logger; public ExceptionInterceptor(ILogger<ExceptionInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { try { return await continuation(request, context); } catch (RpcException) { throw; // Already handled } catch (Exception ex) { _logger.LogError(ex, "Error processing gRPC call {Method}", context.Method); // Map to appropriate gRPC status var status = ex switch { ArgumentException => new Status(StatusCode.InvalidArgument, ex.Message), KeyNotFoundException => new Status(StatusCode.NotFound, ex.Message), UnauthorizedAccessException => new Status(StatusCode.PermissionDenied, ex.Message), _ => new Status(StatusCode.Internal, "An internal error occurred") }; throw new RpcException(status); } } } // ProductService/Interceptors/LoggingInterceptor.cs public class LoggingInterceptor : Interceptor { private readonly ILogger<LoggingInterceptor> _logger; public LoggingInterceptor(ILogger<LoggingInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { var stopwatch = Stopwatch.StartNew(); _logger.LogInformation("Starting gRPC call {Method} from {Peer}", context.Method, context.Peer); try { var response = await continuation(request, context); stopwatch.Stop(); _logger.LogInformation("Completed gRPC call {Method} in {ElapsedMs}ms", context.Method, stopwatch.ElapsedMilliseconds); return response; } catch (Exception ex) { stopwatch.Stop(); _logger.LogError(ex, "gRPC call {Method} failed after {ElapsedMs}ms", context.Method, stopwatch.ElapsedMilliseconds); throw; } } }
4. Advanced gRPC Patterns
Client-Side Implementation
// ProductService client implementation public class ProductServiceClient : IProductServiceClient, IDisposable { private readonly ProductService.Grpc.ProductService.ProductServiceClient _client; private readonly ChannelBase _channel; private readonly ILogger<ProductServiceClient> _logger; public ProductServiceClient( string serviceUrl, ILogger<ProductServiceClient> logger) { _logger = logger; // Create channel with configuration _channel = GrpcChannel.ForAddress(serviceUrl, new GrpcChannelOptions { HttpHandler = new SocketsHttpHandler { PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan, KeepAlivePingDelay = TimeSpan.FromSeconds(60), KeepAlivePingTimeout = TimeSpan.FromSeconds(30), EnableMultipleHttp2Connections = true }, ServiceConfig = new ServiceConfig { LoadBalancingConfigs = { new RoundRobinConfig() }, MethodConfigs = { new MethodConfig { Names = { MethodName.Default }, RetryPolicy = new RetryPolicy { MaxAttempts = 3, InitialBackoff = TimeSpan.FromSeconds(1), MaxBackoff = TimeSpan.FromSeconds(5), BackoffMultiplier = 1.5, RetryableStatusCodes = { StatusCode.Unavailable } } } } } }); _client = new ProductService.Grpc.ProductService.ProductServiceClient(_channel); } public async Task<ProductResponse> GetProductAsync(int productId, CancellationToken cancellationToken = default) { try { var request = new GetProductRequest { ProductId = productId }; return await _client.GetProductAsync(request, cancellationToken: cancellationToken); } catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { _logger.LogWarning("Product {ProductId} not found", productId); throw new ProductNotFoundException(productId, ex); } catch (RpcException ex) { _logger.LogError(ex, "gRPC error getting product {ProductId}", productId); throw new ProductServiceException("Failed to get product", ex); } } public async IAsyncEnumerable<ProductResponse> StreamProductsAsync( int pageSize = 50, string filter = null) { var request = new ListProductsRequest { PageSize = pageSize, Filter = filter ?? string.Empty }; using var call = _client.ListProducts(request); await foreach (var product in call.ResponseStream.ReadAllAsync()) { yield return product; } } public async Task<BulkCreateResponse> BulkCreateProductsAsync( IAsyncEnumerable<CreateProductRequest> products, CancellationToken cancellationToken = default) { using var call = _client.BulkCreateProducts(cancellationToken: cancellationToken); await foreach (var product in products) { await call.RequestStream.WriteAsync(product); } await call.RequestStream.CompleteAsync(); return await call.ResponseAsync; } public void Dispose() { _channel?.Dispose(); } }
Bidirectional Streaming Example
// Real-time inventory update service public class InventoryGrpcService : InventoryService.Grpc.InventoryService.InventoryServiceBase { private readonly IInventoryRepository _inventoryRepository; private readonly ILogger<InventoryGrpcService> _logger; public InventoryGrpcService( IInventoryRepository inventoryRepository, ILogger<InventoryGrpcService> logger) { _inventoryRepository = inventoryRepository; _logger = logger; } public override async Task StreamInventoryUpdates( IAsyncStreamReader<InventoryUpdateRequest> requestStream, IServerStreamWriter<InventoryUpdateResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Starting bidirectional inventory stream"); // Read requests and send responses concurrently var readTask = ReadRequestsAsync(requestStream, context.CancellationToken); var writeTask = WriteResponsesAsync(responseStream, context.CancellationToken); await Task.WhenAll(readTask, writeTask); } private async Task ReadRequestsAsync( IAsyncStreamReader<InventoryUpdateRequest> requestStream, CancellationToken cancellationToken) { await foreach (var request in requestStream.ReadAllAsync(cancellationToken)) { try { _logger.LogDebug("Processing inventory update for product {ProductId}", request.ProductId); // Process inventory update await _inventoryRepository.UpdateStockAsync( request.ProductId, request.QuantityChange, request.Reason); // You could also broadcast this update to other connected clients await BroadcastInventoryUpdate(request.ProductId, request.QuantityChange); } catch (Exception ex) { _logger.LogError(ex, "Failed to process inventory update for product {ProductId}", request.ProductId); } } } private async Task WriteResponsesAsync( IServerStreamWriter<InventoryUpdateResponse> responseStream, CancellationToken cancellationToken) { // Simulate sending periodic inventory summaries var timer = new PeriodicTimer(TimeSpan.FromSeconds(30)); while (await timer.WaitForNextTickAsync(cancellationToken)) { try { var lowStockItems = await _inventoryRepository.GetLowStockItemsAsync(threshold: 10); var response = new InventoryUpdateResponse { Summary = new InventorySummary { Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), LowStockItemCount = lowStockItems.Count, TotalProducts = await _inventoryRepository.GetTotalProductCountAsync() } }; response.LowStockItems.AddRange(lowStockItems.Select(item => new LowStockItem { ProductId = item.ProductId, ProductName = item.ProductName, CurrentStock = item.CurrentStock, MinimumRequired = item.MinimumRequired })); await responseStream.WriteAsync(response); } catch (Exception ex) { _logger.LogError(ex, "Failed to send inventory summary"); } } } private async Task BroadcastInventoryUpdate(int productId, int quantityChange) { // Implementation for broadcasting to other connected clients // This could use Redis pub/sub or other mechanisms } }
gRPC Health Checks
// Custom health check implementation public class ProductServiceHealthCheck : HealthCheck.HealthBase { private readonly ProductContext _dbContext; private readonly ILogger<ProductServiceHealthCheck> _logger; public ProductServiceHealthCheck( ProductContext dbContext, ILogger<ProductServiceHealthCheck> logger) { _dbContext = dbContext; _logger = logger; } public override async Task<HealthCheckResponse> Check( HealthCheckRequest request, ServerCallContext context) { var status = HealthCheckResponse.Types.ServingStatus.Serving; var checks = new Dictionary<string, string>(); // Check database connectivity try { await _dbContext.Database.CanConnectAsync(context.CancellationToken); checks["database"] = "healthy"; } catch (Exception ex) { status = HealthCheckResponse.Types.ServingStatus.NotServing; checks["database"] = $"unhealthy: {ex.Message}"; _logger.LogError(ex, "Database health check failed"); } // Check memory usage var memoryUsage = GC.GetTotalMemory(forceFullCollection: false) / 1024 / 1024; checks["memory_mb"] = memoryUsage.ToString(); if (memoryUsage > 500) // 500MB threshold { status = HealthCheckResponse.Types.ServingStatus.NotServing; checks["memory"] = "high_usage"; } return new HealthCheckResponse { Status = status }; } }
5. Message Brokers with RabbitMQ
RabbitMQ Fundamentals
// RabbitMQ configuration and setup public static class RabbitMQExtensions { public static IServiceCollection AddRabbitMQ(this IServiceCollection services, IConfiguration configuration) { var rabbitMQConfig = configuration.GetSection("RabbitMQ").Get<RabbitMQConfig>(); services.AddSingleton(rabbitMQConfig); // Connection factory services.AddSingleton<IConnectionFactory>(sp => { return new ConnectionFactory { HostName = rabbitMQConfig.HostName, Port = rabbitMQConfig.Port, UserName = rabbitMQConfig.UserName, Password = rabbitMQConfig.Password, VirtualHost = rabbitMQConfig.VirtualHost, DispatchConsumersAsync = true, AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(10) }; }); // Connection services.AddSingleton<IConnection>(sp => { var factory = sp.GetRequiredService<IConnectionFactory>(); return factory.CreateConnection(); }); // Channel services.AddSingleton<IModel>(sp => { var connection = sp.GetRequiredService<IConnection>(); var channel = connection.CreateModel(); // Configure exchanges and queues ConfigureInfrastructure(channel, rabbitMQConfig); return channel; }); services.AddScoped<IMessagePublisher, RabbitMQMessagePublisher>(); services.AddHostedService<OrderCreatedEventConsumer>(); return services; } private static void ConfigureInfrastructure(IModel channel, RabbitMQConfig config) { // Exchanges channel.ExchangeDeclare("order-events", ExchangeType.Topic, durable: true, autoDelete: false); channel.ExchangeDeclare("inventory-events", ExchangeType.Topic, durable: true, autoDelete: false); channel.ExchangeDeclare("dead-letter", ExchangeType.Fanout, durable: true, autoDelete: false); // Queues channel.QueueDeclare("order.created", durable: true, exclusive: false, autoDelete: false); channel.QueueDeclare("inventory.update", durable: true, exclusive: false, autoDelete: false); channel.QueueDeclare("dead-letter-queue", durable: true, exclusive: false, autoDelete: false); // Dead letter queue var deadLetterArgs = new Dictionary<string, object> { { "x-dead-letter-exchange", "dead-letter" }, { "x-message-ttl", 30000 } // 30 seconds }; channel.QueueDeclare("order.created.retry", durable: true, exclusive: false, autoDelete: false, arguments: deadLetterArgs); // Bindings channel.QueueBind("order.created", "order-events", "order.created"); channel.QueueBind("inventory.update", "inventory-events", "inventory.updated"); channel.QueueBind("dead-letter-queue", "dead-letter", ""); channel.QueueBind("order.created.retry", "order-events", "order.created"); } } public class RabbitMQConfig { public string HostName { get; set; } = "localhost"; public int Port { get; set; } = 5672; public string UserName { get; set; } = "guest"; public string Password { get; set; } = "guest"; public string VirtualHost { get; set; } = "/"; }
Message Publisher Implementation
// Message publisher service public class RabbitMQMessagePublisher : IMessagePublisher { private readonly IModel _channel; private readonly ILogger<RabbitMQMessagePublisher> _logger; private readonly ISerializer _serializer; public RabbitMQMessagePublisher( IModel channel, ILogger<RabbitMQMessagePublisher> logger, ISerializer serializer) { _channel = channel; _logger = logger; _serializer = serializer; } public async Task PublishAsync<T>(T message, string exchange, string routingKey) where T : class { try { var body = _serializer.SerializeToBytes(message); var properties = _channel.CreateBasicProperties(); properties.Persistent = true; properties.ContentType = "application/json"; properties.MessageId = Guid.NewGuid().ToString(); properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()); _channel.BasicPublish( exchange: exchange, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body); _logger.LogDebug("Published message to {Exchange} with routing key {RoutingKey}", exchange, routingKey); } catch (Exception ex) { _logger.LogError(ex, "Failed to publish message to {Exchange} with routing key {RoutingKey}", exchange, routingKey); throw; } } public async Task PublishWithRetryAsync<T>(T message, string exchange, string routingKey, int maxRetries = 3) where T : class { var retryCount = 0; while (retryCount < maxRetries) { try { await PublishAsync(message, exchange, routingKey); return; } catch (Exception ex) when (retryCount < maxRetries - 1) { retryCount++; _logger.LogWarning(ex, "Publish failed, retry {RetryCount}/{MaxRetries} after delay", retryCount, maxRetries); await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retryCount))); } } throw new MessagePublishException($"Failed to publish message after {maxRetries} retries"); } }
Event Consumer Implementation
// Order created event consumer public class OrderCreatedEventConsumer : BackgroundService { private readonly IModel _channel; private readonly ILogger<OrderCreatedEventConsumer> _logger; private readonly IServiceProvider _serviceProvider; private readonly ISerializer _serializer; public OrderCreatedEventConsumer( IModel channel, ILogger<OrderCreatedEventConsumer> logger, IServiceProvider serviceProvider, ISerializer serializer) { _channel = channel; _logger = logger; _serviceProvider = serviceProvider; _serializer = serializer; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var consumer = new AsyncEventingBasicConsumer(_channel); consumer.Received += async (model, ea) => { try { await ProcessMessageAsync(ea); _channel.BasicAck(ea.DeliveryTag, multiple: false); } catch (Exception ex) { _logger.LogError(ex, "Error processing message with delivery tag {DeliveryTag}", ea.DeliveryTag); // Negative acknowledgement with requeue _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); } }; _channel.BasicConsume( queue: "order.created", autoAck: false, consumer: consumer); _logger.LogInformation("Started OrderCreatedEventConsumer"); await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task ProcessMessageAsync(BasicDeliverEventArgs ea) { using var scope = _serviceProvider.CreateScope(); var message = _serializer.Deserialize<OrderCreatedEvent>(ea.Body.ToArray()); var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>(); var notificationService = scope.ServiceProvider.GetRequiredService<INotificationService>(); _logger.LogInformation("Processing order created event for order {OrderId}", message.OrderId); // Reserve inventory await inventoryService.ReserveInventoryAsync(message.OrderId, message.Items); // Send notification await notificationService.SendOrderConfirmationAsync(message.OrderId, message.CustomerEmail); _logger.LogInformation("Successfully processed order created event for order {OrderId}", message.OrderId); } } // Order created event model public class OrderCreatedEvent { public string EventId { get; set; } = Guid.NewGuid().ToString(); public DateTime OccurredOn { get; set; } = DateTime.UtcNow; public int OrderId { get; set; } public int CustomerId { get; set; } public string CustomerEmail { get; set; } public List<OrderItem> Items { get; set; } = new(); public decimal TotalAmount { get; set; } } public class OrderItem { public int ProductId { get; set; } public string ProductName { get; set; } public int Quantity { get; set; } public decimal UnitPrice { get; set; } }
6. Event-Driven Architecture
Event Sourcing with Microservices
// Event-sourced order aggregate public class OrderAggregate { private readonly List<object> _changes = new(); public string OrderId { get; private set; } public OrderStatus Status { get; private set; } public decimal TotalAmount { get; private set; } public string CustomerId { get; private set; } public DateTime CreatedAt { get; private set; } public DateTime? UpdatedAt { get; private set; } // Rehydrate from events public OrderAggregate(string orderId, IEnumerable<object> events) { OrderId = orderId; foreach (var @event in events) { Apply(@event, false); } } // Create new order public OrderAggregate(string orderId, string customerId, List<OrderItem> items) { if (string.IsNullOrWhiteSpace(orderId)) throw new ArgumentException("Order ID is required", nameof(orderId)); if (string.IsNullOrWhiteSpace(customerId)) throw new ArgumentException("Customer ID is required", nameof(customerId)); if (items == null || !items.Any()) throw new ArgumentException("Order must have items", nameof(items)); var orderCreated = new OrderCreatedEvent { OrderId = orderId, CustomerId = customerId, Items = items, TotalAmount = items.Sum(i => i.UnitPrice * i.Quantity), CreatedAt = DateTime.UtcNow }; Apply(orderCreated); } public void UpdateShippingAddress(ShippingAddress address) { if (Status != OrderStatus.Created && Status != OrderStatus.Confirmed) throw new InvalidOperationException("Cannot update shipping address in current state"); var @event = new OrderShippingAddressUpdatedEvent { OrderId = OrderId, ShippingAddress = address, UpdatedAt = DateTime.UtcNow }; Apply(@event); } public void ConfirmOrder() { if (Status != OrderStatus.Created) throw new InvalidOperationException("Order can only be confirmed from created state"); var @event = new OrderConfirmedEvent { OrderId = OrderId, ConfirmedAt = DateTime.UtcNow }; Apply(@event); } private void Apply(object @event, bool isNew = true) { When(@event); if (isNew) { _changes.Add(@event); } } private void When(object @event) { switch (@event) { case OrderCreatedEvent e: OrderId = e.OrderId; CustomerId = e.CustomerId; TotalAmount = e.TotalAmount; Status = OrderStatus.Created; CreatedAt = e.CreatedAt; break; case OrderConfirmedEvent e: Status = OrderStatus.Confirmed; UpdatedAt = e.ConfirmedAt; break; case OrderShippingAddressUpdatedEvent e: UpdatedAt = e.UpdatedAt; break; } } public IReadOnlyCollection<object> GetUncommittedChanges() => _changes.AsReadOnly(); public void MarkChangesAsCommitted() => _changes.Clear(); } // Event store implementation public class EventStore : IEventStore { private readonly IEventStoreRepository _repository; private readonly IEventPublisher _eventPublisher; private readonly ILogger<EventStore> _logger; public EventStore( IEventStoreRepository repository, IEventPublisher eventPublisher, ILogger<EventStore> logger) { _repository = repository; _eventPublisher = eventPublisher; _logger = logger; } public async Task SaveAsync(string aggregateId, IEnumerable<object> events, int expectedVersion) { var eventData = events.Select(@event => new EventData { Id = Guid.NewGuid().ToString(), AggregateId = aggregateId, EventType = @event.GetType().Name, EventData = JsonSerializer.Serialize(@event, @event.GetType()), Version = ++expectedVersion, Timestamp = DateTime.UtcNow }).ToList(); await _repository.AppendEventsAsync(aggregateId, eventData, expectedVersion - events.Count()); // Publish events for other microservices foreach (var @event in events) { await _eventPublisher.PublishAsync(@event); } _logger.LogInformation("Saved {EventCount} events for aggregate {AggregateId}", eventData.Count, aggregateId); } public async Task<List<object>> GetEventsAsync(string aggregateId) { var eventData = await _repository.GetEventsAsync(aggregateId); return eventData.Select(DeserializeEvent).ToList(); } private object DeserializeEvent(EventData eventData) { var eventType = Type.GetType($"ECommerce.Domain.Events.{eventData.EventType}, ECommerce.Domain"); if (eventType == null) throw new InvalidOperationException($"Unknown event type: {eventData.EventType}"); return JsonSerializer.Deserialize(eventData.EventData, eventType); } }
Saga Pattern Implementation
// Order processing saga public class OrderProcessingSaga : IEventHandler<OrderCreatedEvent>, IEventHandler<PaymentProcessedEvent>, IEventHandler<InventoryReservedEvent>, IEventHandler<ShippingCreatedEvent> { private readonly ISagaRepository _sagaRepository; private readonly IEventPublisher _eventPublisher; private readonly ILogger<OrderProcessingSaga> _logger; public OrderProcessingSaga( ISagaRepository sagaRepository, IEventPublisher eventPublisher, ILogger<OrderProcessingSaga> logger) { _sagaRepository = sagaRepository; _eventPublisher = eventPublisher; _logger = logger; } public async Task Handle(OrderCreatedEvent @event) { _logger.LogInformation("Starting order processing saga for order {OrderId}", @event.OrderId); var saga = new OrderSaga(@event.OrderId, @event.CustomerId, @event.TotalAmount); await _sagaRepository.SaveAsync(saga); // Start payment processing await _eventPublisher.PublishAsync(new ProcessPaymentCommand { OrderId = @event.OrderId, Amount = @event.TotalAmount, PaymentMethod = "CreditCard" // Would come from order }); } public async Task Handle(PaymentProcessedEvent @event) { var saga = await _sagaRepository.GetByIdAsync(@event.OrderId); if (@event.Success) { saga.MarkPaymentProcessed(); await _sagaRepository.SaveAsync(saga); // Proceed to inventory reservation await _eventPublisher.PublishAsync(new ReserveInventoryCommand { OrderId = @event.OrderId, Items = @event.Items }); } else { saga.MarkPaymentFailed(@event.ErrorMessage); await _sagaRepository.SaveAsync(saga); // Compensating action await _eventPublisher.PublishAsync(new CancelOrderCommand { OrderId = @event.OrderId, Reason = "Payment failed: " + @event.ErrorMessage }); } } public async Task Handle(InventoryReservedEvent @event) { var saga = await _sagaRepository.GetByIdAsync(@event.OrderId); saga.MarkInventoryReserved(); await _sagaRepository.SaveAsync(saga); // Proceed to shipping await _eventPublisher.PublishAsync(new CreateShippingCommand { OrderId = @event.OrderId, ShippingAddress = @event.ShippingAddress, Items = @event.Items }); } public async Task Handle(ShippingCreatedEvent @event) { var saga = await _sagaRepository.GetByIdAsync(@event.OrderId); saga.MarkShippingCreated(); await _sagaRepository.SaveAsync(saga); // Order processing completed await _eventPublisher.PublishAsync(new CompleteOrderCommand { OrderId = @event.OrderId }); _logger.LogInformation("Order processing saga completed for order {OrderId}", @event.OrderId); } }
7. Service Discovery & API Gateway
Consul Service Discovery
// Consul service registration public class ConsulServiceDiscovery : IServiceDiscovery, IHostedService { private readonly IConsulClient _consulClient; private readonly ServiceConfig _serviceConfig; private readonly ILogger<ConsulServiceDiscovery> _logger; private string _serviceId; public ConsulServiceDiscovery( IConsulClient consulClient, IConfiguration configuration, ILogger<ConsulServiceDiscovery> logger) { _consulClient = consulClient; _logger = logger; _serviceConfig = configuration.GetSection("ServiceDiscovery").Get<ServiceConfig>(); } public async Task StartAsync(CancellationToken cancellationToken) { _serviceId = $"{_serviceConfig.ServiceName}-{Guid.NewGuid()}"; var registration = new AgentServiceRegistration { ID = _serviceId, Name = _serviceConfig.ServiceName, Address = _serviceConfig.ServiceAddress, Port = _serviceConfig.ServicePort, Tags = _serviceConfig.Tags, Check = new AgentServiceCheck { HTTP = $"http://{_serviceConfig.ServiceAddress}:{_serviceConfig.ServicePort}/health", Interval = TimeSpan.FromSeconds(30), Timeout = TimeSpan.FromSeconds(10), DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1) } }; await _consulClient.Agent.ServiceRegister(registration, cancellationToken); _logger.LogInformation("Service {ServiceName} registered with Consul", _serviceConfig.ServiceName); } public async Task StopAsync(CancellationToken cancellationToken) { await _consulClient.Agent.ServiceDeregister(_serviceId, cancellationToken); _logger.LogInformation("Service {ServiceName} deregistered from Consul", _serviceConfig.ServiceName); } public async Task<ServiceEndpoint> GetServiceAsync(string serviceName) { var services = await _consulClient.Health.Service(serviceName, string.Empty, true); if (services.Response == null || !services.Response.Any()) { throw new ServiceNotFoundException($"Service {serviceName} not found in Consul"); } // Simple round-robin load balancing var service = services.Response[Random.Shared.Next(services.Response.Length)]; return new ServiceEndpoint { Address = service.Service.Address, Port = service.Service.Port }; } public async Task<List<ServiceEndpoint>> GetAllServicesAsync(string serviceName) { var services = await _consulClient.Health.Service(serviceName, string.Empty, true); return services.Response?.Select(s => new ServiceEndpoint { Address = s.Service.Address, Port = s.Service.Port }).ToList() ?? new List<ServiceEndpoint>(); } }
API Gateway with Ocelot
// Ocelot configuration public static class OcelotConfiguration { public static WebApplicationBuilder AddOcelotGateway(this WebApplicationBuilder builder) { builder.Services.AddOcelot() .AddConsul() .AddConfigStoredInConsul(); builder.Services.AddAuthentication("Bearer") .AddJwtBearer("Bearer", options => { options.Authority = builder.Configuration["Identity:Authority"]; options.RequireHttpsMetadata = false; options.TokenValidationParameters = new TokenValidationParameters { ValidateAudience = false }; }); return builder; } } // ocelot.json configuration { "Routes": [ { "DownstreamPathTemplate": "/api/products/{everything}", "DownstreamScheme": "https", "DownstreamHostAndPorts": [ { "Host": "product-service", "Port": 5001 } ], "UpstreamPathTemplate": "/products/{everything}", "UpstreamHttpMethod": [ "GET", "POST", "PUT", "DELETE" ], "AuthenticationOptions": { "AuthenticationProviderKey": "Bearer", "AllowedScopes": [] }, "RateLimitOptions": { "ClientWhitelist": [], "EnableRateLimiting": true, "Period": "1s", "PeriodTimespan": 1, "Limit": 100 } }, { "DownstreamPathTemplate": "/api/orders/{everything}", "DownstreamScheme": "https", "DownstreamHostAndPorts": [ { "Host": "order-service", "Port": 5002 } ], "UpstreamPathTemplate": "/orders/{everything}", "UpstreamHttpMethod": [ "GET", "POST", "PUT", "DELETE" ], "AuthenticationOptions": { "AuthenticationProviderKey": "Bearer", "AllowedScopes": [ "orders" ] } }, { "DownstreamPathTemplate": "/api/inventory/{everything}", "DownstreamScheme": "https", "DownstreamHostAndPorts": [ { "Host": "inventory-service", "Port": 5003 } ], "UpstreamPathTemplate": "/inventory/{everything}", "UpstreamHttpMethod": [ "GET", "POST", "PUT" ], "AuthenticationOptions": { "AuthenticationProviderKey": "Bearer", "AllowedScopes": [ "inventory" ] } } ], "GlobalConfiguration": { "BaseUrl": "https://localhost:7000", "ServiceDiscoveryProvider": { "Host": "consul", "Port": 8500, "Type": "Consul" } } }
8. Containerization & Orchestration
Docker Configuration
# Product Service Dockerfile FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base WORKDIR /app EXPOSE 80 EXPOSE 443 # Install curl for health checks RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build WORKDIR /src COPY ["src/ProductService/ProductService.csproj", "src/ProductService/"] COPY ["contracts/ProductService.Grpc/ProductService.Grpc.csproj", "contracts/ProductService.Grpc/"] RUN dotnet restore "src/ProductService/ProductService.csproj" COPY . . WORKDIR "/src/src/ProductService" RUN dotnet build "ProductService.csproj" -c Release -o /app/build FROM build AS publish RUN dotnet publish "ProductService.csproj" -c Release -o /app/publish FROM base AS final WORKDIR /app COPY --from=publish /app/publish . # Create non-root user RUN groupadd -r appuser && useradd -r -g appuser appuser RUN chown -R appuser:appuser /app USER appuser ENTRYPOINT ["dotnet", "ProductService.dll"]
Kubernetes Deployment
# product-service-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: product-service namespace: ecommerce labels: app: product-service version: v1.0.0 spec: replicas: 3 selector: matchLabels: app: product-service template: metadata: labels: app: product-service version: v1.0.0 annotations: consul.hashicorp.com/connect-inject: "true" consul.hashicorp.com/service-tags: "version=v1.0.0" spec: containers: - name: product-service image: ecommerce/product-service:latest imagePullPolicy: IfNotPresent ports: - containerPort: 80 name: http - containerPort: 443 name: https env: - name: ASPNETCORE_ENVIRONMENT value: "Production" - name: ConnectionStrings__ProductDatabase valueFrom: secretKeyRef: name: database-secrets key: product-connection-string - name: RabbitMQ__HostName value: "rabbitmq" - name: Consul__Host value: "consul-server" resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health port: 80 initialDelaySeconds: 30 periodSeconds: 10 timeoutSeconds: 5 readinessProbe: httpGet: path: /health/ready port: 80 initialDelaySeconds: 5 periodSeconds: 5 startupProbe: httpGet: path: /health/startup port: 80 initialDelaySeconds: 10 periodSeconds: 10 failureThreshold: 3 securityContext: allowPrivilegeEscalation: false runAsNonRoot: true runAsUser: 1000 capabilities: drop: - ALL restartPolicy: Always terminationGracePeriodSeconds: 60 --- apiVersion: v1 kind: Service metadata: name: product-service namespace: ecommerce labels: app: product-service annotations: consul.hashicorp.com/service-tags: "version=v1.0.0" spec: selector: app: product-service ports: - name: http port: 80 targetPort: 80 protocol: TCP - name: grpc port: 5001 targetPort: 5001 protocol: TCP type: ClusterIP
Docker Compose for Development
# docker-compose.yml version: '3.8' services: product-service: build: context: . dockerfile: src/ProductService/Dockerfile environment: - ASPNETCORE_ENVIRONMENT=Development - ConnectionStrings__ProductDatabase=Server=sql-server;Database=ProductDB;User Id=sa;Password=YourPassword123!;TrustServerCertificate=true; - RabbitMQ__HostName=rabbitmq - Consul__Host=consul ports: - "5001:80" depends_on: - sql-server - rabbitmq - consul networks: - ecommerce-network order-service: build: context: . dockerfile: src/OrderService/Dockerfile environment: - ASPNETCORE_ENVIRONMENT=Development - ConnectionStrings__OrderDatabase=Server=sql-server;Database=OrderDB;User Id=sa;Password=YourPassword123!;TrustServerCertificate=true; - RabbitMQ__HostName=rabbitmq ports: - "5002:80" depends_on: - sql-server - rabbitmq networks: - ecommerce-network api-gateway: build: context: . dockerfile: src/ApiGateway/Dockerfile environment: - ASPNETCORE_ENVIRONMENT=Development ports: - "7000:80" depends_on: - product-service - order-service networks: - ecommerce-network sql-server: image: mcr.microsoft.com/mssql/server:2022-latest environment: SA_PASSWORD: "YourPassword123!" ACCEPT_EULA: "Y" MSSQL_PID: "Express" ports: - "1433:1433" volumes: - sql-data:/var/opt/mssql networks: - ecommerce-network rabbitmq: image: rabbitmq:3-management ports: - "5672:5672" - "15672:15672" volumes: - rabbitmq-data:/var/lib/rabbitmq networks: - ecommerce-network consul: image: consul:1.15 ports: - "8500:8500" command: "agent -dev -client=0.0.0.0" networks: - ecommerce-network seq: image: datalust/seq:latest environment: - ACCEPT_EULA=Y ports: - "5341:5341" - "8081:80" volumes: - seq-data:/data networks: - ecommerce-network volumes: sql-data: rabbitmq-data: seq-data: networks: ecommerce-network: driver: bridge
9. Distributed Data Management
Database per Service Pattern
// Product service database context public class ProductContext : DbContext { public ProductContext(DbContextOptions<ProductContext> options) : base(options) { } public DbSet<Product> Products => Set<Product>(); public DbSet<Category> Categories => Set<Category>(); public DbSet<ProductReview> ProductReviews => Set<ProductReview>(); protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Product>(entity => { entity.HasKey(e => e.Id); entity.Property(e => e.Name).IsRequired().HasMaxLength(100); entity.Property(e => e.Description).HasMaxLength(500); entity.Property(e => e.Price).HasColumnType("decimal(18,2)"); entity.HasIndex(e => e.Name); entity.HasIndex(e => e.CategoryId); entity.HasQueryFilter(e => e.Status == ProductStatus.Active); }); modelBuilder.Entity<Category>(entity => { entity.HasKey(e => e.Id); entity.Property(e => e.Name).IsRequired().HasMaxLength(50); entity.HasIndex(e => e.Name).IsUnique(); }); base.OnModelCreating(modelBuilder); } } // Order service database context public class OrderContext : DbContext { public OrderContext(DbContextOptions<OrderContext> options) : base(options) { } public DbSet<Order> Orders => Set<Order>(); public DbSet<OrderItem> OrderItems => Set<OrderItem>(); protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Order>(entity => { entity.HasKey(e => e.Id); entity.Property(e => e.TotalAmount).HasColumnType("decimal(18,2)"); entity.Property(e => e.Status).HasConversion<string>(); entity.HasIndex(e => e.CustomerId); entity.HasIndex(e => e.Status); entity.HasIndex(e => e.CreatedAt); }); modelBuilder.Entity<OrderItem>(entity => { entity.HasKey(e => e.Id); entity.Property(e => e.UnitPrice).HasColumnType("decimal(18,2)"); entity.HasOne(e => e.Order) .WithMany(o => o.Items) .HasForeignKey(e => e.OrderId); }); base.OnModelCreating(modelBuilder); } }
Saga Data Management
// Saga data model public class OrderSaga { public string OrderId { get; set; } public string CustomerId { get; set; } public decimal TotalAmount { get; set; } public OrderSagaState State { get; set; } public Dictionary<string, object> Context { get; set; } = new(); public List<SagaStep> Steps { get; set; } = new(); public DateTime CreatedAt { get; set; } public DateTime? UpdatedAt { get; set; } public DateTime? CompletedAt { get; set; } public void MarkPaymentProcessed() { State = OrderSagaState.PaymentProcessed; Steps.Add(new SagaStep { Name = "PaymentProcessed", CompletedAt = DateTime.UtcNow }); UpdatedAt = DateTime.UtcNow; } public void MarkInventoryReserved() { State = OrderSagaState.InventoryReserved; Steps.Add(new SagaStep { Name = "InventoryReserved", CompletedAt = DateTime.UtcNow }); UpdatedAt = DateTime.UtcNow; } public void MarkShippingCreated() { State = OrderSagaState.ShippingCreated; Steps.Add(new SagaStep { Name = "ShippingCreated", CompletedAt = DateTime.UtcNow }); UpdatedAt = DateTime.UtcNow; } public void MarkCompleted() { State = OrderSagaState.Completed; CompletedAt = DateTime.UtcNow; UpdatedAt = DateTime.UtcNow; } public void MarkPaymentFailed(string error) { State = OrderSagaState.PaymentFailed; Context["PaymentError"] = error; UpdatedAt = DateTime.UtcNow; } } public enum OrderSagaState { Started, PaymentProcessed, InventoryReserved, ShippingCreated, Completed, PaymentFailed, InventoryReservationFailed, ShippingCreationFailed } public class SagaStep { public string Name { get; set; } public DateTime? CompletedAt { get; set; } public string Error { get; set; } }
10. Observability & Monitoring
Distributed Tracing
// OpenTelemetry configuration public static class OpenTelemetryExtensions { public static IServiceCollection AddOpenTelemetry(this IServiceCollection services, IConfiguration configuration) { services.AddOpenTelemetry() .WithTracing(tracing => { tracing.AddAspNetCoreInstrumentation(options => { options.EnrichWithHttpRequest = (activity, httpRequest) => { activity.SetTag("request.protocol", httpRequest.Protocol); }; options.EnrichWithHttpResponse = (activity, httpResponse) => { activity.SetTag("response.length", httpResponse.ContentLength); }; options.RecordException = true; }) .AddGrpcClientInstrumentation() .AddHttpClientInstrumentation() .AddEntityFrameworkCoreInstrumentation(options => { options.SetDbStatementForText = true; }) .AddRedisInstrumentation() .AddSource("ProductService") .AddSource("OrderService") .AddSource("InventoryService") .SetSampler(new AlwaysOnSampler()) .AddOtlpExporter(options => { options.Endpoint = new Uri(configuration["Otlp:Endpoint"]); }) .AddConsoleExporter(); }) .WithMetrics(metrics => { metrics.AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation() .AddRuntimeInstrumentation() .AddProcessInstrumentation() .AddMeter("Microsoft.AspNetCore.Hosting") .AddMeter("Microsoft.AspNetCore.Server.Kestrel") .AddOtlpExporter(options => { options.Endpoint = new Uri(configuration["Otlp:Endpoint"]); }); }); return services; } }
Health Checks with Dependencies
// Comprehensive health checks public static class HealthCheckExtensions { public static IHealthChecksBuilder AddMicroserviceHealthChecks( this IServiceCollection services, IConfiguration configuration) { return services.AddHealthChecks() // Database health checks .AddSqlServer( connectionString: configuration.GetConnectionString("ProductDatabase"), name: "product-database", tags: new[] { "ready", "live" }) .AddRedis( redisConnectionString: configuration.GetConnectionString("Redis"), name: "redis", tags: new[] { "ready", "live" }) // External services .AddUrlGroup( new Uri("https://api.paymentgateway.com/health"), name: "payment-gateway", tags: new[] { "ready" }) // Message broker .AddRabbitMQ( rabbitConnectionString: configuration.GetConnectionString("RabbitMQ"), name: "rabbitmq", tags: new[] { "ready", "live" }) // Disk storage .AddDiskStorageHealthCheck(s => s.AddDrive("C:\\", 1024), name: "storage", tags: new[] { "live" }) // Memory check .AddProcessAllocatedMemoryHealthCheck(512, "process-memory") // Custom checks .AddCheck<GrpcHealthCheck>("grpc-endpoints", tags: new[] { "ready" }) .AddCheck<DatabaseMigrationHealthCheck>("database-migrations", tags: new[] { "ready" }); } } // Custom gRPC health check public class GrpcHealthCheck : IHealthCheck { private readonly IProductServiceClient _productServiceClient; private readonly ILogger<GrpcHealthCheck> _logger; public GrpcHealthCheck( IProductServiceClient productServiceClient, ILogger<GrpcHealthCheck> logger) { _productServiceClient = productServiceClient; _logger = logger; } public async Task<HealthCheckResult> CheckHealthAsync( HealthCheckContext context, CancellationToken cancellationToken = default) { try { // Test gRPC connection by making a simple call await _productServiceClient.GetProductAsync(1, cancellationToken); return HealthCheckResult.Healthy("gRPC endpoints are responsive"); } catch (Exception ex) { _logger.LogWarning(ex, "gRPC health check failed"); return HealthCheckResult.Unhealthy("gRPC endpoints are not responsive"); } } }
11. Security in Microservices
Secure Service Communication
// gRPC with authentication and authorization public static class GrpcSecurityExtensions { public static IServiceCollection AddSecureGrpc(this IServiceCollection services) { services.AddGrpc(options => { options.Interceptors.Add<AuthInterceptor>(); options.Interceptors.Add<LoggingInterceptor>(); }); services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme) .AddJwtBearer(options => { options.Authority = "https://localhost:5001"; options.TokenValidationParameters = new TokenValidationParameters { ValidateAudience = false, ValidTypes = new[] { "at+jwt" } }; }); services.AddAuthorization(options => { options.AddPolicy("ProductRead", policy => policy.RequireAuthenticatedUser().RequireClaim("scope", "product.read")); options.AddPolicy("ProductWrite", policy => policy.RequireAuthenticatedUser().RequireClaim("scope", "product.write")); }); return services; } } // Authentication interceptor public class AuthInterceptor : Interceptor { private readonly IHttpContextAccessor _httpContextAccessor; private readonly ILogger<AuthInterceptor> _logger; public AuthInterceptor( IHttpContextAccessor httpContextAccessor, ILogger<AuthInterceptor> logger) { _httpContextAccessor = httpContextAccessor; _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { var httpContext = _httpContextAccessor.HttpContext; if (httpContext == null) { throw new RpcException(new Status(StatusCode.Unauthenticated, "Authentication required")); } // Extract token from gRPC metadata var token = context.RequestHeaders.FirstOrDefault(h => h.Key == "authorization")?.Value; if (string.IsNullOrEmpty(token)) { throw new RpcException(new Status(StatusCode.Unauthenticated, "Authorization token required")); } // Validate token and set user context try { var user = await ValidateTokenAsync(token); context.UserState["User"] = user; } catch (Exception ex) { _logger.LogWarning(ex, "Token validation failed"); throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid token")); } return await continuation(request, context); } private async Task<ClaimsPrincipal> ValidateTokenAsync(string token) { // Token validation logic // This would typically use JWT validation with your identity provider return await Task.FromResult(new ClaimsPrincipal()); } }
Secure Message Brokers
// Secure RabbitMQ configuration public class SecureRabbitMQConnectionFactory { public ConnectionFactory CreateSecureFactory(RabbitMQConfig config) { return new ConnectionFactory { HostName = config.HostName, Port = config.Port, UserName = config.UserName, Password = config.Password, VirtualHost = config.VirtualHost, // Security settings Ssl = new SslOption { Enabled = config.UseSsl, ServerName = config.HostName, CertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => { // Custom certificate validation return sslPolicyErrors == SslPolicyErrors.None; } }, // Connection settings RequestedHeartbeat = TimeSpan.FromSeconds(60), NetworkRecoveryInterval = TimeSpan.FromSeconds(10), AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; } }
12. Real-World E-Commerce Platform
Complete Microservices Architecture
// Order service with event sourcing public class OrderService : OrderService.Grpc.OrderService.OrderServiceBase { private readonly IEventStore _eventStore; private readonly IOrderRepository _orderRepository; private readonly IEventPublisher _eventPublisher; private readonly ILogger<OrderService> _logger; public OrderService( IEventStore eventStore, IOrderRepository orderRepository, IEventPublisher eventPublisher, ILogger<OrderService> logger) { _eventStore = eventStore; _orderRepository = orderRepository; _eventPublisher = eventPublisher; _logger = logger; } public override async Task<CreateOrderResponse> CreateOrder( CreateOrderRequest request, ServerCallContext context) { _logger.LogInformation("Creating order for customer {CustomerId}", request.CustomerId); try { // Validate products via gRPC call to ProductService var productService = context.GetHttpContext().RequestServices .GetRequiredService<IProductServiceClient>(); foreach (var item in request.Items) { var product = await productService.GetProductAsync(item.ProductId); if (product.StockQuantity < item.Quantity) { throw new RpcException(new Status(StatusCode.FailedPrecondition, $"Insufficient stock for product {item.ProductId}")); } } // Create order aggregate var orderId = Guid.NewGuid().ToString(); var orderItems = request.Items.Select(i => new OrderItem { ProductId = i.ProductId, ProductName = i.ProductName, Quantity = i.Quantity, UnitPrice = (decimal)i.UnitPrice }).ToList(); var order = new OrderAggregate(orderId, request.CustomerId, orderItems); // Save events var events = order.GetUncommittedChanges(); await _eventStore.SaveAsync(orderId, events, -1); order.MarkChangesAsCommitted(); // Publish integration event await _eventPublisher.PublishAsync(new OrderCreatedIntegrationEvent { OrderId = orderId, CustomerId = request.CustomerId, Items = request.Items.ToList(), TotalAmount = (decimal)request.TotalAmount }); _logger.LogInformation("Order {OrderId} created successfully", orderId); return new CreateOrderResponse { OrderId = orderId, Status = OrderStatus.Created }; } catch (RpcException) { throw; } catch (Exception ex) { _logger.LogError(ex, "Failed to create order for customer {CustomerId}", request.CustomerId); throw new RpcException(new Status(StatusCode.Internal, "Order creation failed")); } } }
Integration Testing
// Microservices integration tests public class MicroservicesIntegrationTests : IClassFixture<MicroservicesTestFixture> { private readonly MicroservicesTestFixture _fixture; private readonly HttpClient _apiGatewayClient; public MicroservicesIntegrationTests(MicroservicesTestFixture fixture) { _fixture = fixture; _apiGatewayClient = fixture.CreateClient(); } [Fact] public async Task CreateOrder_ValidRequest_ShouldProcessSuccessfully() { // Arrange var createOrderRequest = new { CustomerId = 123, Items = new[] { new { ProductId = 1, Quantity = 2, UnitPrice = 25.99m }, new { ProductId = 2, Quantity = 1, UnitPrice = 15.50m } }, ShippingAddress = new { Street = "123 Test St", City = "Test City", Country = "Test Country", ZipCode = "12345" } }; // Act var response = await _apiGatewayClient.PostAsJsonAsync("/orders", createOrderRequest); // Assert response.EnsureSuccessStatusCode(); var orderResponse = await response.Content.ReadFromJsonAsync<CreateOrderResponse>(); orderResponse.Should().NotBeNull(); orderResponse.OrderId.Should().NotBeNullOrEmpty(); orderResponse.Status.Should().Be(OrderStatus.Created); // Verify events were published await _fixture.WaitForEvent<OrderCreatedIntegrationEvent>( e => e.OrderId == orderResponse.OrderId, TimeSpan.FromSeconds(10)); // Verify inventory was updated var inventoryResponse = await _apiGatewayClient.GetAsync($"/inventory/products/1"); inventoryResponse.EnsureSuccessStatusCode(); } } public class MicroservicesTestFixture : WebApplicationFactory<Program> { private readonly TestContainersFixture _containers; public MicroservicesTestFixture() { _containers = new TestContainersFixture(); } protected override void ConfigureWebHost(IWebHostBuilder builder) { builder.ConfigureServices(services => { // Replace real services with test doubles services.AddSingleton<IMessagePublisher, TestMessagePublisher>(); services.AddSingleton<IProductServiceClient, TestProductServiceClient>(); }); builder.UseEnvironment("Testing"); } protected override IHost CreateHost(IHostBuilder builder) { // Start test containers before host _containers.StartAsync().GetAwaiter().GetResult(); return base.CreateHost(builder); } public async Task WaitForEvent<TEvent>(Func<TEvent, bool> predicate, TimeSpan timeout) { var testPublisher = Services.GetRequiredService<TestMessagePublisher>(); var startTime = DateTime.UtcNow; while (DateTime.UtcNow - startTime < timeout) { if (testPublisher.PublishedEvents.OfType<TEvent>().Any(predicate)) return; await Task.Delay(100); } throw new TimeoutException($"Event of type {typeof(TEvent).Name} not published within timeout"); } }
This comprehensive microservices foundation guide provides everything needed to architect, build, and deploy scalable microservices using gRPC and message brokers in ASP.NET Core. The patterns and practices demonstrated here form the foundation for building robust, maintainable distributed systems.
Powered By: FreeLearning365.com
.png)
0 Comments
thanks for your comments!