CQRS and MediatR Masterclass: Solve Business Complexity in ASP.NET Core (Part-36 of 40)

 

Master CQRS and MediatR patterns in ASP.NET Core. Complete implementation guide for scalable, maintainable applications with real-world examples.



 Table of Contents

  1. Understanding the Problem

  2. CQRS Fundamentals

  3. MediatR Deep Dive

  4. Basic Implementation

  5. Advanced CQRS Patterns

  6. Validation & Behavior

  7. Performance Optimization

  8. Event Sourcing Integration

  9. Testing Strategies

  10. Real-World E-Commerce Case Study

  11. Microservices Integration

  12. Production Best Practices


1. Understanding the Problem

The Traditional CRUD Challenges

Real-World Scenario: Imagine building an e-commerce platform where a single product page needs to:

  • Display product information

  • Show inventory status

  • Calculate discounts

  • Display customer reviews

  • Show related products

  • Track view count for analytics

csharp
// Traditional monolithic service approach
public class ProductService : IProductService
{
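    private readonly IProductRepository _productRepository;
    private readonly IInventoryRepository _inventoryRepository;
    private readonly IDiscountRepository _discountRepository;
    private readonly IReviewRepository _reviewRepository;
    private readonly IAnalyticsRepository _analyticsRepository;
    // Also needed by UpdateProductAsync below (interface names assumed)
    private readonly IPriceHistoryRepository _priceHistoryRepository;
    private readonly IAuditRepository _auditRepository;
    private readonly INotificationService _notificationService;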

    public async Task<ProductDTO> GetProductAsync(int productId)
    {
        // Multiple database calls
        var product = await _productRepository.GetByIdAsync(productId);
        var inventory = await _inventoryRepository.GetByProductIdAsync(productId);
        var discounts = await _discountRepository.GetActiveDiscountsAsync(productId);
        var reviews = await _reviewRepository.GetByProductIdAsync(productId);
        
        // Business logic scattered
        var discountedPrice = CalculateDiscountedPrice(product.Price, discounts);
        var stockStatus = CalculateStockStatus(inventory.Quantity);
        var averageRating = CalculateAverageRating(reviews);
        
        // Analytics tracking
        await _analyticsRepository.TrackProductViewAsync(productId);
        
        // Complex mapping
        return new ProductDTO
        {
            Id = product.Id,
            Name = product.Name,
            Price = product.Price,
            DiscountedPrice = discountedPrice,
            StockStatus = stockStatus,
            AverageRating = averageRating,
            Reviews = reviews,
            // ... more properties
        };
    }
    
    // Similar complexity for write operations
    public async Task UpdateProductAsync(ProductUpdateDTO update)
    {
        // Validation
        if (update.Price < 0)
            throw new ArgumentException("Price cannot be negative");
            
        // Business rules
        if (update.Price > 1000 && !await IsPremiumUserAsync(update.UpdatedBy))
            throw new UnauthorizedAccessException("Only premium users can set high prices");
            
        // Multiple updates
        await _productRepository.UpdateAsync(update);
        await _priceHistoryRepository.AddAsync(update);
        await _auditRepository.LogUpdateAsync(update);
        
        // Notifications
        await _notificationService.NotifyPriceChangeAsync(update);
    }
}

Problems with Traditional Approach

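  1. Single Responsibility Violation: One service mixes data access, pricing rules, analytics, auditing, and notifications

  2. Performance Issues: Every product request triggers several sequential database calls

  3. Complex Testing: Each test must mock every dependency, even to check a single behavior

  4. Tight Coupling: A change to one concern forces changes to the shared service and to everything that depends on it

  5. Scalability Challenges: Reads and writes share one model and one data store, so they compete for resources and cannot be scaled independently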


2. CQRS Fundamentals

What is CQRS?

Command Query Responsibility Segregation (CQRS) is a pattern that separates read operations (queries) from write operations (commands), so that each side can use a model suited to its job.

csharp
// CQRS Core Concepts
public interface ICommand
{
    // Marker interface for commands
}

public interface IQuery<TResponse>
{
    // Marker interface for queries
}

public interface ICommandHandler<TCommand> 
    where TCommand : ICommand
{
    Task Handle(TCommand command, CancellationToken cancellationToken);
}

public interface IQueryHandler<TQuery, TResponse> 
    where TQuery : IQuery<TResponse>
{
    Task<TResponse> Handle(TQuery query, CancellationToken cancellationToken);
}

CQRS Architecture Patterns

csharp
// Level 1: Simple Separation
public class SimpleCQRS
{
    // Commands - Write operations
    public class CreateProductCommand : ICommand
    {
        public string Name { get; set; }
        public decimal Price { get; set; }
        public string Description { get; set; }
    }

    // Queries - Read operations  
    public class GetProductQuery : IQuery<ProductDTO>
    {
        public int ProductId { get; set; }
    }

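    // Separate handlers
    public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand>
    {
        private readonly IProductRepository _productRepository;

        public CreateProductCommandHandler(IProductRepository productRepository)
        {
            _productRepository = productRepository;
        }

        public async Task Handle(CreateProductCommand command, CancellationToken cancellationToken)
        {
            // Write logic only (simplified Product constructor; the full entity appears in section 4)
            var product = new Product(command.Name, command.Price, command.Description);
            await _productRepository.AddAsync(product);
        }
    }

    public class GetProductQueryHandler : IQueryHandler<GetProductQuery, ProductDTO>
    {
        private readonly IProductReadRepository _productReadRepository;

        public GetProductQueryHandler(IProductReadRepository productReadRepository)
        {
            _productReadRepository = productReadRepository;
        }

        public async Task<ProductDTO> Handle(GetProductQuery query, CancellationToken cancellationToken)
        {
            // Read logic only - optimized for reading
            return await _productReadRepository.GetByIdAsync(query.ProductId);
        }
    }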
}
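
To see the separation in action, here is a minimal sketch that wires the handler interfaces above to an in-memory dictionary. The dictionary, the caller-supplied `Id`, and the trimmed-down `ProductDTO` record are assumptions made for illustration; a real application would use the repositories shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory store standing in for the write and read repositories.
var store = new Dictionary<int, ProductDTO>();

var createHandler = new CreateProductCommandHandler(store);
var getHandler = new GetProductQueryHandler(store);

// Write side: the command changes state and returns nothing.
await createHandler.Handle(
    new CreateProductCommand { Id = 1, Name = "Keyboard", Price = 49.99m },
    CancellationToken.None);

// Read side: the query returns data and changes nothing.
ProductDTO found = await getHandler.Handle(
    new GetProductQuery { ProductId = 1 }, CancellationToken.None);

Console.WriteLine(found.Name); // prints "Keyboard"

public interface ICommand { }
public interface IQuery<TResponse> { }

public interface ICommandHandler<TCommand> where TCommand : ICommand
{
    Task Handle(TCommand command, CancellationToken cancellationToken);
}

public interface IQueryHandler<TQuery, TResponse> where TQuery : IQuery<TResponse>
{
    Task<TResponse> Handle(TQuery query, CancellationToken cancellationToken);
}

public record ProductDTO(int Id, string Name, decimal Price);

public class CreateProductCommand : ICommand
{
    public int Id { get; set; }            // supplied by the caller only to keep the sketch short
    public string Name { get; set; } = "";
    public decimal Price { get; set; }
}

public class GetProductQuery : IQuery<ProductDTO>
{
    public int ProductId { get; set; }
}

public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand>
{
    private readonly Dictionary<int, ProductDTO> _store;
    public CreateProductCommandHandler(Dictionary<int, ProductDTO> store) => _store = store;

    public Task Handle(CreateProductCommand command, CancellationToken cancellationToken)
    {
        _store[command.Id] = new ProductDTO(command.Id, command.Name, command.Price);
        return Task.CompletedTask;
    }
}

public class GetProductQueryHandler : IQueryHandler<GetProductQuery, ProductDTO>
{
    private readonly Dictionary<int, ProductDTO> _store;
    public GetProductQueryHandler(Dictionary<int, ProductDTO> store) => _store = store;

    // Throws KeyNotFoundException for an unknown id.
    public Task<ProductDTO> Handle(GetProductQuery query, CancellationToken cancellationToken)
        => Task.FromResult(_store[query.ProductId]);
}
```

Both handlers share one dictionary here, which corresponds to the "Level 1" separation: two code paths over a single store.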

When to Use CQRS

Perfect Scenarios:

  • High-traffic applications with different read/write patterns

  • Complex business logic requiring separation of concerns

  • Systems needing different data models for reads vs writes

  • Applications requiring event sourcing

  • Microservices architectures

Not Recommended For:

  • Simple CRUD applications

  • Low-traffic systems

  • When over-engineering would cause more harm than good


3. MediatR Deep Dive

MediatR Architecture

MediatR is a mediator pattern implementation for .NET that helps reduce coupling between components.

csharp
// MediatR core interfaces (simplified; the real IMediator combines ISender and IPublisher)
public interface IMediator
{
    Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default);
    Task<object?> Send(object request, CancellationToken cancellationToken = default);
    Task Publish(object notification, CancellationToken cancellationToken = default);
}

public interface IRequest<TResponse> { }

public interface IRequestHandler<TRequest, TResponse> 
    where TRequest : IRequest<TResponse>
{
    Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}

public interface INotification { }

public interface INotificationHandler<TNotification> 
    where TNotification : INotification
{
    Task Handle(TNotification notification, CancellationToken cancellationToken);
}
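
The idea behind `Send` is that the caller only knows the request type, and the mediator finds the single handler for it. The following is a hand-rolled sketch of that dispatch, not MediatR's actual implementation (MediatR resolves handlers from the dependency injection container). `TinyMediator`, `Ping`, and `PingHandler` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var mediator = new TinyMediator();
mediator.Register<Ping, string>(new PingHandler());

// The caller depends on the mediator and the request, never on the handler.
string reply = await mediator.Send(new Ping("hello"));
Console.WriteLine(reply); // prints "pong: hello"

public interface IRequest<TResponse> { }

public interface IRequestHandler<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}

// Hypothetical stand-in for IMediator: maps each request type to exactly one handler.
public class TinyMediator
{
    private readonly Dictionary<Type, Func<object, CancellationToken, Task<object>>> _handlers = new();

    public void Register<TRequest, TResponse>(IRequestHandler<TRequest, TResponse> handler)
        where TRequest : IRequest<TResponse>
    {
        // Store a type-erased wrapper so handlers of different types fit in one dictionary.
        _handlers[typeof(TRequest)] =
            async (request, ct) => await handler.Handle((TRequest)request, ct);
    }

    public async Task<TResponse> Send<TResponse>(
        IRequest<TResponse> request, CancellationToken cancellationToken = default)
    {
        if (!_handlers.TryGetValue(request.GetType(), out var handle))
            throw new InvalidOperationException(
                $"No handler registered for {request.GetType().Name}");

        return (TResponse)await handle(request, cancellationToken);
    }
}

public record Ping(string Message) : IRequest<string>;

public class PingHandler : IRequestHandler<Ping, string>
{
    public Task<string> Handle(Ping request, CancellationToken cancellationToken)
        => Task.FromResult($"pong: {request.Message}");
}
```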

Setting Up MediatR

csharp
// Program.cs - Configuration
using FluentValidation;
using MediatR;

var builder = WebApplication.CreateBuilder(args);

// Add MediatR (registration style of MediatR 12 and later)
builder.Services.AddMediatR(cfg => 
{
    cfg.RegisterServicesFromAssemblyContaining<Program>();
    // Add open generic behaviors; they wrap the handler in registration order
    cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
    cfg.AddOpenBehavior(typeof(ValidationBehavior<,>));
});

// Add supporting services: the validators that ValidationBehavior resolves
// (needs the FluentValidation.DependencyInjectionExtensions package).
// Registering the behaviors again with AddTransient would run each one twice per request.
builder.Services.AddValidatorsFromAssemblyContaining<Program>();

var app = builder.Build();

MediatR Pipeline Behaviors

csharp
// Logging Behavior
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;

    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
    {
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        
        _logger.LogInformation("Handling {RequestName} with {@Request}", requestName, request);
        
        try
        {
            var response = await next();
            _logger.LogInformation("Handled {RequestName} successfully", requestName);
            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error handling {RequestName}", requestName);
            throw;
        }
    }
}

// Validation Behavior
public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
    {
        _validators = validators;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        if (_validators.Any())
        {
            var context = new ValidationContext<TRequest>(request);
            
            var validationResults = await Task.WhenAll(
                _validators.Select(v => v.ValidateAsync(context, cancellationToken)));
                
            var failures = validationResults
                .SelectMany(r => r.Errors)
                .Where(f => f != null)
                .ToList();

            if (failures.Count != 0)
                throw new ValidationException(failures);
        }
        
        return await next();
    }
}
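
Each behavior receives `next`, a delegate for the rest of the pipeline, so behaviors nest around the handler like layers. The sketch below reproduces that nesting with plain delegates to make the execution order visible. It is an illustration of the composition idea, not MediatR code; as far as I know, MediatR likewise runs the first registered behavior outermost.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Innermost step: the actual request handler.
Func<Task<string>> handler = () =>
{
    log.Add("handler");
    return Task.FromResult("done");
};

// Behaviors in registration order; each one receives the next step as a delegate.
var behaviors = new List<Func<Func<Task<string>>, Task<string>>>
{
    async next =>
    {
        log.Add("logging: before");
        var response = await next();
        log.Add("logging: after");
        return response;
    },
    async next =>
    {
        log.Add("validation");
        return await next();
    },
};

// Wrap from the last behavior inward so the first registered behavior ends up outermost.
Func<Task<string>> pipeline = handler;
for (int i = behaviors.Count - 1; i >= 0; i--)
{
    var behavior = behaviors[i];
    var next = pipeline;
    pipeline = () => behavior(next);
}

string result = await pipeline();

Console.WriteLine(string.Join(" -> ", log));
// prints "logging: before -> validation -> handler -> logging: after"
```

A behavior that throws before calling `next`, as the validation behavior does on failure, short-circuits everything inside it, including the handler.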

4. Basic Implementation

Complete E-Commerce Example

Let's implement a complete e-commerce system using CQRS and MediatR.

Project Structure:

text
ECommerce.CQRS/
├── src/
│   ├── ECommerce.Application/
│   │   ├── Commands/
│   │   ├── Queries/
│   │   ├── Behaviors/
│   │   └── Models/
│   ├── ECommerce.Domain/
│   ├── ECommerce.Infrastructure/
│   └── ECommerce.API/
├── tests/
└── docker-compose.yml

Domain Models

csharp
// ECommerce.Domain/Entities/Product.cs
public class Product : Entity
{
    public string Name { get; private set; }
    public string Description { get; private set; }
    public decimal Price { get; private set; }
    public int StockQuantity { get; private set; }
    public bool IsActive { get; private set; }
    public DateTime CreatedAt { get; private set; }
    public DateTime? UpdatedAt { get; private set; }

    private Product() { } // For EF Core

    public Product(string name, string description, decimal price, int stockQuantity)
    {
        Name = name ?? throw new ArgumentNullException(nameof(name));
        Description = description ?? throw new ArgumentNullException(nameof(description));
        Price = price >= 0 ? price : throw new ArgumentException("Price cannot be negative");
        StockQuantity = stockQuantity >= 0 ? stockQuantity : 
            throw new ArgumentException("Stock quantity cannot be negative");
        IsActive = true;
        CreatedAt = DateTime.UtcNow;
    }

    public void UpdatePrice(decimal newPrice)
    {
        if (newPrice < 0)
            throw new ArgumentException("Price cannot be negative");
            
        Price = newPrice;
        UpdatedAt = DateTime.UtcNow;
    }

    public void UpdateStock(int quantity)
    {
        if (quantity < 0)
            throw new ArgumentException("Stock quantity cannot be negative");
            
        StockQuantity = quantity;
        UpdatedAt = DateTime.UtcNow;
    }

    public void Deactivate()
    {
        IsActive = false;
        UpdatedAt = DateTime.UtcNow;
    }
}

// ECommerce.Domain/Common/Entity.cs
public abstract class Entity
{
    public int Id { get; protected set; }
    
    private readonly List<IDomainEvent> _domainEvents = new();
    public IReadOnlyCollection<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();

    protected void AddDomainEvent(IDomainEvent eventItem)
    {
        _domainEvents.Add(eventItem);
    }

    public void ClearDomainEvents()
    {
        _domainEvents.Clear();
    }
}

// ECommerce.Domain/Common/IDomainEvent.cs
// INotification is MediatR's marker interface, so the Domain project needs a MediatR reference
public interface IDomainEvent : INotification
{
    DateTime OccurredOn { get; }
}

Commands Implementation

csharp
// ECommerce.Application/Commands/CreateProductCommand.cs
public class CreateProductCommand : IRequest<CreateProductResponse>
{
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }
    public int StockQuantity { get; set; }
    
    public CreateProductCommand(string name, string description, decimal price, int stockQuantity)
    {
        Name = name;
        Description = description;
        Price = price;
        StockQuantity = stockQuantity;
    }
}

public class CreateProductResponse
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public DateTime CreatedAt { get; set; }
    
    public CreateProductResponse(int productId, string name, DateTime createdAt)
    {
        ProductId = productId;
        Name = name;
        CreatedAt = createdAt;
    }
}

// ECommerce.Application/Commands/CreateProductCommandValidator.cs
public class CreateProductCommandValidator : AbstractValidator<CreateProductCommand>
{
    public CreateProductCommandValidator()
    {
        RuleFor(x => x.Name)
            .NotEmpty().WithMessage("Product name is required")
            .MaximumLength(100).WithMessage("Product name cannot exceed 100 characters");
            
        RuleFor(x => x.Description)
            .NotEmpty().WithMessage("Product description is required")
            .MaximumLength(500).WithMessage("Description cannot exceed 500 characters");
            
        RuleFor(x => x.Price)
            .GreaterThanOrEqualTo(0).WithMessage("Price cannot be negative")
            .LessThanOrEqualTo(1000000).WithMessage("Price seems too high");
            
        RuleFor(x => x.StockQuantity)
            .GreaterThanOrEqualTo(0).WithMessage("Stock quantity cannot be negative");
    }
}

// ECommerce.Application/Commands/CreateProductCommandHandler.cs
public class CreateProductCommandHandler : IRequestHandler<CreateProductCommand, CreateProductResponse>
{
    private readonly IProductRepository _productRepository;
    private readonly ILogger<CreateProductCommandHandler> _logger;

    public CreateProductCommandHandler(
        IProductRepository productRepository,
        ILogger<CreateProductCommandHandler> logger)
    {
        _productRepository = productRepository;
        _logger = logger;
    }

    public async Task<CreateProductResponse> Handle(
        CreateProductCommand request, 
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Creating product: {ProductName}", request.Name);
        
        // Business logic
        var product = new Product(
            request.Name, 
            request.Description, 
            request.Price, 
            request.StockQuantity);
            
        // Persist
        await _productRepository.AddAsync(product);
        await _productRepository.SaveChangesAsync(cancellationToken);
        
        _logger.LogInformation("Product created with ID: {ProductId}", product.Id);
        
        return new CreateProductResponse(product.Id, product.Name, product.CreatedAt);
    }
}

Queries Implementation

csharp
// ECommerce.Application/Queries/GetProductQuery.cs
public class GetProductQuery : IRequest<ProductDTO>
{
    public int ProductId { get; set; }
    
    public GetProductQuery(int productId)
    {
        ProductId = productId;
    }
}

public class ProductDTO
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }
    public int StockQuantity { get; set; }
    public bool IsActive { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? UpdatedAt { get; set; }
    public string StockStatus { get; set; }
}

// ECommerce.Application/Queries/GetProductQueryHandler.cs
public class GetProductQueryHandler : IRequestHandler<GetProductQuery, ProductDTO>
{
    private readonly IProductReadRepository _productReadRepository;
    private readonly ILogger<GetProductQueryHandler> _logger;

    public GetProductQueryHandler(
        IProductReadRepository productReadRepository,
        ILogger<GetProductQueryHandler> logger)
    {
        _productReadRepository = productReadRepository;
        _logger = logger;
    }

    public async Task<ProductDTO> Handle(
        GetProductQuery request, 
        CancellationToken cancellationToken)
    {
        _logger.LogDebug("Retrieving product with ID: {ProductId}", request.ProductId);
        
        var product = await _productReadRepository.GetByIdAsync(request.ProductId);
        
        if (product == null)
        {
            _logger.LogWarning("Product with ID {ProductId} not found", request.ProductId);
            throw new ProductNotFoundException(request.ProductId);
        }
        
        // Map to DTO and calculate derived properties
        return new ProductDTO
        {
            Id = product.Id,
            Name = product.Name,
            Description = product.Description,
            Price = product.Price,
            StockQuantity = product.StockQuantity,
            IsActive = product.IsActive,
            CreatedAt = product.CreatedAt,
            UpdatedAt = product.UpdatedAt,
            StockStatus = CalculateStockStatus(product.StockQuantity)
        };
    }
    
    private string CalculateStockStatus(int stockQuantity)
    {
        return stockQuantity switch
        {
            0 => "Out of Stock",
            < 10 => "Low Stock",
            _ => "In Stock"
        };
    }
}

// Custom exception
public class ProductNotFoundException : Exception
{
    public ProductNotFoundException(int productId) 
        : base($"Product with ID {productId} was not found.")
    {
    }
    
    public ProductNotFoundException(string message) : base(message)
    {
    }
}
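
The stock status rule relies on the order of the switch arms, so the boundary values are worth checking. This small sketch lifts the same switch expression into a hypothetical static helper (`StockStatus.For`) and runs it over the edge cases.

```csharp
using System;

foreach (var quantity in new[] { 0, 1, 9, 10, 250 })
{
    Console.WriteLine($"{quantity} -> {StockStatus.For(quantity)}");
}
// prints:
// 0 -> Out of Stock
// 1 -> Low Stock
// 9 -> Low Stock
// 10 -> In Stock
// 250 -> In Stock

public static class StockStatus
{
    // Arms are evaluated top to bottom, so 0 is matched before the "< 10" arm.
    public static string For(int stockQuantity) => stockQuantity switch
    {
        0 => "Out of Stock",
        < 10 => "Low Stock",
        _ => "In Stock"
    };
}
```

Note that a negative quantity would also match `< 10` and report "Low Stock". The `Product` entity rejects negative stock, so that case should not occur, but a read model fed from another source may need its own guard.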

API Controllers

csharp
// ECommerce.API/Controllers/ProductsController.cs
[ApiController]
[Route("api/[controller]")]
public class ProductsController : ControllerBase
{
    private readonly IMediator _mediator;
    private readonly ILogger<ProductsController> _logger;

    public ProductsController(IMediator mediator, ILogger<ProductsController> logger)
    {
        _mediator = mediator;
        _logger = logger;
    }

    [HttpPost]
    [ProducesResponseType(typeof(CreateProductResponse), StatusCodes.Status201Created)]
    [ProducesResponseType(StatusCodes.Status400BadRequest)]
    public async Task<ActionResult<CreateProductResponse>> CreateProduct(
        [FromBody] CreateProductCommand command)
    {
        _logger.LogInformation("Creating new product: {ProductName}", command.Name);
        
        var result = await _mediator.Send(command);
        
        return CreatedAtAction(
            nameof(GetProduct), 
            new { id = result.ProductId }, 
            result);
    }

    [HttpGet("{id}")]
    [ProducesResponseType(typeof(ProductDTO), StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status404NotFound)]
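    public async Task<ActionResult<ProductDTO>> GetProduct(int id)
    {
        try
        {
            var query = new GetProductQuery(id);
            var product = await _mediator.Send(query);

            return Ok(product);
        }
        catch (ProductNotFoundException)
        {
            // The handler throws when the product is missing; translate that into the declared 404
            return NotFound();
        }
    }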

    [HttpGet]
    [ProducesResponseType(typeof(PaginatedList<ProductSummaryDTO>), StatusCodes.Status200OK)]
    public async Task<ActionResult<PaginatedList<ProductSummaryDTO>>> GetProducts(
        [FromQuery] GetProductsQuery query)
    {
        var result = await _mediator.Send(query);
        return Ok(result);
    }
}
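
Handlers in this guide report failures by throwing exceptions, while the controller advertises HTTP status codes such as 400 and 404. Keeping the translation between the two in one place avoids repeating it in every action. The sketch below shows only the mapping as a pure function; `HttpStatusMapper` is a hypothetical helper, the status choices are assumptions, and wiring it into ASP.NET Core's exception handling is left out.

```csharp
using System;

Console.WriteLine(HttpStatusMapper.ToStatusCode(new ProductNotFoundException(42)));      // prints 404
Console.WriteLine(HttpStatusMapper.ToStatusCode(new ArgumentException("bad input")));    // prints 400
Console.WriteLine(HttpStatusMapper.ToStatusCode(new InvalidOperationException("boom"))); // prints 500

public class ProductNotFoundException : Exception
{
    public ProductNotFoundException(int productId)
        : base($"Product with ID {productId} was not found.")
    {
    }
}

public static class HttpStatusMapper
{
    // Assumed mapping: known domain failures get a specific code, everything else is a 500.
    public static int ToStatusCode(Exception exception) => exception switch
    {
        ProductNotFoundException => 404,
        ArgumentException => 400,
        UnauthorizedAccessException => 403,
        _ => 500
    };
}
```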

5. Advanced CQRS Patterns

Domain Events with MediatR

csharp
// ECommerce.Domain/Events/ProductCreatedEvent.cs
public class ProductCreatedEvent : IDomainEvent
{
    public int ProductId { get; }
    public string ProductName { get; }
    public decimal Price { get; }
    public DateTime OccurredOn { get; }

    public ProductCreatedEvent(int productId, string productName, decimal price)
    {
        ProductId = productId;
        ProductName = productName;
        Price = price;
        OccurredOn = DateTime.UtcNow;
    }
}

// ECommerce.Application/DomainEventHandlers/ProductCreatedEventHandler.cs
public class ProductCreatedEventHandler : INotificationHandler<ProductCreatedEvent>
{
    private readonly ILogger<ProductCreatedEventHandler> _logger;
    private readonly IEmailService _emailService;
    private readonly IAnalyticsRepository _analyticsRepository;

    public ProductCreatedEventHandler(
        ILogger<ProductCreatedEventHandler> logger,
        IEmailService emailService,
        IAnalyticsRepository analyticsRepository)
    {
        _logger = logger;
        _emailService = emailService;
        _analyticsRepository = analyticsRepository;
    }

    public async Task Handle(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Handling product created event for product: {ProductName}", 
            notification.ProductName);

        // Run the independent side effects of this handler concurrently
        await Task.WhenAll(
            SendAdminNotification(notification, cancellationToken),
            TrackAnalytics(notification, cancellationToken),
            UpdateSearchIndex(notification, cancellationToken)
        );
    }

    private async Task SendAdminNotification(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        try
        {
            await _emailService.SendProductCreatedNotificationAsync(
                notification.ProductId, 
                notification.ProductName);
                
            _logger.LogInformation("Admin notification sent for product {ProductId}", notification.ProductId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send admin notification for product {ProductId}", notification.ProductId);
            // Don't throw - other handlers should still execute
        }
    }

    private async Task TrackAnalytics(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        await _analyticsRepository.TrackProductCreationAsync(
            notification.ProductId, 
            notification.Price);
    }

    private async Task UpdateSearchIndex(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        // Update search index asynchronously
        await Task.Delay(100, cancellationToken); // Simulate work
        _logger.LogInformation("Search index updated for product {ProductId}", notification.ProductId);
    }
}

// Updated command handler to publish domain events
public class CreateProductCommandHandler : IRequestHandler<CreateProductCommand, CreateProductResponse>
{
    private readonly IProductRepository _productRepository;
    private readonly ILogger<CreateProductCommandHandler> _logger;
    private readonly IPublisher _publisher;

    public CreateProductCommandHandler(
        IProductRepository productRepository,
        ILogger<CreateProductCommandHandler> logger,
        IPublisher publisher)
    {
        _productRepository = productRepository;
        _logger = logger;
        _publisher = publisher;
    }

    public async Task<CreateProductResponse> Handle(
        CreateProductCommand request, 
        CancellationToken cancellationToken)
    {
        var product = new Product(
            request.Name, 
            request.Description, 
            request.Price, 
            request.StockQuantity);
            
        await _productRepository.AddAsync(product);
        await _productRepository.SaveChangesAsync(cancellationToken);
        
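        // The Product constructor shown earlier records no event, and (assuming a
        // database-generated Id) the Id is only known after saving, so the creation
        // event is published explicitly here
        await _publisher.Publish(
            new ProductCreatedEvent(product.Id, product.Name, product.Price),
            cancellationToken);

        // Publish any further events the entity recorded through AddDomainEvent
        foreach (var domainEvent in product.DomainEvents)
        {
            await _publisher.Publish(domainEvent, cancellationToken);
        }
        
        product.ClearDomainEvents();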
        
        return new CreateProductResponse(product.Id, product.Name, product.CreatedAt);
    }
}
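
The record-then-dispatch flow is easier to follow in isolation: the entity only records what happened, and the application layer dispatches the recorded events to every subscriber before clearing them. The sketch below uses plain delegates instead of MediatR; the `Rename` method, the `ProductRenamed` event, and the two subscribers are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var product = new Product("Keyboard");
var handled = new List<string>();

// Two independent subscribers for the same event type.
var handlers = new List<Func<ProductRenamed, Task>>
{
    e => { handled.Add($"audit:{e.NewName}"); return Task.CompletedTask; },
    e => { handled.Add($"search-index:{e.NewName}"); return Task.CompletedTask; },
};

product.Rename("Mechanical Keyboard");

// Dispatch once the state change has been persisted (persistence omitted), then clear.
foreach (var domainEvent in product.DomainEvents)
{
    foreach (var handler in handlers)
    {
        await handler(domainEvent);
    }
}
product.ClearDomainEvents();

Console.WriteLine(string.Join(", ", handled));
// prints "audit:Mechanical Keyboard, search-index:Mechanical Keyboard"
Console.WriteLine(product.DomainEvents.Count); // prints 0

public record ProductRenamed(string NewName);

public class Product
{
    private readonly List<ProductRenamed> _domainEvents = new();
    public IReadOnlyCollection<ProductRenamed> DomainEvents => _domainEvents.AsReadOnly();
    public string Name { get; private set; }

    public Product(string name) => Name = name;

    public void Rename(string newName)
    {
        Name = newName;
        // The entity records the event; publishing is the application layer's job.
        _domainEvents.Add(new ProductRenamed(newName));
    }

    public void ClearDomainEvents() => _domainEvents.Clear();
}
```

Clearing the events after dispatch matters: without it, saving the same entity a second time would publish the old events again.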

Separate Read/Write Models

csharp
// Write Model (Domain)
public class Product : Entity
{
    // Rich behavior and business rules
    public void ApplyDiscount(decimal percentage)
    {
        if (percentage < 0 || percentage > 100)
            throw new ArgumentException("Discount percentage must be between 0 and 100");
            
        Price = Price * (100 - percentage) / 100;
        AddDomainEvent(new ProductPriceChangedEvent(Id, Price));
    }
}

// Read Model (DTOs optimized for reading)
public class ProductDetailView
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }
    public decimal DiscountedPrice { get; set; }
    public string CategoryName { get; set; }
    public double AverageRating { get; set; }
    public int ReviewCount { get; set; }
    public string[] ImageUrls { get; set; }
    public Dictionary<string, string> Specifications { get; set; }
    public ProductVariantDTO[] Variants { get; set; }
}

// Separate read repository
public interface IProductReadRepository
{
    Task<ProductDetailView> GetProductDetailAsync(int productId);
    Task<PaginatedList<ProductSummaryView>> GetProductsAsync(ProductQuery query);
    Task<List<ProductSearchResult>> SearchProductsAsync(string searchTerm);
}

// Implementation with Dapper for performance
public class ProductReadRepository : IProductReadRepository
{
    private readonly IDbConnection _connection;

    public ProductReadRepository(IDbConnection connection)
    {
        _connection = connection;
    }

    public async Task<ProductDetailView> GetProductDetailAsync(int productId)
    {
        // Requires "using Dapper;" for QuerySingleOrDefaultAsync
        const string sql = @"
            SELECT 
                p.Id, p.Name, p.Description, p.Price,
                c.Name as CategoryName,
                COALESCE(AVG(r.Rating), 0) as AverageRating,
                COUNT(r.Id) as ReviewCount
            FROM Products p
            LEFT JOIN Categories c ON p.CategoryId = c.Id
            LEFT JOIN Reviews r ON p.Id = r.ProductId
            -- ProductImages is a second one-to-many relation: load it in a separate
            -- query, because joining it here would multiply the review rows counted above
            WHERE p.Id = @ProductId AND p.IsActive = 1
            GROUP BY p.Id, p.Name, p.Description, p.Price, c.Name";

        return await _connection.QuerySingleOrDefaultAsync<ProductDetailView>(sql, new { ProductId = productId });
    }
}
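
The `ApplyDiscount` method in the write model above computes `Price * (100 - percentage) / 100`. A quick worked example shows that the result is not always a valid currency amount: 80 at 25% gives exactly 60, but 19.99 at 15% gives 16.9915. The sketch below repeats the formula in a hypothetical `Discount.Apply` helper; rounding to two decimals with `MidpointRounding.AwayFromZero` is an assumption, and the right rule depends on the business.

```csharp
using System;

decimal exact = Discount.Apply(80m, 25m);        // 60
decimal raw = Discount.Apply(19.99m, 15m);       // 16.9915
decimal rounded = Math.Round(raw, 2, MidpointRounding.AwayFromZero); // 16.99

Console.WriteLine($"{exact} | {raw} | {rounded}");

public static class Discount
{
    // Same formula as ApplyDiscount: price * (100 - percentage) / 100
    public static decimal Apply(decimal price, decimal percentage)
    {
        if (percentage < 0 || percentage > 100)
            throw new ArgumentException("Discount percentage must be between 0 and 100");

        return price * (100 - percentage) / 100;
    }
}
```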

6. Validation & Behavior

Advanced Validation Pipeline

csharp
// FluentValidation validators
public class UpdateProductCommandValidator : AbstractValidator<UpdateProductCommand>
{
    public UpdateProductCommandValidator()
    {
        RuleFor(x => x.ProductId)
            .GreaterThan(0).WithMessage("Product ID must be positive");
            
        RuleFor(x => x.Name)
            .NotEmpty().When(x => x.Name != null)
            .MaximumLength(100).WithMessage("Name cannot exceed 100 characters");
            
        RuleFor(x => x.Price)
            .GreaterThanOrEqualTo(0).When(x => x.Price.HasValue)
            .WithMessage("Price cannot be negative");
            
        RuleFor(x => x.StockQuantity)
            .GreaterThanOrEqualTo(0).When(x => x.StockQuantity.HasValue)
            .WithMessage("Stock quantity cannot be negative");
            
        // Cross-property validation
        RuleFor(x => x)
            .Must(x => !(x.Price.HasValue && x.Price.Value == 0 && x.StockQuantity > 0))
            .WithMessage("Products with stock cannot have zero price")
            .OverridePropertyName("Price");
    }
}

// Custom validation behavior
public class CustomValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    private readonly ILogger<CustomValidationBehavior<TRequest, TResponse>> _logger;

    public CustomValidationBehavior(
        IEnumerable<IValidator<TRequest>> validators,
        ILogger<CustomValidationBehavior<TRequest, TResponse>> logger)
    {
        _validators = validators;
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        
        if (_validators.Any())
        {
            _logger.LogDebug("Validating request {RequestName}", requestName);
            
            var context = new ValidationContext<TRequest>(request);
            var validationResults = await Task.WhenAll(
                _validators.Select(v => v.ValidateAsync(context, cancellationToken)));
                
            var failures = validationResults
                .SelectMany(r => r.Errors)
                .Where(f => f != null)
                .ToList();

            if (failures.Count != 0)
            {
                _logger.LogWarning(
                    "Validation failed for {RequestName} with {ErrorCount} errors", 
                    requestName, failures.Count);
                    
                throw new CustomValidationException(failures);
            }
        }
        
        return await next();
    }
}

// Custom validation exception
public class CustomValidationException : Exception
{
    public IDictionary<string, string[]> Errors { get; }

    public CustomValidationException(IEnumerable<ValidationFailure> failures)
        : base("Validation failed")
    {
        Errors = failures
            .GroupBy(e => e.PropertyName, e => e.ErrorMessage)
            .ToDictionary(failureGroup => failureGroup.Key, failureGroup => failureGroup.ToArray());
    }
}
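
The grouping inside `CustomValidationException` turns a flat list of failures into one entry per property, which is the shape most API clients expect. Here is the same LINQ expression on its own, using a hypothetical `Failure` record in place of FluentValidation's `ValidationFailure` so the sketch runs without the package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var failures = new List<Failure>
{
    new("Name", "Name is required"),
    new("Price", "Price cannot be negative"),
    new("Name", "Name cannot exceed 100 characters"),
};

IDictionary<string, string[]> errors = ErrorGrouping.ByProperty(failures);

foreach (var (property, messages) in errors)
{
    Console.WriteLine($"{property}: {string.Join("; ", messages)}");
}

// Stand-in for FluentValidation's ValidationFailure (only the two members used here).
public record Failure(string PropertyName, string ErrorMessage);

public static class ErrorGrouping
{
    // Key: property name. Value: every error message reported for that property.
    public static IDictionary<string, string[]> ByProperty(IEnumerable<Failure> failures) =>
        failures
            .GroupBy(f => f.PropertyName, f => f.ErrorMessage)
            .ToDictionary(group => group.Key, group => group.ToArray());
}
```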

Authorization Behavior

csharp
// Authorization behavior
public class AuthorizationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ICurrentUserService _currentUserService;
    private readonly IIdentityService _identityService;
    private readonly ILogger<AuthorizationBehavior<TRequest, TResponse>> _logger;

    public AuthorizationBehavior(
        ICurrentUserService currentUserService,
        IIdentityService identityService,
        ILogger<AuthorizationBehavior<TRequest, TResponse>> logger)
    {
        _currentUserService = currentUserService;
        _identityService = identityService;
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        // Check for authorization attributes
        var authorizeAttributes = request.GetType()
            .GetCustomAttributes<AuthorizeAttribute>()
            .ToList();

        if (authorizeAttributes.Any())
        {
            // Ensure user is authenticated
            if (_currentUserService.UserId == null)
            {
                throw new UnauthorizedAccessException("User is not authenticated");
            }

            // Check roles
            var authorizeAttributesWithRoles = authorizeAttributes
                .Where(a => !string.IsNullOrWhiteSpace(a.Roles))
                .ToList();

            if (authorizeAttributesWithRoles.Any())
            {
                var authorized = false;
                foreach (var roles in authorizeAttributesWithRoles.Select(a => a.Roles.Split(',')))
                {
                    foreach (var role in roles)
                    {
                        var isInRole = await _identityService.IsInRoleAsync(
                            _currentUserService.UserId, role.Trim());
                            
                        if (isInRole)
                        {
                            authorized = true;
                            break;
                        }
                    }

                    // A matching role was found; skip the remaining attributes
                    if (authorized)
                        break;
                }

                if (!authorized)
                {
                    _logger.LogWarning(
                        "User {UserId} is not authorized for request {RequestName}",
                        _currentUserService.UserId, typeof(TRequest).Name);
                        
                    throw new UnauthorizedAccessException("User is not authorized");
                }
            }

            // Check policy
            var authorizeAttributesWithPolicies = authorizeAttributes
                .Where(a => !string.IsNullOrWhiteSpace(a.Policy))
                .ToList();
                
            if (authorizeAttributesWithPolicies.Any())
            {
                foreach (var policy in authorizeAttributesWithPolicies.Select(a => a.Policy))
                {
                    var authorized = await _identityService.AuthorizeAsync(
                        _currentUserService.UserId, policy);
                        
                    if (!authorized)
                    {
                        _logger.LogWarning(
                            "User {UserId} failed policy {Policy} for request {RequestName}",
                            _currentUserService.UserId, policy, typeof(TRequest).Name);
                            
                        throw new UnauthorizedAccessException("Policy requirement failed");
                    }
                }
            }
        }

        return await next();
    }
}

// Usage with authorization
[Authorize(Roles = "Admin,ProductManager")]
public class CreateProductCommand : IRequest<CreateProductResponse>
{
    // Command properties
}

[Authorize(Policy = "CanUpdateProductPrice")]
public class UpdateProductPriceCommand : IRequest<Unit>
{
    public int ProductId { get; set; }
    public decimal NewPrice { get; set; }
}
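
ASP.NET Core treats multiple [Authorize] attributes as cumulative: every attribute must pass, while a comma-separated Roles list passes if the user holds any one of the listed roles. The sketch below expresses that rule as a pure function so it can be unit tested apart from the pipeline. RoleRequirements and IsSatisfied are hypothetical names introduced for illustration; they are not part of MediatR or ASP.NET Core.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
public static class RoleRequirements
{
    // Each entry in roleLists is the comma-separated Roles value of one
    // [Authorize] attribute. The user must hold at least one role from
    // every entry (OR within an entry, AND across entries).
    public static bool IsSatisfied(IEnumerable<string> roleLists, ISet<string> userRoles)
    {
        return roleLists.All(list =>
            list.Split(',')
                .Select(role => role.Trim())
                .Any(role => userRoles.Contains(role)));
    }
}
```

With this shape, a user who is only a ProductManager satisfies "Admin,ProductManager" but would not satisfy that list combined with a second attribute requiring a different role.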

7. Performance Optimization

Caching Strategies

csharp
// Cache behavior
public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICacheableRequest
{
    private readonly IDistributedCache _cache;
    private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;

    public CachingBehavior(
        IDistributedCache cache,
        ILogger<CachingBehavior<TRequest, TResponse>> logger)
    {
        _cache = cache;
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var cacheKey = request.GetCacheKey();

        // Read from the cache; a cache failure must not fail the request.
        try
        {
            var cachedResponse = await _cache.GetStringAsync(cacheKey, cancellationToken);
            if (cachedResponse != null)
            {
                var cached = JsonSerializer.Deserialize<TResponse>(cachedResponse);
                if (cached != null)
                {
                    _logger.LogDebug("Cache hit for {CacheKey}", cacheKey);
                    return cached;
                }
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Cache read failed for {CacheKey}", cacheKey);
        }

        _logger.LogDebug("Cache miss for {CacheKey}", cacheKey);

        // Execute the handler exactly once, outside the cache try/catch blocks,
        // so handler exceptions propagate and the handler is never re-run.
        var response = await next();

        // Cache the response; a failed write is logged but does not affect the result.
        try
        {
            var cacheOptions = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = request.GetCacheDuration()
            };

            await _cache.SetStringAsync(
                cacheKey,
                JsonSerializer.Serialize(response),
                cacheOptions,
                cancellationToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Cache write failed for {CacheKey}", cacheKey);
        }

        return response;
    }
}

// Cacheable request interface
public interface ICacheableRequest
{
    string GetCacheKey();
    TimeSpan? GetCacheDuration();
}

// Cacheable query example
public class GetProductQuery : IRequest<ProductDTO>, ICacheableRequest
{
    public int ProductId { get; set; }
    
    public string GetCacheKey() => $"product_{ProductId}";
    
    public TimeSpan? GetCacheDuration() => TimeSpan.FromMinutes(30);
}
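
The caching behavior above is an instance of the cache-aside pattern: look up the key, fall back to the real work on a miss, then store the serialized result. Here is a minimal sketch of that flow, assuming an in-memory dictionary in place of IDistributedCache. JsonCacheAside and GetOrAddAsync are hypothetical names, and the class is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical stand-in for a distributed cache; illustration only.
public sealed class JsonCacheAside
{
    private readonly Dictionary<string, string> _store = new();

    public async Task<T> GetOrAddAsync<T>(string key, Func<Task<T>> factory)
    {
        // 1. Cache hit: deserialize and return without calling the factory.
        if (_store.TryGetValue(key, out var json))
        {
            var cached = JsonSerializer.Deserialize<T>(json);
            if (cached is not null)
            {
                return cached;
            }
        }

        // 2. Cache miss: run the real work exactly once.
        var value = await factory();

        // 3. Store the serialized result for later requests.
        _store[key] = JsonSerializer.Serialize(value);
        return value;
    }
}
```

Calling GetOrAddAsync twice with the same key should invoke the factory only on the first call, which is the property worth asserting when you test a caching behavior.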

Query Optimization

csharp
// Optimized query with projections
public class GetProductsQuery : IRequest<PaginatedList<ProductSummaryDTO>>, ICacheableRequest
{
    public int PageNumber { get; set; } = 1;
    public int PageSize { get; set; } = 20;
    public string Category { get; set; }
    public decimal? MinPrice { get; set; }
    public decimal? MaxPrice { get; set; }
    public string SortBy { get; set; } = "name";
    public bool SortDescending { get; set; }
    
    public string GetCacheKey()
    {
        var key = $"products_page_{PageNumber}_size_{PageSize}";
        if (!string.IsNullOrEmpty(Category)) key += $"_cat_{Category}";
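        // Format prices with the invariant culture so the key does not vary by server locale
        if (MinPrice.HasValue) key += $"_min_{MinPrice.Value.ToString(System.Globalization.CultureInfo.InvariantCulture)}";
        if (MaxPrice.HasValue) key += $"_max_{MaxPrice.Value.ToString(System.Globalization.CultureInfo.InvariantCulture)}";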
        key += $"_sort_{SortBy}_{(SortDescending ? "desc" : "asc")}";
        return key;
    }
    
    public TimeSpan? GetCacheDuration() => TimeSpan.FromMinutes(5);
}

// Optimized query handler
public class GetProductsQueryHandler : IRequestHandler<GetProductsQuery, PaginatedList<ProductSummaryDTO>>
{
    private readonly IProductReadRepository _readRepository;
    private readonly ILogger<GetProductsQueryHandler> _logger;

    public GetProductsQueryHandler(
        IProductReadRepository readRepository,
        ILogger<GetProductsQueryHandler> logger)
    {
        _readRepository = readRepository;
        _logger = logger;
    }

    public async Task<PaginatedList<ProductSummaryDTO>> Handle(
        GetProductsQuery request, 
        CancellationToken cancellationToken)
    {
        var (products, totalCount) = await _readRepository.GetProductsAsync(
            new ProductQuery
            {
                PageNumber = request.PageNumber,
                PageSize = request.PageSize,
                Category = request.Category,
                MinPrice = request.MinPrice,
                MaxPrice = request.MaxPrice,
                SortBy = request.SortBy,
                SortDescending = request.SortDescending
            });
            
        return new PaginatedList<ProductSummaryDTO>(
            products, 
            totalCount, 
            request.PageNumber, 
            request.PageSize);
    }
}

// Paginated list
public class PaginatedList<T>
{
    public List<T> Items { get; }
    public int PageNumber { get; }
    public int TotalPages { get; }
    public int TotalCount { get; }
    public bool HasPreviousPage => PageNumber > 1;
    public bool HasNextPage => PageNumber < TotalPages;

    public PaginatedList(List<T> items, int count, int pageNumber, int pageSize)
    {
        PageNumber = pageNumber;
        TotalPages = (int)Math.Ceiling(count / (double)pageSize);
        TotalCount = count;
        Items = items;
    }
}
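
The paging arithmetic in PaginatedList is easy to get wrong by one, so a small worked sketch may help. Paging, TotalPages, and SkipCount are hypothetical helpers introduced here; the guard against a non-positive page size is an added assumption, since the constructor above does not check it.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class Paging
{
    // Number of pages needed to hold totalCount items, rounding up.
    public static int TotalPages(int totalCount, int pageSize)
    {
        if (pageSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(pageSize));

        return (int)Math.Ceiling(totalCount / (double)pageSize);
    }

    // Number of items to skip for a 1-based page number.
    public static int SkipCount(int pageNumber, int pageSize)
    {
        return (pageNumber - 1) * pageSize;
    }
}
```

For 45 items at 20 per page, TotalPages gives 3, and page 3 starts after skipping 40 items.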

8. Event Sourcing Integration

Event Sourced Aggregates

csharp
// Event-sourced product aggregate
public class EventSourcedProduct : AggregateRoot
{
    public string Name { get; private set; }
    public string Description { get; private set; }
    public decimal Price { get; private set; }
    public int StockQuantity { get; private set; }
    public bool IsActive { get; private set; }

    // Constructor for creating new aggregate
    public EventSourcedProduct(string name, string description, decimal price, int stockQuantity)
    {
        if (string.IsNullOrWhiteSpace(name))
            throw new ArgumentException("Product name cannot be empty", nameof(name));
            
        Apply(new ProductCreatedEvent(Guid.NewGuid(), name, description, price, stockQuantity));
    }

    // Constructor for rebuilding from events
    private EventSourcedProduct() { }

    public void UpdatePrice(decimal newPrice)
    {
        if (newPrice < 0)
            throw new ArgumentException("Price cannot be negative");
            
        if (newPrice != Price)
        {
            Apply(new ProductPriceUpdatedEvent(Id, newPrice, Price));
        }
    }

    public void UpdateStock(int quantity)
    {
        if (quantity < 0)
            throw new ArgumentException("Stock quantity cannot be negative");
            
        if (quantity != StockQuantity)
        {
            Apply(new ProductStockUpdatedEvent(Id, quantity, StockQuantity));
        }
    }

    // Event application methods
    protected override void When(object @event)
    {
        switch (@event)
        {
            case ProductCreatedEvent e:
                Id = e.ProductId;
                Name = e.Name;
                Description = e.Description;
                Price = e.Price;
                StockQuantity = e.StockQuantity;
                IsActive = true;
                break;
                
            case ProductPriceUpdatedEvent e:
                Price = e.NewPrice;
                break;
                
            case ProductStockUpdatedEvent e:
                StockQuantity = e.NewQuantity;
                break;
                
            case ProductDeactivatedEvent e:
                IsActive = false;
                break;
        }
    }

    // Ensure business rules
    protected override void EnsureValidState()
    {
        if (string.IsNullOrWhiteSpace(Name))
            throw new InvalidOperationException("Product must have a name");
            
        if (Price < 0)
            throw new InvalidOperationException("Price cannot be negative");
            
        if (StockQuantity < 0)
            throw new InvalidOperationException("Stock quantity cannot be negative");
    }
}

// Base aggregate root
public abstract class AggregateRoot
{
    private readonly List<object> _changes = new();

    public Guid Id { get; protected set; }
    public int Version { get; private set; } = -1;

    public IReadOnlyCollection<object> GetUncommittedChanges() => _changes.AsReadOnly();

    public void MarkChangesAsCommitted() => _changes.Clear();

    protected void Apply(object @event)
    {
        When(@event);
        EnsureValidState();
        _changes.Add(@event);
    }

    public void LoadFromHistory(IEnumerable<object> history)
    {
        foreach (var @event in history)
        {
            When(@event);
            Version++;
        }
    }

    protected abstract void When(object @event);
    protected abstract void EnsureValidState();
}
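
LoadFromHistory rebuilds state by replaying events in order, incrementing Version once per event from its initial value of -1. The following self-contained sketch shows the same replay idea on a reduced model. ProductState, PriceChanged, and StockChanged are hypothetical types, deliberately named differently from the aggregate and events above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events for illustration only.
public sealed record PriceChanged(decimal NewPrice);
public sealed record StockChanged(int NewQuantity);

public sealed class ProductState
{
    public decimal Price { get; private set; }
    public int StockQuantity { get; private set; }

    // -1 means "no events applied yet", matching the AggregateRoot convention.
    public int Version { get; private set; } = -1;

    // Rebuild state by applying each historical event in order.
    public static ProductState Replay(IEnumerable<object> history)
    {
        var state = new ProductState();
        foreach (var @event in history)
        {
            state.When(@event);
            state.Version++;
        }
        return state;
    }

    private void When(object @event)
    {
        switch (@event)
        {
            case PriceChanged e:
                Price = e.NewPrice;
                break;
            case StockChanged e:
                StockQuantity = e.NewQuantity;
                break;
        }
    }
}
```

Replaying three events leaves Version at 2, which is the value a caller would pass as expectedVersion on the next save.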

Event Store Implementation

csharp
// Event store interface
public interface IEventStore
{
    Task SaveAsync(Guid aggregateId, IEnumerable<object> events, int expectedVersion);
    Task<List<object>> GetEventsAsync(Guid aggregateId);
    Task<bool> AggregateExistsAsync(Guid aggregateId);
}

// Event store implementation
public class EventStore : IEventStore
{
    private readonly IEventStoreRepository _repository;
    private readonly ILogger<EventStore> _logger;

    public EventStore(IEventStoreRepository repository, ILogger<EventStore> logger)
    {
        _repository = repository;
        _logger = logger;
    }

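    public async Task SaveAsync(Guid aggregateId, IEnumerable<object> events, int expectedVersion)
    {
        // Number the events sequentially after expectedVersion without mutating the parameter.
        // The serialized payload property is named Data because C# does not allow a member
        // to share the name of its enclosing type (EventData.EventData).
        var eventData = events.Select((@event, index) => new EventData
        {
            Id = Guid.NewGuid(),
            AggregateId = aggregateId,
            EventType = @event.GetType().Name,
            Data = JsonSerializer.Serialize(@event, @event.GetType()),
            Timestamp = DateTime.UtcNow,
            Version = expectedVersion + index + 1
        }).ToList();

        // Assumption: the repository rejects duplicate (AggregateId, Version) pairs,
        // which is what turns expectedVersion into an optimistic concurrency check.
        await _repository.AppendEventsAsync(eventData);
        
        _logger.LogInformation(
            "Saved {EventCount} events for aggregate {AggregateId}", 
            eventData.Count, aggregateId);
    }

    public async Task<List<object>> GetEventsAsync(Guid aggregateId)
    {
        var eventData = await _repository.GetEventsAsync(aggregateId);
        
        return eventData.Select(DeserializeEvent).ToList();
    }

    // Required by IEventStore: an aggregate exists once it has at least one event
    public async Task<bool> AggregateExistsAsync(Guid aggregateId)
    {
        var eventData = await _repository.GetEventsAsync(aggregateId);

        return eventData.Any();
    }

    private object DeserializeEvent(EventData eventData)
    {
        var eventType = Type.GetType($"ECommerce.Domain.Events.{eventData.EventType}, ECommerce.Domain");
        if (eventType == null)
            throw new InvalidOperationException($"Unknown event type: {eventData.EventType}");
            
        return JsonSerializer.Deserialize(eventData.Data, eventType);
    }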
}
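
The expectedVersion parameter on SaveAsync suggests optimistic concurrency: a write should succeed only if nobody else has appended to the stream since the aggregate was loaded. Below is a minimal in-memory sketch of that check. InMemoryEventStream is a hypothetical class, it is not thread-safe, and a real store would typically enforce the same rule with a unique constraint or a conditional append.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stream store for illustration only.
public sealed class InMemoryEventStream
{
    private readonly Dictionary<Guid, List<object>> _streams = new();

    public void Append(Guid aggregateId, IReadOnlyCollection<object> events, int expectedVersion)
    {
        if (!_streams.TryGetValue(aggregateId, out var stream))
        {
            stream = new List<object>();
            _streams[aggregateId] = stream;
        }

        // Version is the zero-based index of the last event; -1 means an empty stream.
        var currentVersion = stream.Count - 1;
        if (currentVersion != expectedVersion)
        {
            throw new InvalidOperationException(
                $"Concurrency conflict: expected version {expectedVersion}, found {currentVersion}");
        }

        stream.AddRange(events);
    }

    public IReadOnlyList<object> Read(Guid aggregateId)
    {
        return _streams.TryGetValue(aggregateId, out var stream)
            ? stream.ToList()
            : new List<object>();
    }
}
```

A second writer that still holds the old version gets an exception instead of silently overwriting history, and can reload the aggregate and retry.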

9. Testing Strategies

Unit Testing Commands and Queries

csharp
// Unit tests for command handler
public class CreateProductCommandHandlerTests
{
    private readonly Mock<IProductRepository> _productRepositoryMock;
    private readonly Mock<ILogger<CreateProductCommandHandler>> _loggerMock;
    private readonly Mock<IPublisher> _publisherMock;
    private readonly CreateProductCommandHandler _handler;

    public CreateProductCommandHandlerTests()
    {
        _productRepositoryMock = new Mock<IProductRepository>();
        _loggerMock = new Mock<ILogger<CreateProductCommandHandler>>();
        _publisherMock = new Mock<IPublisher>();
        
        _handler = new CreateProductCommandHandler(
            _productRepositoryMock.Object,
            _loggerMock.Object,
            _publisherMock.Object);
    }

    [Fact]
    public async Task Handle_ValidCommand_ShouldCreateProduct()
    {
        // Arrange
        var command = new CreateProductCommand(
            "Test Product", 
            "Test Description", 
            99.99m, 
            10);
            
        _productRepositoryMock
            .Setup(r => r.AddAsync(It.IsAny<Product>()))
            .Returns(Task.CompletedTask);
            
        _productRepositoryMock
            .Setup(r => r.SaveChangesAsync(It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);

        // Act
        var result = await _handler.Handle(command, CancellationToken.None);

        // Assert
        result.Should().NotBeNull();
        result.ProductId.Should().BeGreaterThan(0);
        result.Name.Should().Be(command.Name);
        
        _productRepositoryMock.Verify(
            r => r.AddAsync(It.Is<Product>(p => 
                p.Name == command.Name && 
                p.Price == command.Price)),
            Times.Once);
            
        _productRepositoryMock.Verify(
            r => r.SaveChangesAsync(It.IsAny<CancellationToken>()),
            Times.Once);
    }

    [Fact]
    public async Task Handle_InvalidPrice_ShouldThrowException()
    {
        // Arrange
        var command = new CreateProductCommand(
            "Test Product", 
            "Test Description", 
            -10m,  // Invalid price
            10);

        // Act & Assert
        await Assert.ThrowsAsync<ArgumentException>(() => 
            _handler.Handle(command, CancellationToken.None));
    }
}

// Integration tests
public class ProductCommandsIntegrationTests : IClassFixture<WebApplicationFactory<Program>>
{
    private readonly WebApplicationFactory<Program> _factory;
    private readonly HttpClient _client;

    public ProductCommandsIntegrationTests(WebApplicationFactory<Program> factory)
    {
        _factory = factory;
        _client = factory.CreateClient();
    }

    [Fact]
    public async Task CreateProduct_ValidRequest_ShouldReturnCreated()
    {
        // Arrange
        var command = new CreateProductCommand(
            "Integration Test Product",
            "Integration Test Description",
            49.99m,
            25);

        // Act
        var response = await _client.PostAsJsonAsync("/api/products", command);

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.Created);
        
        var content = await response.Content.ReadAsStringAsync();
        var result = JsonSerializer.Deserialize<CreateProductResponse>(content, 
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
            
        result.Should().NotBeNull();
        result.ProductId.Should().BeGreaterThan(0);
    }
}

Behavior Testing

csharp
// Testing pipeline behaviors
public class ValidationBehaviorTests
{
    [Fact]
    public async Task Handle_ValidRequest_ShouldPassThrough()
    {
        // Arrange
        var validators = new List<IValidator<TestCommand>>
        {
            new TestCommandValidator()
        };
        
        var behavior = new ValidationBehavior<TestCommand, TestResponse>(validators);
        var command = new TestCommand { Name = "Valid Name", Value = 10 };
        
        // Act
        var result = await behavior.Handle(
            command, 
            () => Task.FromResult(new TestResponse()), 
            CancellationToken.None);

        // Assert
        result.Should().NotBeNull();
    }

    [Fact]
    public async Task Handle_InvalidRequest_ShouldThrowValidationException()
    {
        // Arrange
        var validators = new List<IValidator<TestCommand>>
        {
            new TestCommandValidator()
        };
        
        var behavior = new ValidationBehavior<TestCommand, TestResponse>(validators);
        var command = new TestCommand { Name = "", Value = -1 }; // Invalid

        // Act & Assert
        await Assert.ThrowsAsync<ValidationException>(() => 
            behavior.Handle(
                command, 
                () => Task.FromResult(new TestResponse()), 
                CancellationToken.None));
    }
}

// Test command and validator
public class TestCommand : IRequest<TestResponse>
{
    public string Name { get; set; }
    public int Value { get; set; }
}

public class TestResponse
{
    public string Result { get; set; } = "Success";
}

public class TestCommandValidator : AbstractValidator<TestCommand>
{
    public TestCommandValidator()
    {
        RuleFor(x => x.Name).NotEmpty();
        RuleFor(x => x.Value).GreaterThan(0);
    }
}

10. Real-World E-Commerce Case Study

Complete Order Processing System

csharp
// Order processing commands
public class CreateOrderCommand : IRequest<CreateOrderResponse>
{
    public int CustomerId { get; set; }
    public List<OrderItemDTO> Items { get; set; }
    public ShippingAddressDTO ShippingAddress { get; set; }
    public PaymentMethodDTO PaymentMethod { get; set; }
}

public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, CreateOrderResponse>
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductRepository _productRepository;
    private readonly IPublisher _publisher;
    private readonly ILogger<CreateOrderCommandHandler> _logger;

    public CreateOrderCommandHandler(
        IOrderRepository orderRepository,
        IProductRepository productRepository,
        IPublisher publisher,
        ILogger<CreateOrderCommandHandler> logger)
    {
        _orderRepository = orderRepository;
        _productRepository = productRepository;
        _publisher = publisher;
        _logger = logger;
    }

    public async Task<CreateOrderResponse> Handle(
        CreateOrderCommand request, 
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Creating order for customer {CustomerId}", request.CustomerId);

        // Validate products and inventory
        var productIds = request.Items.Select(i => i.ProductId).ToList();
        var products = await _productRepository.GetByIdsAsync(productIds);
        
        var orderItems = new List<OrderItem>();
        foreach (var item in request.Items)
        {
            var product = products.FirstOrDefault(p => p.Id == item.ProductId);
            if (product == null)
                throw new ProductNotFoundException(item.ProductId);
                
            if (product.StockQuantity < item.Quantity)
                throw new InsufficientStockException(product.Id, product.StockQuantity, item.Quantity);
                
            orderItems.Add(new OrderItem(product.Id, product.Name, product.Price, item.Quantity));
        }

        // Create order
        var order = new Order(
            request.CustomerId,
            orderItems,
            MapToShippingAddress(request.ShippingAddress),
            MapToPaymentMethod(request.PaymentMethod));

        // Reserve inventory
        foreach (var item in orderItems)
        {
            var product = products.First(p => p.Id == item.ProductId);
            product.UpdateStock(product.StockQuantity - item.Quantity);
        }

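        // Save changes. Both repositories should share one unit of work (for example,
        // the same DbContext) so the order and the stock update commit atomically.
        await _orderRepository.AddAsync(order);
        await _productRepository.SaveChangesAsync(cancellationToken);
        await _orderRepository.SaveChangesAsync(cancellationToken);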

        // Publish events
        foreach (var domainEvent in order.DomainEvents)
        {
            await _publisher.Publish(domainEvent, cancellationToken);
        }

        _logger.LogInformation("Order {OrderId} created successfully", order.Id);

        return new CreateOrderResponse(order.Id, order.TotalAmount, order.Status);
    }
}

// Order created event and handlers
public class OrderCreatedEvent : IDomainEvent
{
    public int OrderId { get; }
    public int CustomerId { get; }
    public decimal TotalAmount { get; }
    public DateTime OccurredOn { get; }

    public OrderCreatedEvent(int orderId, int customerId, decimal totalAmount)
    {
        OrderId = orderId;
        CustomerId = customerId;
        TotalAmount = totalAmount;
        OccurredOn = DateTime.UtcNow;
    }
}

// Multiple event handlers for order created
public class OrderCreatedEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    public OrderCreatedEventHandler(
        IEmailService emailService,
        ILogger<OrderCreatedEventHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
        try
        {
            await _emailService.SendOrderConfirmationAsync(
                notification.OrderId, 
                notification.CustomerId);
                
            _logger.LogInformation("Order confirmation sent for order {OrderId}", notification.OrderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send order confirmation for order {OrderId}", notification.OrderId);
        }
    }
}

public class UpdateInventoryEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<UpdateInventoryEventHandler> _logger;

    public UpdateInventoryEventHandler(
        IInventoryService inventoryService,
        ILogger<UpdateInventoryEventHandler> logger)
    {
        _inventoryService = inventoryService;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
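        // Note: CreateOrderCommandHandler already decrements stock when the order is created,
        // so ReserveInventoryForOrderAsync must not deduct the same quantities a second time.
        await _inventoryService.ReserveInventoryForOrderAsync(notification.OrderId);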
        _logger.LogInformation("Inventory reserved for order {OrderId}", notification.OrderId);
    }
}

public class ProcessPaymentEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IPaymentService _paymentService;
    private readonly ILogger<ProcessPaymentEventHandler> _logger;

    public ProcessPaymentEventHandler(
        IPaymentService paymentService,
        ILogger<ProcessPaymentEventHandler> logger)
    {
        _paymentService = paymentService;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
        var paymentResult = await _paymentService.ProcessPaymentAsync(notification.OrderId);
        
        if (paymentResult.Success)
        {
            _logger.LogInformation("Payment processed successfully for order {OrderId}", notification.OrderId);
            
            // Publish payment succeeded event
            await _paymentService.PublishPaymentSucceededEvent(notification.OrderId);
        }
        else
        {
            _logger.LogWarning("Payment failed for order {OrderId}: {Error}", 
                notification.OrderId, paymentResult.ErrorMessage);
                
            // Publish payment failed event
            await _paymentService.PublishPaymentFailedEvent(notification.OrderId, paymentResult.ErrorMessage);
        }
    }
}

11. Microservices Integration

Cross-Service Communication

csharp
// Integration events
public interface IIntegrationEvent : INotification
{
    Guid Id { get; }
    DateTime OccurredOn { get; }
}

public class OrderCreatedIntegrationEvent : IIntegrationEvent
{
    public Guid Id { get; }
    public int OrderId { get; }
    public int CustomerId { get; }
    public decimal TotalAmount { get; }
    public DateTime OccurredOn { get; }
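    public List<OrderItemDTO> Items { get; }
    // Read by the shipping service handler; set through an object initializer
    public ShippingAddressDTO ShippingAddress { get; init; }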

    public OrderCreatedIntegrationEvent(int orderId, int customerId, decimal totalAmount, List<OrderItemDTO> items)
    {
        Id = Guid.NewGuid();
        OrderId = orderId;
        CustomerId = customerId;
        TotalAmount = totalAmount;
        OccurredOn = DateTime.UtcNow;
        Items = items;
    }
}

// Integration event handler in shipping service
public class OrderCreatedIntegrationEventHandler : INotificationHandler<OrderCreatedIntegrationEvent>
{
    private readonly IShippingService _shippingService;
    private readonly ILogger<OrderCreatedIntegrationEventHandler> _logger;

    public OrderCreatedIntegrationEventHandler(
        IShippingService shippingService,
        ILogger<OrderCreatedIntegrationEventHandler> logger)
    {
        _shippingService = shippingService;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedIntegrationEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing shipping for order {OrderId}", 
            notification.OrderId);

        try
        {
            var shippingRequest = new CreateShippingRequest
            {
                OrderId = notification.OrderId,
                CustomerId = notification.CustomerId,
                TotalWeight = CalculateTotalWeight(notification.Items),
                ShippingAddress = notification.ShippingAddress
            };

            await _shippingService.CreateShippingAsync(shippingRequest);
            
            _logger.LogInformation(
                "Shipping created for order {OrderId}", 
                notification.OrderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex, 
                "Failed to create shipping for order {OrderId}", 
                notification.OrderId);
            throw;
        }
    }

    private decimal CalculateTotalWeight(List<OrderItemDTO> items)
    {
        // Calculate total weight based on items
        return items.Sum(i => i.Quantity * i.UnitWeight);
    }
}

Saga Pattern Implementation

csharp
// Order processing saga
public class OrderProcessingSaga : 
    INotificationHandler<OrderCreatedEvent>,
    INotificationHandler<PaymentProcessedEvent>,
    INotificationHandler<InventoryReservedEvent>,
    INotificationHandler<ShippingCreatedEvent>
{
    private readonly ISagaRepository _sagaRepository;
    private readonly IMediator _mediator;
    private readonly ILogger<OrderProcessingSaga> _logger;

    public OrderProcessingSaga(
        ISagaRepository sagaRepository,
        IMediator mediator,
        ILogger<OrderProcessingSaga> logger)
    {
        _sagaRepository = sagaRepository;
        _mediator = mediator;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedEvent @event, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting order processing saga for order {OrderId}", @event.OrderId);

        var saga = new OrderSaga(@event.OrderId, @event.CustomerId, @event.TotalAmount);
        await _sagaRepository.SaveAsync(saga);

        // Start payment processing
        await _mediator.Send(new ProcessPaymentCommand(@event.OrderId, @event.TotalAmount), cancellationToken);
    }

    public async Task Handle(PaymentProcessedEvent @event, CancellationToken cancellationToken)
    {
        var saga = await _sagaRepository.GetByOrderIdAsync(@event.OrderId);
        
        if (@event.Success)
        {
            saga.MarkPaymentProcessed();
            await _sagaRepository.SaveAsync(saga);

            // Proceed to inventory reservation
            await _mediator.Send(new ReserveInventoryCommand(@event.OrderId), cancellationToken);
        }
        else
        {
            saga.MarkPaymentFailed(@event.ErrorMessage);
            await _sagaRepository.SaveAsync(saga);

            // Compensating action
            await _mediator.Send(new CancelOrderCommand(@event.OrderId, "Payment failed"), cancellationToken);
        }
    }

    public async Task Handle(InventoryReservedEvent @event, CancellationToken cancellationToken)
    {
        var saga = await _sagaRepository.GetByOrderIdAsync(@event.OrderId);
        saga.MarkInventoryReserved();
        await _sagaRepository.SaveAsync(saga);

        // Proceed to shipping
        await _mediator.Send(new CreateShippingCommand(@event.OrderId), cancellationToken);
    }

    public async Task Handle(ShippingCreatedEvent @event, CancellationToken cancellationToken)
    {
        var saga = await _sagaRepository.GetByOrderIdAsync(@event.OrderId);
        saga.MarkShippingCreated();
        await _sagaRepository.SaveAsync(saga);

        // Order processing completed
        await _mediator.Send(new CompleteOrderCommand(@event.OrderId), cancellationToken);
        
        _logger.LogInformation("Order processing saga completed for order {OrderId}", @event.OrderId);
    }
}
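
The saga handlers above call methods on an OrderSaga object that is not shown. One possible shape, assuming a strictly linear flow (payment, then inventory, then shipping), is a small state machine that rejects out-of-order transitions. The OrderSagaState enum, the FailureReason property, and the transition rules are all assumptions made for this sketch.

```csharp
using System;

// Hypothetical saga states for illustration only.
public enum OrderSagaState
{
    Started,
    PaymentProcessed,
    PaymentFailed,
    InventoryReserved,
    ShippingCreated
}

public sealed class OrderSaga
{
    public int OrderId { get; }
    public int CustomerId { get; }
    public decimal TotalAmount { get; }
    public OrderSagaState State { get; private set; } = OrderSagaState.Started;
    public string FailureReason { get; private set; } = string.Empty;

    public OrderSaga(int orderId, int customerId, decimal totalAmount)
    {
        OrderId = orderId;
        CustomerId = customerId;
        TotalAmount = totalAmount;
    }

    public void MarkPaymentProcessed() =>
        Transition(OrderSagaState.Started, OrderSagaState.PaymentProcessed);

    public void MarkPaymentFailed(string reason)
    {
        Transition(OrderSagaState.Started, OrderSagaState.PaymentFailed);
        FailureReason = reason;
    }

    public void MarkInventoryReserved() =>
        Transition(OrderSagaState.PaymentProcessed, OrderSagaState.InventoryReserved);

    public void MarkShippingCreated() =>
        Transition(OrderSagaState.InventoryReserved, OrderSagaState.ShippingCreated);

    // Move to the next state only if the saga is in the expected current state.
    private void Transition(OrderSagaState from, OrderSagaState to)
    {
        if (State != from)
        {
            throw new InvalidOperationException(
                $"Cannot move order {OrderId} from {State} to {to}");
        }

        State = to;
    }
}
```

Guarding transitions this way means a duplicated or reordered event surfaces as an exception rather than silently corrupting the saga state.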

12. Production Best Practices

Configuration and Dependency Injection

csharp
// Dependency injection setup
public static class DependencyInjection
{
    public static IServiceCollection AddApplication(this IServiceCollection services)
    {
        // MediatR: handlers plus pipeline behaviors, which run in registration order
        services.AddMediatR(cfg => 
        {
            cfg.RegisterServicesFromAssembly(Assembly.GetExecutingAssembly());
            cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
            cfg.AddOpenBehavior(typeof(ValidationBehavior<,>));
            cfg.AddOpenBehavior(typeof(AuthorizationBehavior<,>));
            cfg.AddOpenBehavior(typeof(CachingBehavior<,>));
        });

        // FluentValidation
        services.AddValidatorsFromAssembly(Assembly.GetExecutingAssembly());
        
        // Pipeline behaviors are registered once, through AddOpenBehavior above.
        // A second services.AddTransient registration would run each behavior twice.

        // Services
        services.AddScoped<ICurrentUserService, CurrentUserService>();
        services.AddScoped<IIdentityService, IdentityService>();
        
        // Caching
        services.AddDistributedMemoryCache(); // Or Redis in production
        
        return services;
    }
}

// Program.cs configuration
var builder = WebApplication.CreateBuilder(args);

// Add services
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddWebServices();

// Configure logging
builder.Logging.AddConsole();
builder.Logging.AddDebug();
builder.Logging.AddApplicationInsights();

var app = builder.Build();

// Configure pipeline
if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
}
else
{
    app.UseExceptionHandler("/error");
    app.UseHsts();
}

app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers();

app.Run();
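
The order of the behavior registrations above matters: MediatR wraps the handler so that the first registered behavior is the outermost one. The following is a minimal sketch of that nesting using plain delegates only, with no MediatR dependency. The names `PipelineDemo`, `Named`, `Run`, and `TraceAsync` are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a MediatR pipeline: each "behavior" receives the
// next delegate and decides what to do before and after calling it.
public static class PipelineDemo
{
    public delegate Task<string> Next();
    public delegate Task<string> Behavior(Next next);

    // Builds a behavior that records when it is entered and when it exits.
    public static Behavior Named(string name, List<string> trace) => async next =>
    {
        trace.Add($"{name}:before");
        var result = await next();
        trace.Add($"{name}:after");
        return result;
    };

    // Composes the behaviors so that the first one in the list is the outermost.
    public static Task<string> Run(IEnumerable<Behavior> behaviors, Next handler)
    {
        var pipeline = behaviors
            .Reverse()
            .Aggregate(handler, (next, behavior) => () => behavior(next));
        return pipeline();
    }

    // Runs three named behaviors around a trivial handler and returns the trace.
    public static async Task<List<string>> TraceAsync()
    {
        var trace = new List<string>();
        var behaviors = new[]
        {
            Named("Logging", trace),
            Named("Validation", trace),
            Named("Caching", trace)
        };

        await Run(behaviors, () =>
        {
            trace.Add("Handler");
            return Task.FromResult("done");
        });

        return trace;
    }
}
```

In this sketch the trace enters Logging, Validation, and Caching in that order, reaches the handler, and then unwinds in reverse. That is why logging is usually registered first (it sees the whole request, including failures in later behaviors) and caching last (it sits closest to the handler).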

Monitoring and Diagnostics

csharp
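// Enhanced logging behavior
using System.Diagnostics;
using MediatR;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace; // provides the Activity.RecordException extension used below

public class DiagnosticBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{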
    private readonly ILogger<DiagnosticBehavior<TRequest, TResponse>> _logger;
    private readonly ICurrentUserService _currentUserService;

    public DiagnosticBehavior(
        ILogger<DiagnosticBehavior<TRequest, TResponse>> logger,
        ICurrentUserService currentUserService)
    {
        _logger = logger;
        _currentUserService = currentUserService;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        var requestId = Guid.NewGuid();
        var userId = _currentUserService.UserId ?? "Anonymous";
        
        using var activity = DiagnosticsConfig.ActivitySource.StartActivity(requestName);
        activity?.SetTag("request.id", requestId);
        activity?.SetTag("user.id", userId);
        
        using (_logger.BeginScope(new Dictionary<string, object>
        {
            ["RequestId"] = requestId,
            ["RequestName"] = requestName,
            ["UserId"] = userId
        }))
        {
            var stopwatch = Stopwatch.StartNew();
            
            _logger.LogInformation(
                "Handling request {RequestName} for user {UserId}", 
                requestName, userId);
                
            try
            {
                var response = await next();
                stopwatch.Stop();
                
                _logger.LogInformation(
                    "Request {RequestName} completed in {ElapsedMilliseconds}ms", 
                    requestName, stopwatch.ElapsedMilliseconds);
                    
                activity?.SetTag("duration", stopwatch.ElapsedMilliseconds);
                activity?.SetStatus(ActivityStatusCode.Ok);
                
                return response;
            }
            catch (Exception ex)
            {
                stopwatch.Stop();
                
                _logger.LogError(
                    ex, 
                    "Request {RequestName} failed after {ElapsedMilliseconds}ms", 
                    requestName, stopwatch.ElapsedMilliseconds);
                    
                activity?.SetTag("duration", stopwatch.ElapsedMilliseconds);
                activity?.SetStatus(ActivityStatusCode.Error);
                activity?.RecordException(ex);
                
                throw;
            }
        }
    }
}

// Diagnostics configuration
public static class DiagnosticsConfig
{
    public const string ServiceName = "ECommerce.API";
    public static readonly ActivitySource ActivitySource = new(ServiceName);
}
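
One detail worth knowing about the behavior above: `ActivitySource.StartActivity` returns `null` when nothing is listening to the source, which is why every call on `activity` uses `?.`. In production an OpenTelemetry SDK would normally register the listener. The sketch below uses only the built-in `ActivityListener` to show the difference locally. The class name `ActivityListenerDemo` and the source name `Demo.ECommerce.API` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class ActivityListenerDemo
{
    // Hypothetical source name, standing in for DiagnosticsConfig.ServiceName.
    private static readonly ActivitySource Source = new("Demo.ECommerce.API");

    public static (bool NullWithoutListener, List<string> Stopped) Run()
    {
        // No listener is registered yet, so StartActivity returns null.
        using var before = Source.StartActivity("NoListener");
        var nullWithoutListener = before is null;

        var stopped = new List<string>();
        using var listener = new ActivityListener
        {
            // Only listen to the demo source.
            ShouldListenTo = source => source.Name == Source.Name,
            // Record every activity with full data.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
            // Called when an activity is stopped or disposed.
            ActivityStopped = activity => stopped.Add(activity.OperationName)
        };
        ActivitySource.AddActivityListener(listener);

        // With a listener in place, an Activity is created and later reported.
        using (var activity = Source.StartActivity("GetProductQuery"))
        {
            activity?.SetTag("user.id", "Anonymous");
        }

        return (nullWithoutListener, stopped);
    }
}
```

If traces from `DiagnosticBehavior` never show up, a missing listener for the `ECommerce.API` source name is a likely first thing to check.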

Performance Monitoring

csharp
// Performance monitoring behavior
using System.Diagnostics;
using MediatR;
using Microsoft.Extensions.Logging;

public class PerformanceBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private const long SlowRequestThresholdMs = 500;

    private readonly ILogger<PerformanceBehavior<TRequest, TResponse>> _logger;
    private readonly IPerformanceMetrics _performanceMetrics;

    public PerformanceBehavior(
        ILogger<PerformanceBehavior<TRequest, TResponse>> logger,
        IPerformanceMetrics performanceMetrics)
    {
        _logger = logger;
        _performanceMetrics = performanceMetrics;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        // A local stopwatch keeps each timing independent, even if the behavior
        // instance is reused (for example with a non-transient lifetime)
        var timer = Stopwatch.StartNew();

        try
        {
            return await next();
        }
        finally
        {
            timer.Stop();

            var elapsedMilliseconds = timer.ElapsedMilliseconds;
            var requestName = typeof(TRequest).Name;

            // Log slow requests
            if (elapsedMilliseconds > SlowRequestThresholdMs)
            {
                _logger.LogWarning(
                    "Slow request detected: {RequestName} took {ElapsedMilliseconds}ms", 
                    requestName, elapsedMilliseconds);
            }

            // Track metrics, including for requests that throw
            _performanceMetrics.RecordRequestDuration(requestName, elapsedMilliseconds);
        }
    }
}
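
The `IPerformanceMetrics` dependency is not shown in this section. As a rough sketch, assuming its shape is exactly what the call site implies (a single `RecordRequestDuration(string, long)` method), it could be backed by the built-in `System.Diagnostics.Metrics` API. The class name `MeterPerformanceMetrics`, the meter name, and the instrument name below are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Assumed shape, inferred from the call in PerformanceBehavior.
public interface IPerformanceMetrics
{
    void RecordRequestDuration(string requestName, long elapsedMilliseconds);
}

// Records request durations as a histogram that metric exporters can collect.
public sealed class MeterPerformanceMetrics : IPerformanceMetrics, IDisposable
{
    // Hypothetical meter name; exporters subscribe to meters by name.
    public const string MeterName = "ECommerce.API.Requests";

    private readonly Meter _meter = new(MeterName);
    private readonly Histogram<long> _duration;

    public MeterPerformanceMetrics()
    {
        _duration = _meter.CreateHistogram<long>(
            "request.duration",
            unit: "ms",
            description: "Time spent handling a MediatR request");
    }

    public void RecordRequestDuration(string requestName, long elapsedMilliseconds)
    {
        // The request name becomes a tag, so durations can be grouped per request type.
        _duration.Record(
            elapsedMilliseconds,
            new KeyValuePair<string, object?>("request.name", requestName));
    }

    public void Dispose() => _meter.Dispose();
}
```

Because the meter should live for the lifetime of the application, a singleton registration such as `services.AddSingleton<IPerformanceMetrics, MeterPerformanceMetrics>();` is a reasonable fit for this sketch.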

This masterclass has walked through CQRS and MediatR in ASP.NET Core, from basic handlers to production concerns such as pipeline behaviors, diagnostics, and performance monitoring. Applied together, these patterns help you tackle complex business domains while keeping the code clean and testable.


Powered By: FreeLearning365.com
