Real-Time Magic: SignalR and gRPC - Bring Apps Alive with Interactive Wonders!
Master real-time communication in ASP.NET Core with SignalR for live chats and gRPC for high-performance microservices. Build interactive wonders!
ASPNETCore,SignalR,gRPC,RealTime,WebSockets,Microservices,Performance,WebAPI,ChatApps,LiveUpdates
Table of Contents
1. Introduction to Real-Time Communication <a name="introduction"></a>
Real-time communication has transformed how users interact with web applications. From live chat systems to collaborative editing tools and real-time dashboards, the demand for instant data updates has never been higher. ASP.NET Core provides two powerful technologies for building real-time applications: SignalR for WebSocket-based communication and gRPC for high-performance service-to-service communication.
The Evolution of Real-Time Web
Traditional web applications followed a request-response pattern where clients had to poll servers for updates. This approach was inefficient and created latency. Modern real-time technologies enable:
Instant Updates: Push data to clients immediately when changes occur
Bidirectional Communication: Both client and server can initiate communication
Reduced Latency: Eliminate polling overhead with persistent connections
Better User Experience: Create responsive, interactive applications
When to Use SignalR vs gRPC
public class CommunicationChoiceGuide { public CommunicationTechnology ChooseTechnology(ApplicationRequirements requirements) { if (requirements.NeedBrowserClientSupport && requirements.RealTimeUpdates) { return CommunicationTechnology.SignalR; } else if (requirements.HighPerformance && requirements.InternalServiceCommunication) { return CommunicationTechnology.gRPC; } else if (requirements.BothBrowserAndServiceCommunication) { return CommunicationTechnology.Hybrid; } return CommunicationTechnology.RestAPI; } } public enum CommunicationTechnology { SignalR, // Web browsers, mobile apps, real-time features gRPC, // Microservices, internal APIs, high-performance Hybrid, // Combined approach RestAPI // Traditional HTTP APIs }
Real-World Applications
Live Chat Systems: Customer support, team collaboration
Real-Time Dashboards: Stock trading, IoT monitoring, analytics
Collaborative Tools: Google Docs-like editing, design tools
Gaming: Multiplayer games, live scores
Notifications: Live updates, alerts, announcements
Financial Systems: Live trading, price updates
2. Understanding SignalR Fundamentals <a name="signalr-fundamentals"></a>
SignalR Architecture Overview
SignalR provides an abstraction over real-time transports, automatically choosing the best available transport method:
WebSockets (Preferred): Full-duplex communication over a single TCP connection
Server-Sent Events: One-way communication from server to client
Long Polling: Fallback for environments that don't support newer transports
Basic SignalR Setup
// Program.cs configuration using Microsoft.AspNetCore.SignalR; var builder = WebApplication.CreateBuilder(args); // Add SignalR services builder.Services.AddSignalR(options => { options.EnableDetailedErrors = true; options.KeepAliveInterval = TimeSpan.FromSeconds(15); options.ClientTimeoutInterval = TimeSpan.FromSeconds(30); }); // Add CORS for cross-origin requests builder.Services.AddCors(options => { options.AddPolicy("SignalRPolicy", policy => { policy.WithOrigins("https://localhost:3000", "https://myapp.com") .AllowAnyHeader() .AllowAnyMethod() .AllowCredentials(); }); }); var app = builder.Build(); // Configure the HTTP request pipeline app.UseCors("SignalRPolicy"); app.UseRouting(); app.MapHub<ChatHub>("/chathub"); app.MapHub<NotificationHub>("/notifications"); app.MapHub<DashboardHub>("/dashboard"); app.Run();
Creating Your First SignalR Hub
using Microsoft.AspNetCore.SignalR; using System.Threading.Tasks; public class ChatHub : Hub { private static readonly Dictionary<string, string> _userConnections = new(); // Client calls this method to join the chat public async Task JoinChat(string userName) { _userConnections[Context.ConnectionId] = userName; // Notify all clients that a user joined await Clients.All.SendAsync("UserJoined", userName, DateTime.Now.ToString()); // Send current user list to the new user await Clients.Caller.SendAsync("CurrentUsers", _userConnections.Values.ToArray()); await Clients.Caller.SendAsync("ReceiveMessage", "System", $"Welcome to the chat, {userName}!"); } // Client calls this method to send a message public async Task SendMessage(string user, string message) { if (string.IsNullOrWhiteSpace(message)) return; // Broadcast message to all connected clients await Clients.All.SendAsync("ReceiveMessage", user, message, DateTime.Now.ToString()); } // Client calls this method to send a private message public async Task SendPrivateMessage(string targetUser, string message) { var sender = _userConnections[Context.ConnectionId]; var targetConnection = _userConnections .FirstOrDefault(x => x.Value == targetUser).Key; if (targetConnection != null) { await Clients.Client(targetConnection).SendAsync("ReceivePrivateMessage", sender, message, DateTime.Now.ToString()); await Clients.Caller.SendAsync("ReceivePrivateMessage", $"You to {targetUser}", message, DateTime.Now.ToString()); } } // Handle client connection public override async Task OnConnectedAsync() { await Clients.Caller.SendAsync("ReceiveMessage", "System", "Connected to chat server!"); await base.OnConnectedAsync(); } // Handle client disconnection public override async Task OnDisconnectedAsync(Exception? exception) { if (_userConnections.TryGetValue(Context.ConnectionId, out var userName)) { _userConnections.Remove(Context.ConnectionId); await Clients.All.SendAsync("UserLeft", userName, DateTime.Now.ToString()); } await base.OnDisconnectedAsync(exception); } }
JavaScript Client Implementation
<!DOCTYPE html> <html> <head> <title>Real-Time Chat</title> <style> .chat-container { max-width: 800px; margin: 0 auto; padding: 20px; } .messages { height: 400px; border: 1px solid #ccc; overflow-y: scroll; padding: 10px; } .message { margin-bottom: 10px; padding: 8px; border-radius: 5px; } .system { background-color: #f0f0f0; color: #666; } .user { background-color: #e3f2fd; } .private { background-color: #fff3e0; border-left: 3px solid #ff9800; } .input-area { margin-top: 20px; display: flex; gap: 10px; } input, button { padding: 10px; font-size: 16px; } </style> </head> <body> <div class="chat-container"> <h1>Real-Time Chat</h1> <div class="connection-status"> <span id="status">Disconnected</span> </div> <div class="user-setup" id="userSetup"> <input type="text" id="userNameInput" placeholder="Enter your name" /> <button onclick="joinChat()">Join Chat</button> </div> <div class="chat-area" id="chatArea" style="display: none;"> <div class="users-online"> <h3>Online Users: <span id="userCount">0</span></h3> <div id="userList"></div> </div> <div id="messages" class="messages"></div> <div class="input-area"> <input type="text" id="messageInput" placeholder="Type your message..." /> <button onclick="sendMessage()">Send</button> <select id="userSelect"> <option value="">Everyone</option> </select> <button onclick="sendPrivateMessage()">Send Private</button> </div> </div> </div> <script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/6.0.8/signalr.min.js"></script> <script> const connection = new signalR.HubConnectionBuilder() .withUrl("/chathub") .withAutomaticReconnect([0, 2000, 5000, 10000, 30000]) // Reconnect intervals .configureLogging(signalR.LogLevel.Information) .build(); let currentUser = ''; // Connection event handlers connection.onreconnecting(error => { console.log('Connection lost. Reconnecting...', error); updateStatus('Reconnecting...', 'orange'); }); connection.onreconnected(connectionId => { console.log('Connection reestablished.'); updateStatus('Connected', 'green'); if (currentUser) { connection.invoke("JoinChat", currentUser); } }); connection.onclose(error => { console.log('Connection closed.', error); updateStatus('Disconnected', 'red'); }); // Message handlers connection.on("ReceiveMessage", (user, message, timestamp) => { addMessage(user, message, timestamp, 'user'); }); connection.on("ReceivePrivateMessage", (user, message, timestamp) => { addMessage(user, message, timestamp, 'private'); }); connection.on("UserJoined", (userName, timestamp) => { addMessage('System', `${userName} joined the chat`, timestamp, 'system'); }); connection.on("UserLeft", (userName, timestamp) => { addMessage('System', `${userName} left the chat`, timestamp, 'system'); }); connection.on("CurrentUsers", (users) => { updateUserList(users); }); // UI functions function updateStatus(message, color) { const statusElement = document.getElementById('status'); statusElement.textContent = message; statusElement.style.color = color; } function addMessage(user, message, timestamp, type) { const messagesDiv = document.getElementById('messages'); const messageDiv = document.createElement('div'); messageDiv.className = `message ${type}`; const time = new Date(timestamp).toLocaleTimeString(); messageDiv.innerHTML = ` <strong>${user}</strong> <small>(${time})</small>: ${message} `; messagesDiv.appendChild(messageDiv); messagesDiv.scrollTop = messagesDiv.scrollHeight; } function updateUserList(users) { const userList = document.getElementById('userList'); const userSelect = document.getElementById('userSelect'); userList.innerHTML = ''; userSelect.innerHTML = '<option value="">Everyone</option>'; users.forEach(user => { if (user !== currentUser) { // Add to online users list const userElement = document.createElement('div'); userElement.textContent = user; userList.appendChild(userElement); // Add to private message dropdown const option = document.createElement('option'); option.value = user; option.textContent = user; userSelect.appendChild(option); } }); document.getElementById('userCount').textContent = users.length; } // Chat functions async function joinChat() { const userName = document.getElementById('userNameInput').value.trim(); if (!userName) return; try { await connection.start(); await connection.invoke("JoinChat", userName); currentUser = userName; document.getElementById('userSetup').style.display = 'none'; document.getElementById('chatArea').style.display = 'block'; updateStatus('Connected', 'green'); } catch (err) { console.error('Failed to join chat:', err); alert('Failed to join chat. Please try again.'); } } async function sendMessage() { const messageInput = document.getElementById('messageInput'); const message = messageInput.value.trim(); if (!message) return; try { await connection.invoke("SendMessage", currentUser, message); messageInput.value = ''; } catch (err) { console.error('Failed to send message:', err); } } async function sendPrivateMessage() { const messageInput = document.getElementById('messageInput'); const userSelect = document.getElementById('userSelect'); const targetUser = userSelect.value; const message = messageInput.value.trim(); if (!targetUser || !message) return; try { await connection.invoke("SendPrivateMessage", targetUser, message); messageInput.value = ''; } catch (err) { console.error('Failed to send private message:', err); } } // Handle Enter key document.getElementById('messageInput').addEventListener('keypress', function(e) { if (e.key === 'Enter') { const userSelect = document.getElementById('userSelect'); if (userSelect.value === '') { sendMessage(); } else { sendPrivateMessage(); } } }); // Initialize connection async function startConnection() { try { await connection.start(); updateStatus('Connected', 'green'); } catch (err) { console.error('SignalR Connection Error: ', err); updateStatus('Connection Failed', 'red'); setTimeout(startConnection, 5000); } } startConnection(); </script> </body> </html>
Advanced SignalR Hub with Strong Typing
// Strongly typed hub interface for better IntelliSense public interface IChatClient { Task ReceiveMessage(string user, string message, string timestamp); Task ReceivePrivateMessage(string fromUser, string message, string timestamp); Task UserJoined(string userName, string timestamp); Task UserLeft(string userName, string timestamp); Task CurrentUsers(string[] users); Task MessageDelivered(string messageId, string timestamp); Task UserTyping(string userName, bool isTyping); } // Strongly typed hub public class AdvancedChatHub : Hub<IChatClient> { private readonly IChatRepository _chatRepository; private readonly ILogger<AdvancedChatHub> _logger; public AdvancedChatHub(IChatRepository chatRepository, ILogger<AdvancedChatHub> logger) { _chatRepository = chatRepository; _logger = logger; } public async Task JoinChat(string userName, string room = "general") { var user = new ChatUser { ConnectionId = Context.ConnectionId, UserName = userName, Room = room, JoinedAt = DateTime.UtcNow }; await _chatRepository.AddUserAsync(user); await Groups.AddToGroupAsync(Context.ConnectionId, room); // Notify group about new user await Clients.Group(room).UserJoined(userName, DateTime.UtcNow.ToString()); // Send room history to the new user var roomHistory = await _chatRepository.GetRoomHistoryAsync(room, 50); foreach (var message in roomHistory) { await Clients.Caller.ReceiveMessage(message.UserName, message.Content, message.SentAt.ToString()); } // Send current room users var roomUsers = await _chatRepository.GetRoomUsersAsync(room); await Clients.Caller.CurrentUsers(roomUsers.Select(u => u.UserName).ToArray()); _logger.LogInformation("User {UserName} joined room {Room}", userName, room); } public async Task SendMessageToRoom(string room, string message) { if (string.IsNullOrWhiteSpace(message)) return; var user = await _chatRepository.GetUserAsync(Context.ConnectionId); if (user == null) return; var chatMessage = new ChatMessage { Id = Guid.NewGuid().ToString(), UserName = user.UserName, Room = room, Content = message, SentAt = DateTime.UtcNow }; await _chatRepository.AddMessageAsync(chatMessage); // Broadcast to room await Clients.Group(room).ReceiveMessage(user.UserName, message, DateTime.UtcNow.ToString()); _logger.LogDebug("Message sent to room {Room} by {User}", room, user.UserName); } public async Task StartTyping(string room) { var user = await _chatRepository.GetUserAsync(Context.ConnectionId); if (user != null) { await Clients.OthersInGroup(room).UserTyping(user.UserName, true); } } public async Task StopTyping(string room) { var user = await _chatRepository.GetUserAsync(Context.ConnectionId); if (user != null) { await Clients.OthersInGroup(room).UserTyping(user.UserName, false); } } public override async Task OnDisconnectedAsync(Exception? exception) { var user = await _chatRepository.GetUserAsync(Context.ConnectionId); if (user != null) { await _chatRepository.RemoveUserAsync(Context.ConnectionId); await Groups.RemoveFromGroupAsync(Context.ConnectionId, user.Room); await Clients.Group(user.Room).UserLeft(user.UserName, DateTime.UtcNow.ToString()); _logger.LogInformation("User {UserName} left room {Room}", user.UserName, user.Room); } await base.OnDisconnectedAsync(exception); } } // Supporting classes public class ChatUser { public string ConnectionId { get; set; } = string.Empty; public string UserName { get; set; } = string.Empty; public string Room { get; set; } = "general"; public DateTime JoinedAt { get; set; } } public class ChatMessage { public string Id { get; set; } = string.Empty; public string UserName { get; set; } = string.Empty; public string Room { get; set; } = "general"; public string Content { get; set; } = string.Empty; public DateTime SentAt { get; set; } } public interface IChatRepository { Task AddUserAsync(ChatUser user); Task<ChatUser?> GetUserAsync(string connectionId); Task RemoveUserAsync(string connectionId); Task AddMessageAsync(ChatMessage message); Task<List<ChatMessage>> GetRoomHistoryAsync(string room, int maxMessages); Task<List<ChatUser>> GetRoomUsersAsync(string room); } public class InMemoryChatRepository : IChatRepository { private readonly ConcurrentDictionary<string, ChatUser> _users = new(); private readonly ConcurrentBag<ChatMessage> _messages = new(); public Task AddUserAsync(ChatUser user) { _users[user.ConnectionId] = user; return Task.CompletedTask; } public Task<ChatUser?> GetUserAsync(string connectionId) { _users.TryGetValue(connectionId, out var user); return Task.FromResult(user); } public Task RemoveUserAsync(string connectionId) { _users.TryRemove(connectionId, out _); return Task.CompletedTask; } public Task AddMessageAsync(ChatMessage message) { _messages.Add(message); return Task.CompletedTask; } public Task<List<ChatMessage>> GetRoomHistoryAsync(string room, int maxMessages) { var history = _messages .Where(m => m.Room == room) .OrderByDescending(m => m.SentAt) .Take(maxMessages) .OrderBy(m => m.SentAt) .ToList(); return Task.FromResult(history); } public Task<List<ChatUser>> GetRoomUsersAsync(string room) { var users = _users.Values .Where(u => u.Room == room) .ToList(); return Task.FromResult(users); } }
3. Building Real-Time Chat Applications <a name="chat-applications"></a>
Complete Chat Application Architecture
// Message types for different chat features public abstract class ChatMessageBase { public string Id { get; set; } = Guid.NewGuid().ToString(); public string Type { get; set; } = string.Empty; public string RoomId { get; set; } = string.Empty; public string Sender { get; set; } = string.Empty; public DateTime Timestamp { get; set; } = DateTime.UtcNow; } public class TextMessage : ChatMessageBase { public string Content { get; set; } = string.Empty; public string? ReplyTo { get; set; } } public class ImageMessage : ChatMessageBase { public string ImageUrl { get; set; } = string.Empty; public string? Caption { get; set; } public long FileSize { get; set; } } public class SystemMessage : ChatMessageBase { public string Action { get; set; } = string.Empty; // "user_joined", "user_left", "room_created" public string Data { get; set; } = string.Empty; } public class TypingNotification : ChatMessageBase { public bool IsTyping { get; set; } } // Enhanced chat hub with multiple rooms and rich messages public class EnterpriseChatHub : Hub<IChatClient> { private readonly IChatService _chatService; private readonly ILogger<EnterpriseChatHub> _logger; public EnterpriseChatHub(IChatService chatService, ILogger<EnterpriseChatHub> logger) { _chatService = chatService; _logger = logger; } public async Task<JoinRoomResult> JoinRoom(string roomId, string userName) { try { var user = new ChatUser { ConnectionId = Context.ConnectionId, UserName = userName, RoomId = roomId, JoinedAt = DateTime.UtcNow }; await _chatService.AddUserToRoomAsync(user); await Groups.AddToGroupAsync(Context.ConnectionId, roomId); // Notify room await Clients.Group(roomId).UserJoined(userName, DateTime.UtcNow.ToString()); // Get room state var roomState = await _chatService.GetRoomStateAsync(roomId); _logger.LogInformation("User {UserName} joined room {RoomId}", userName, roomId); return new JoinRoomResult { Success = true, RoomState = roomState, Message = $"Joined room {roomId} successfully" }; } catch (Exception ex) { _logger.LogError(ex, "Error joining room {RoomId} for user {UserName}", roomId, userName); return new JoinRoomResult { Success = false, Message = "Failed to join room" }; } } public async Task SendTextMessage(string roomId, string content, string? replyTo = null) { var user = await _chatService.GetUserAsync(Context.ConnectionId); if (user == null || user.RoomId != roomId) return; var message = new TextMessage { RoomId = roomId, Sender = user.UserName, Content = content, ReplyTo = replyTo }; await _chatService.SaveMessageAsync(message); await Clients.Group(roomId).ReceiveMessage(user.UserName, content, DateTime.UtcNow.ToString()); _logger.LogDebug("Text message sent to room {RoomId} by {User}", roomId, user.UserName); } public async Task SendImageMessage(string roomId, string imageUrl, string caption, long fileSize) { var user = await _chatService.GetUserAsync(Context.ConnectionId); if (user == null || user.RoomId != roomId) return; var message = new ImageMessage { RoomId = roomId, Sender = user.UserName, ImageUrl = imageUrl, Caption = caption, FileSize = fileSize }; await _chatService.SaveMessageAsync(message); // For image messages, we might want a different client method await Clients.Group(roomId).ReceiveImageMessage( user.UserName, imageUrl, caption, fileSize, DateTime.UtcNow.ToString()); _logger.LogDebug("Image message sent to room {RoomId} by {User}", roomId, user.UserName); } public async Task NotifyTyping(string roomId, bool isTyping) { var user = await _chatService.GetUserAsync(Context.ConnectionId); if (user != null && user.RoomId == roomId) { await Clients.OthersInGroup(roomId).UserTyping(user.UserName, isTyping); } } public async Task<RoomHistory> GetMessageHistory(string roomId, int skip = 0, int take = 50) { var user = await _chatService.GetUserAsync(Context.ConnectionId); if (user == null || user.RoomId != roomId) throw new HubException("User not in room"); return await _chatService.GetMessageHistoryAsync(roomId, skip, take); } public override async Task OnDisconnectedAsync(Exception? exception) { var user = await _chatService.GetUserAsync(Context.ConnectionId); if (user != null) { await _chatService.RemoveUserFromRoomAsync(Context.ConnectionId); await Groups.RemoveFromGroupAsync(Context.ConnectionId, user.RoomId); await Clients.Group(user.RoomId).UserLeft(user.UserName, DateTime.UtcNow.ToString()); _logger.LogInformation("User {UserName} left room {RoomId}", user.UserName, user.RoomId); } await base.OnDisconnectedAsync(exception); } } public class JoinRoomResult { public bool Success { get; set; } public RoomState? RoomState { get; set; } public string Message { get; set; } = string.Empty; } public class RoomState { public string RoomId { get; set; } = string.Empty; public string RoomName { get; set; } = string.Empty; public List<string> OnlineUsers { get; set; } = new(); public List<ChatMessageBase> RecentMessages { get; set; } = new(); public int TotalMessages { get; set; } } public class RoomHistory { public List<ChatMessageBase> Messages { get; set; } = new(); public bool HasMore { get; set; } public int TotalCount { get; set; } }
Chat Service Implementation
public interface IChatService { Task AddUserToRoomAsync(ChatUser user); Task<ChatUser?> GetUserAsync(string connectionId); Task RemoveUserFromRoomAsync(string connectionId); Task SaveMessageAsync(ChatMessageBase message); Task<RoomState> GetRoomStateAsync(string roomId); Task<RoomHistory> GetMessageHistoryAsync(string roomId, int skip, int take); Task<List<ChatRoom>> GetAvailableRoomsAsync(); Task<ChatRoom> CreateRoomAsync(string roomName, string createdBy); } public class ChatService : IChatService { private readonly IChatRepository _repository; private readonly ILogger<ChatService> _logger; public ChatService(IChatRepository repository, ILogger<ChatService> logger) { _repository = repository; _logger = logger; } public async Task AddUserToRoomAsync(ChatUser user) { await _repository.AddUserAsync(user); _logger.LogDebug("User {UserName} added to room {RoomId}", user.UserName, user.RoomId); } public async Task<ChatUser?> GetUserAsync(string connectionId) { return await _repository.GetUserAsync(connectionId); } public async Task RemoveUserFromRoomAsync(string connectionId) { await _repository.RemoveUserAsync(connectionId); } public async Task SaveMessageAsync(ChatMessageBase message) { await _repository.AddMessageAsync(message); // You could add additional logic here: // - Check for profanity // - Update room last activity // - Trigger notifications // - Save to persistent storage } public async Task<RoomState> GetRoomStateAsync(string roomId) { var room = await _repository.GetRoomAsync(roomId); var users = await _repository.GetRoomUsersAsync(roomId); var recentMessages = await _repository.GetRecentMessagesAsync(roomId, 50); var totalMessages = await _repository.GetRoomMessageCountAsync(roomId); return new RoomState { RoomId = roomId, RoomName = room?.Name ?? roomId, OnlineUsers = users.Select(u => u.UserName).ToList(), RecentMessages = recentMessages, TotalMessages = totalMessages }; } public async Task<RoomHistory> GetMessageHistoryAsync(string roomId, int skip, int take) { var messages = await _repository.GetMessagesAsync(roomId, skip, take + 1); var hasMore = messages.Count > take; if (hasMore) { messages = messages.Take(take).ToList(); } var totalCount = await _repository.GetRoomMessageCountAsync(roomId); return new RoomHistory { Messages = messages, HasMore = hasMore, TotalCount = totalCount }; } public async Task<List<ChatRoom>> GetAvailableRoomsAsync() { return await _repository.GetRoomsAsync(); } public async Task<ChatRoom> CreateRoomAsync(string roomName, string createdBy) { var room = new ChatRoom { Id = Guid.NewGuid().ToString(), Name = roomName, CreatedBy = createdBy, CreatedAt = DateTime.UtcNow, IsActive = true }; await _repository.AddRoomAsync(room); _logger.LogInformation("Room {RoomName} created by {User}", roomName, createdBy); return room; } }
4. SignalR Hubs and Groups <a name="hubs-groups"></a>
Advanced Group Management
public class GroupManagementHub : Hub { private readonly IGroupService _groupService; private readonly ILogger<GroupManagementHub> _logger; public GroupManagementHub(IGroupService groupService, ILogger<GroupManagementHub> logger) { _groupService = groupService; _logger = logger; } // Join multiple groups public async Task JoinGroups(string[] groupNames) { foreach (var groupName in groupNames) { await Groups.AddToGroupAsync(Context.ConnectionId, groupName); _logger.LogDebug("Connection {ConnectionId} joined group {GroupName}", Context.ConnectionId, groupName); } await Clients.Caller.SendAsync("GroupsJoined", groupNames); } // Leave specific groups public async Task LeaveGroups(string[] groupNames) { foreach (var groupName in groupNames) { await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName); _logger.LogDebug("Connection {ConnectionId} left group {GroupName}", Context.ConnectionId, groupName); } await Clients.Caller.SendAsync("GroupsLeft", groupNames); } // Get groups for current connection public async Task<string[]> GetMyGroups() { // Note: SignalR doesn't provide a built-in way to get groups for a connection // You need to track this yourself return await _groupService.GetConnectionGroupsAsync(Context.ConnectionId); } // Send message to specific groups public async Task SendToGroups(string[] groupNames, string message) { if (groupNames == null || !groupNames.Any()) return; // Send to multiple groups foreach (var groupName in groupNames) { await Clients.Group(groupName).SendAsync("ReceiveGroupMessage", Context.ConnectionId, message, DateTime.UtcNow); } _logger.LogDebug("Message sent to groups: {Groups}", string.Join(", ", groupNames)); } // Broadcast to all except specified groups public async Task BroadcastExceptGroups(string[] excludedGroups, string message) { // This requires custom implementation since SignalR doesn't have built-in exclusion var allConnections = await _groupService.GetAllConnectionsAsync(); var excludedConnections = new HashSet<string>(); foreach (var group in excludedGroups) { var groupConnections = await _groupService.GetGroupConnectionsAsync(group); foreach (var connection in groupConnections) { excludedConnections.Add(connection); } } var targetConnections = allConnections.Except(excludedConnections).ToList(); foreach (var connectionId in targetConnections) { await Clients.Client(connectionId).SendAsync("ReceiveMessage", "System", message, DateTime.UtcNow); } } } // Custom group tracking service public interface IGroupService { Task AddConnectionToGroupAsync(string connectionId, string groupName); Task RemoveConnectionFromGroupAsync(string connectionId, string groupName); Task<List<string>> GetConnectionGroupsAsync(string connectionId); Task<List<string>> GetGroupConnectionsAsync(string groupName); Task<List<string>> GetAllConnectionsAsync(); } public class InMemoryGroupService : IGroupService { private readonly ConcurrentDictionary<string, HashSet<string>> _connectionGroups = new(); private readonly ConcurrentDictionary<string, HashSet<string>> _groupConnections = new(); public Task AddConnectionToGroupAsync(string connectionId, string groupName) { // Track connection -> groups _connectionGroups.AddOrUpdate(connectionId, new HashSet<string> { groupName }, (key, existing) => { existing.Add(groupName); return existing; }); // Track group -> connections _groupConnections.AddOrUpdate(groupName, new HashSet<string> { connectionId }, (key, existing) => { existing.Add(connectionId); return existing; }); return Task.CompletedTask; } public Task RemoveConnectionFromGroupAsync(string connectionId, string groupName) { // Remove from connection -> groups if (_connectionGroups.TryGetValue(connectionId, out var groups)) { groups.Remove(groupName); if (!groups.Any()) { _connectionGroups.TryRemove(connectionId, out _); } } // Remove from group -> connections if (_groupConnections.TryGetValue(groupName, out var connections)) { connections.Remove(connectionId); if (!connections.Any()) { _groupConnections.TryRemove(groupName, out _); } } return Task.CompletedTask; } public Task<List<string>> GetConnectionGroupsAsync(string connectionId) { if (_connectionGroups.TryGetValue(connectionId, out var groups)) { return Task.FromResult(groups.ToList()); } return Task.FromResult(new List<string>()); } public Task<List<string>> GetGroupConnectionsAsync(string groupName) { if (_groupConnections.TryGetValue(groupName, out var connections)) { return Task.FromResult(connections.ToList()); } return Task.FromResult(new List<string>()); } public Task<List<string>> GetAllConnectionsAsync() { return Task.FromResult(_connectionGroups.Keys.ToList()); } }
Real-Time Notification System
public interface INotificationClient { Task ReceiveNotification(Notification notification); Task NotificationRead(string notificationId); Task NotificationCountUpdated(int unreadCount); } public class NotificationHub : Hub<INotificationClient> { private readonly INotificationService _notificationService; private readonly ILogger<NotificationHub> _logger; public NotificationHub( INotificationService notificationService, ILogger<NotificationHub> logger) { _notificationService = notificationService; _logger = logger; } public async Task MarkAsRead(string notificationId) { var userId = GetUserIdFromContext(); if (string.IsNullOrEmpty(userId)) return; await _notificationService.MarkAsReadAsync(notificationId, userId); await Clients.Caller.NotificationRead(notificationId); // Update unread count var unreadCount = await _notificationService.GetUnreadCountAsync(userId); await Clients.Caller.NotificationCountUpdated(unreadCount); } public async Task MarkAllAsRead() { var userId = GetUserIdFromContext(); if (string.IsNullOrEmpty(userId)) return; await _notificationService.MarkAllAsReadAsync(userId); await Clients.Caller.NotificationRead("all"); await Clients.Caller.NotificationCountUpdated(0); } public async Task<List<Notification>> GetNotifications(int skip = 0, int take = 20) { var userId = GetUserIdFromContext(); if (string.IsNullOrEmpty(userId)) return new List<Notification>(); return await _notificationService.GetUserNotificationsAsync(userId, skip, take); } public override async Task OnConnectedAsync() { var userId = GetUserIdFromContext(); if (!string.IsNullOrEmpty(userId)) { await Groups.AddToGroupAsync(Context.ConnectionId, $"user_{userId}"); // Send initial unread count var unreadCount = await _notificationService.GetUnreadCountAsync(userId); await Clients.Caller.NotificationCountUpdated(unreadCount); _logger.LogDebug("User {UserId} connected to notifications", userId); } await base.OnConnectedAsync(); } public override async Task OnDisconnectedAsync(Exception? exception) { var userId = GetUserIdFromContext(); if (!string.IsNullOrEmpty(userId)) { await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"user_{userId}"); } await base.OnDisconnectedAsync(exception); } private string? GetUserIdFromContext() { // In a real application, you'd get this from the claims return Context.User?.FindFirst("sub")?.Value; } } // Notification service for sending notifications public class NotificationService : INotificationService { private readonly IHubContext<NotificationHub, INotificationClient> _hubContext; private readonly INotificationRepository _repository; private readonly ILogger<NotificationService> _logger; public NotificationService( IHubContext<NotificationHub, INotificationClient> hubContext, INotificationRepository repository, ILogger<NotificationService> logger) { _hubContext = hubContext; _repository = repository; _logger = logger; } public async Task SendNotificationAsync(Notification notification) { await _repository.AddNotificationAsync(notification); // Send to specific user await _hubContext.Clients.Group($"user_{notification.UserId}") .ReceiveNotification(notification); _logger.LogInformation("Notification sent to user {UserId}", notification.UserId); } public async Task SendNotificationToUsersAsync(Notification notification, IEnumerable<string> userIds) { foreach (var userId in userIds) { var userNotification = notification with { UserId = userId, Id = Guid.NewGuid().ToString() }; await _repository.AddNotificationAsync(userNotification); await _hubContext.Clients.Group($"user_{userId}") .ReceiveNotification(userNotification); } _logger.LogInformation("Notification sent to {Count} users", userIds.Count()); } public async Task BroadcastNotificationAsync(Notification notification) { // This would send to all connected users // Be careful with this in production! await _hubContext.Clients.All.ReceiveNotification(notification); _logger.LogInformation("Broadcast notification sent to all users"); } public async Task MarkAsReadAsync(string notificationId, string userId) { await _repository.MarkAsReadAsync(notificationId, userId); } public async Task MarkAllAsReadAsync(string userId) { await _repository.MarkAllAsReadAsync(userId); } public async Task<int> GetUnreadCountAsync(string userId) { return await _repository.GetUnreadCountAsync(userId); } public async Task<List<Notification>> GetUserNotificationsAsync(string userId, int skip, int take) { return await _repository.GetUserNotificationsAsync(userId, skip, take); } } public record Notification( string Id, string UserId, string Title, string Message, NotificationType Type, DateTime CreatedAt, bool IsRead = false, string? ActionUrl = null, Dictionary<string, object>? Data = null); public enum NotificationType { Info, Success, Warning, Error, System }
5. Scaling SignalR Applications <a name="scaling-signalr"></a>
Redis Backplane for Scale-Out
// Program.cs configuration for Redis backplane var builder = WebApplication.CreateBuilder(args); builder.Services.AddSignalR() .AddStackExchangeRedis(redisConnectionString, options => { options.Configuration.ChannelPrefix = "MyApp"; }); var app = builder.Build(); app.MapHub<ChatHub>("/chathub"); app.MapHub<NotificationHub>("/notifications"); app.Run();
Azure SignalR Service Integration
// Azure SignalR Service configuration var builder = WebApplication.CreateBuilder(args); builder.Services.AddSignalR() .AddAzureSignalR(options => { options.ServerStickyMode = ServerStickyMode.Required; options.RetryInterval = TimeSpan.FromSeconds(1); options.MaxRetries = 3; }); // Or using connection string from configuration builder.Services.AddSignalR() .AddAzureSignalR(builder.Configuration["Azure:SignalR:ConnectionString"]); var app = builder.Build(); app.MapHub<ChatHub>("/chathub"); app.MapHub<NotificationHub>("/notifications"); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapHub<ChatHub>("/chathub"); endpoints.MapHub<NotificationHub>("/notifications"); }); app.Run();
Custom Scale-Out Implementation
public interface IMessageBus { Task PublishAsync<T>(string channel, T message); Task SubscribeAsync<T>(string channel, Func<T, Task> handler); Task UnsubscribeAsync(string channel); } public class RedisMessageBus : IMessageBus { private readonly IConnectionMultiplexer _redis; private readonly ILogger<RedisMessageBus> _logger; private readonly Dictionary<string, List<Func<object, Task>>> _handlers = new(); public RedisMessageBus(IConnectionMultiplexer redis, ILogger<RedisMessageBus> logger) { _redis = redis; _logger = logger; } public async Task PublishAsync<T>(string channel, T message) { var subscriber = _redis.GetSubscriber(); var serialized = JsonSerializer.Serialize(message); await subscriber.PublishAsync(channel, serialized); _logger.LogDebug("Message published to channel {Channel}", channel); } public async Task SubscribeAsync<T>(string channel, Func<T, Task> handler) { var subscriber = _redis.GetSubscriber(); await subscriber.SubscribeAsync(channel, async (redisChannel, value) => { try { var message = JsonSerializer.Deserialize<T>(value!); if (message != null) { await handler(message); } } catch (Exception ex) { _logger.LogError(ex, "Error handling message from channel {Channel}", channel); } }); _logger.LogInformation("Subscribed to channel {Channel}", channel); } public async Task UnsubscribeAsync(string channel) { var subscriber = _redis.GetSubscriber(); await subscriber.UnsubscribeAsync(channel); _logger.LogInformation("Unsubscribed from channel {Channel}", channel); } } // Scale-out aware hub public class ScalableChatHub : Hub { private readonly IMessageBus _messageBus; private readonly ILogger<ScalableChatHub> _logger; public ScalableChatHub(IMessageBus messageBus, ILogger<ScalableChatHub> logger) { _messageBus = messageBus; _logger = logger; } public async Task SendMessageToRoom(string roomId, string user, string message) { // Store message in database // ... // Broadcast to local clients await Clients.Group(roomId).ReceiveMessage(user, message, DateTime.UtcNow.ToString()); // Publish to message bus for other servers var broadcastMessage = new BroadcastMessage { ServerId = GetServerId(), RoomId = roomId, User = user, Message = message, Timestamp = DateTime.UtcNow }; await _messageBus.PublishAsync($"chat_room_{roomId}", broadcastMessage); } public override async Task OnConnectedAsync() { // Subscribe to message bus for this server var roomId = GetRoomIdFromContext(); if (!string.IsNullOrEmpty(roomId)) { await _messageBus.SubscribeAsync<BroadcastMessage>($"chat_room_{roomId}", async message => { // Ignore messages from this server if (message.ServerId != GetServerId()) { await Clients.Group(roomId).ReceiveMessage( message.User, message.Message, message.Timestamp.ToString()); } }); } await base.OnConnectedAsync(); } private string GetServerId() { // This could be machine name, container ID, or a configured value return Environment.MachineName; } private string? GetRoomIdFromContext() { // Extract room ID from query string or headers var httpContext = Context.GetHttpContext(); return httpContext?.Request.Query["roomId"].ToString(); } } public class BroadcastMessage { public string ServerId { get; set; } = string.Empty; public string RoomId { get; set; } = string.Empty; public string User { get; set; } = string.Empty; public string Message { get; set; } = string.Empty; public DateTime Timestamp { get; set; } }
6. gRPC Core Concepts <a name="grpc-concepts"></a>
Introduction to gRPC
gRPC is a modern, high-performance RPC (Remote Procedure Call) framework that can run in any environment. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as:
Bidirectional streaming
Flow control
Authentication
Cancellation and timeouts
Protocol Buffers Definition
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
option csharp_namespace = "ECommerce.Grpc";
// Product service definition
service ProductService {
rpc GetProduct (GetProductRequest) returns (ProductResponse);
rpc GetProducts (GetProductsRequest) returns (stream ProductResponse);
rpc CreateProduct (CreateProductRequest) returns (ProductResponse);
rpc UpdateProduct (UpdateProductRequest) returns (ProductResponse);
rpc DeleteProduct (DeleteProductRequest) returns (google.protobuf.Empty);
rpc StreamProductUpdates (StreamProductUpdatesRequest) returns (stream ProductUpdate);
}
// Order service definition
service OrderService {
rpc CreateOrder (CreateOrderRequest) returns (OrderResponse);
rpc GetOrder (GetOrderRequest) returns (OrderResponse);
rpc StreamOrders (StreamOrdersRequest) returns (stream OrderResponse);
rpc ProcessOrderStream (stream OrderProcessRequest) returns (stream OrderProcessResponse);
}
// Message definitions
message GetProductRequest {
int32 product_id = 1;
}
message GetProductsRequest {
repeated int32 product_ids = 1;
string category = 2;
int32 page_size = 3;
string page_token = 4;
}
message CreateProductRequest {
string name = 1;
string description = 2;
double price = 3;
int32 stock_quantity = 4;
string category_id = 5;
}
message UpdateProductRequest {
int32 product_id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock_quantity = 5;
}
message DeleteProductRequest {
int32 product_id = 1;
}
message ProductResponse {
int32 product_id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock_quantity = 5;
string category_id = 6;
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}
message ProductUpdate {
int32 product_id = 1;
ProductUpdateType update_type = 2;
ProductResponse product = 3;
google.protobuf.Timestamp timestamp = 4;
}
enum ProductUpdateType {
CREATED = 0;
UPDATED = 1;
DELETED = 2;
PRICE_CHANGED = 3;
STOCK_UPDATED = 4;
}
message StreamProductUpdatesRequest {
repeated int32 product_ids = 1;
repeated string categories = 2;
}
// Order messages
message CreateOrderRequest {
int32 user_id = 1;
repeated OrderItem items = 2;
string shipping_address = 3;
}
message OrderItem {
int32 product_id = 1;
int32 quantity = 2;
double unit_price = 3;
}
message GetOrderRequest {
int32 order_id = 1;
}
message OrderResponse {
int32 order_id = 1;
int32 user_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
OrderStatus status = 5;
string shipping_address = 6;
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}
message StreamOrdersRequest {
int32 user_id = 1;
OrderStatus status = 2;
google.protobuf.Timestamp from_date = 3;
}
message OrderProcessRequest {
int32 order_id = 1;
string action = 2; // "process", "cancel", "update"
map<string, string> metadata = 3;
}
message OrderProcessResponse {
int32 order_id = 1;
bool success = 2;
string message = 3;
OrderStatus new_status = 4;
}
enum OrderStatus {
PENDING = 0;
CONFIRMED = 1;
PROCESSING = 2;
SHIPPED = 3;
DELIVERED = 4;
CANCELLED = 5;
}gRPC Server Configuration
// Program.cs for gRPC server using ECommerce.Grpc; var builder = WebApplication.CreateBuilder(args); // Add gRPC services builder.Services.AddGrpc(options => { options.EnableDetailedErrors = true; options.Interceptors.Add<ExceptionInterceptor>(); options.Interceptors.Add<LoggingInterceptor>(); options.MaxReceiveMessageSize = 5 * 1024 * 1024; // 5MB options.MaxSendMessageSize = 5 * 1024 * 1024; // 5MB }); // Add gRPC reflection for testing builder.Services.AddGrpcReflection(); // Register application services builder.Services.AddScoped<IProductRepository, ProductRepository>(); builder.Services.AddScoped<IOrderRepository, OrderRepository>(); builder.Services.AddScoped<IProductService, ProductService>(); var app = builder.Build(); // Configure the HTTP request pipeline app.MapGrpcService<ProductGrpcService>(); app.MapGrpcService<OrderGrpcService>(); // Enable gRPC reflection in development if (app.Environment.IsDevelopment()) { app.MapGrpcReflectionService(); } // Health checks for gRPC services app.MapGet("/", () => "gRPC server is running. Use a gRPC client to communicate."); app.Run(); // gRPC interceptors for cross-cutting concerns public class ExceptionInterceptor : Interceptor { private readonly ILogger<ExceptionInterceptor> _logger; public ExceptionInterceptor(ILogger<ExceptionInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { try { return await continuation(request, context); } catch (Exception ex) { _logger.LogError(ex, "Error processing gRPC call {Method}", context.Method); throw new RpcException(new Status(StatusCode.Internal, "An error occurred")); } } } public class LoggingInterceptor : Interceptor { private readonly ILogger<LoggingInterceptor> _logger; public LoggingInterceptor(ILogger<LoggingInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { var stopwatch = Stopwatch.StartNew(); _logger.LogInformation("Starting gRPC call {Method}", context.Method); try { var response = await continuation(request, context); stopwatch.Stop(); _logger.LogInformation( "Completed gRPC call {Method} in {ElapsedMs}ms", context.Method, stopwatch.ElapsedMilliseconds); return response; } catch (Exception ex) { stopwatch.Stop(); _logger.LogError( ex, "gRPC call {Method} failed after {ElapsedMs}ms", context.Method, stopwatch.ElapsedMilliseconds); throw; } } }
7. Building High-Performance gRPC Services <a name="grpc-services"></a>
Product gRPC Service Implementation
using Grpc.Core; using Google.Protobuf.WellKnownTypes; using ECommerce.Grpc; namespace ECommerce.Server.Services; public class ProductGrpcService : ProductService.ProductServiceBase { private readonly IProductService _productService; private readonly ILogger<ProductGrpcService> _logger; public ProductGrpcService( IProductService productService, ILogger<ProductGrpcService> logger) { _productService = productService; _logger = logger; } public override async Task<ProductResponse> GetProduct( GetProductRequest request, ServerCallContext context) { _logger.LogInformation("Getting product {ProductId}", request.ProductId); var product = await _productService.GetProductAsync(request.ProductId); if (product == null) { throw new RpcException( new Status(StatusCode.NotFound, $"Product {request.ProductId} not found")); } return MapToProductResponse(product); } public override async Task GetProducts( GetProductsRequest request, IServerStreamWriter<ProductResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Streaming products for request"); var products = await _productService.GetProductsAsync( request.ProductIds.ToList(), request.Category, request.PageSize, request.PageToken); foreach (var product in products) { // Check if the client has cancelled the request if (context.CancellationToken.IsCancellationRequested) { _logger.LogInformation("Client cancelled product stream"); break; } await responseStream.WriteAsync(MapToProductResponse(product)); // Small delay to demonstrate streaming await Task.Delay(100, context.CancellationToken); } _logger.LogInformation("Completed streaming products"); } public override async Task<ProductResponse> CreateProduct( CreateProductRequest request, ServerCallContext context) { _logger.LogInformation("Creating new product: {ProductName}", request.Name); var product = new Product { Name = request.Name, Description = request.Description, Price = (decimal)request.Price, StockQuantity = request.StockQuantity, CategoryId = request.CategoryId }; var createdProduct = await _productService.CreateProductAsync(product); return MapToProductResponse(createdProduct); } public override async Task<ProductResponse> UpdateProduct( UpdateProductRequest request, ServerCallContext context) { _logger.LogInformation("Updating product {ProductId}", request.ProductId); var product = await _productService.GetProductAsync(request.ProductId); if (product == null) { throw new RpcException( new Status(StatusCode.NotFound, $"Product {request.ProductId} not found")); } product.Name = request.Name; product.Description = request.Description; product.Price = (decimal)request.Price; product.StockQuantity = request.StockQuantity; var updatedProduct = await _productService.UpdateProductAsync(product); return MapToProductResponse(updatedProduct); } public override async Task<Empty> DeleteProduct( DeleteProductRequest request, ServerCallContext context) { _logger.LogInformation("Deleting product {ProductId}", request.ProductId); var success = await _productService.DeleteProductAsync(request.ProductId); if (!success) { throw new RpcException( new Status(StatusCode.NotFound, $"Product {request.ProductId} not found")); } return new Empty(); } public override async Task StreamProductUpdates( StreamProductUpdatesRequest request, IServerStreamWriter<ProductUpdate> responseStream, ServerCallContext context) { _logger.LogInformation("Starting product updates stream"); // Subscribe to product updates var cancellationToken = context.CancellationToken; var productIds = request.ProductIds.ToList(); var categories = request.Categories.ToList(); // This would typically connect to a message bus or database change feed var productUpdateChannel = _productService.GetProductUpdateChannel( productIds, categories, cancellationToken); try { await foreach (var update in productUpdateChannel.ReadAllAsync(cancellationToken)) { if (cancellationToken.IsCancellationRequested) break; var productUpdate = new ProductUpdate { ProductId = update.ProductId, UpdateType = MapToUpdateType(update.UpdateType), Product = MapToProductResponse(update.Product), Timestamp = Timestamp.FromDateTime(update.Timestamp) }; await responseStream.WriteAsync(productUpdate); } } catch (OperationCanceledException) { _logger.LogInformation("Product updates stream cancelled by client"); } _logger.LogInformation("Product updates stream completed"); } private static ProductResponse MapToProductResponse(Product product) { return new ProductResponse { ProductId = product.Id, Name = product.Name, Description = product.Description, Price = (double)product.Price, StockQuantity = product.StockQuantity, CategoryId = product.CategoryId, CreatedAt = Timestamp.FromDateTime(product.CreatedAt), UpdatedAt = Timestamp.FromDateTime(product.UpdatedAt) }; } private static ProductUpdateType MapToUpdateType(ProductUpdateTypeEnum updateType) { return updateType switch { ProductUpdateTypeEnum.Created => ProductUpdateType.Created, ProductUpdateTypeEnum.Updated => ProductUpdateType.Updated, ProductUpdateTypeEnum.Deleted => ProductUpdateType.Deleted, ProductUpdateTypeEnum.PriceChanged => ProductUpdateType.PriceChanged, ProductUpdateTypeEnum.StockUpdated => ProductUpdateType.StockUpdated, _ => ProductUpdateType.Updated }; } } // Supporting classes public class Product { public int Id { get; set; } public string Name { get; set; } = string.Empty; public string Description { get; set; } = string.Empty; public decimal Price { get; set; } public int StockQuantity { get; set; } public string CategoryId { get; set; } = string.Empty; public DateTime CreatedAt { get; set; } public DateTime UpdatedAt { get; set; } } public class ProductUpdateEvent { public int ProductId { get; set; } public ProductUpdateTypeEnum UpdateType { get; set; } public Product Product { get; set; } = null!; public DateTime Timestamp { get; set; } } public enum ProductUpdateTypeEnum { Created, Updated, Deleted, PriceChanged, StockUpdated }
Order gRPC Service with Bidirectional Streaming
public class OrderGrpcService : OrderService.OrderServiceBase { private readonly IOrderService _orderService; private readonly ILogger<OrderGrpcService> _logger; public OrderGrpcService( IOrderService orderService, ILogger<OrderGrpcService> logger) { _orderService = orderService; _logger = logger; } public override async Task<OrderResponse> CreateOrder( CreateOrderRequest request, ServerCallContext context) { _logger.LogInformation("Creating order for user {UserId}", request.UserId); var order = new Order { UserId = request.UserId, ShippingAddress = request.ShippingAddress, Items = request.Items.Select(MapToOrderItem).ToList(), Status = OrderStatus.Pending, CreatedAt = DateTime.UtcNow }; var createdOrder = await _orderService.CreateOrderAsync(order); return MapToOrderResponse(createdOrder); } public override async Task<OrderResponse> GetOrder( GetOrderRequest request, ServerCallContext context) { _logger.LogInformation("Getting order {OrderId}", request.OrderId); var order = await _orderService.GetOrderAsync(request.OrderId); if (order == null) { throw new RpcException( new Status(StatusCode.NotFound, $"Order {request.OrderId} not found")); } return MapToOrderResponse(order); } public override async Task StreamOrders( StreamOrdersRequest request, IServerStreamWriter<OrderResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Streaming orders for user {UserId}", request.UserId); var orders = await _orderService.GetOrdersAsync( request.UserId, MapToOrderStatus(request.Status), request.FromDate?.ToDateTime()); foreach (var order in orders) { if (context.CancellationToken.IsCancellationRequested) break; await responseStream.WriteAsync(MapToOrderResponse(order)); await Task.Delay(50, context.CancellationToken); // Simulate processing } _logger.LogInformation("Completed streaming orders"); } public override async Task ProcessOrderStream( IAsyncStreamReader<OrderProcessRequest> requestStream, IServerStreamWriter<OrderProcessResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Starting bidirectional order processing stream"); await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken)) { try { _logger.LogDebug("Processing order {OrderId} with action {Action}", request.OrderId, request.Action); OrderProcessResponse response; switch (request.Action.ToLower()) { case "process": response = await ProcessOrderAsync(request.OrderId, request.Metadata); break; case "cancel": response = await CancelOrderAsync(request.OrderId, request.Metadata); break; case "update": response = await UpdateOrderAsync(request.OrderId, request.Metadata); break; default: response = new OrderProcessResponse { OrderId = request.OrderId, Success = false, Message = $"Unknown action: {request.Action}", NewStatus = OrderStatus.Pending }; break; } await responseStream.WriteAsync(response); } catch (Exception ex) { _logger.LogError(ex, "Error processing order {OrderId}", request.OrderId); await responseStream.WriteAsync(new OrderProcessResponse { OrderId = request.OrderId, Success = false, Message = $"Error: {ex.Message}", NewStatus = OrderStatus.Pending }); } } _logger.LogInformation("Bidirectional order processing stream completed"); } private async Task<OrderProcessResponse> ProcessOrderAsync(int orderId, IDictionary<string, string> metadata) { var order = await _orderService.GetOrderAsync(orderId); if (order == null) { return new OrderProcessResponse { OrderId = orderId, Success = false, Message = "Order not found", NewStatus = OrderStatus.Pending }; } // Process the order (in a real app, this would involve payment, inventory, etc.) order.Status = OrderStatus.Confirmed; order.UpdatedAt = DateTime.UtcNow; await _orderService.UpdateOrderAsync(order); return new OrderProcessResponse { OrderId = orderId, Success = true, Message = "Order processed successfully", NewStatus = OrderStatus.Confirmed }; } private async Task<OrderProcessResponse> CancelOrderAsync(int orderId, IDictionary<string, string> metadata) { var order = await _orderService.GetOrderAsync(orderId); if (order == null) { return new OrderProcessResponse { OrderId = orderId, Success = false, Message = "Order not found", NewStatus = OrderStatus.Pending }; } order.Status = OrderStatus.Cancelled; order.UpdatedAt = DateTime.UtcNow; await _orderService.UpdateOrderAsync(order); return new OrderProcessResponse { OrderId = orderId, Success = true, Message = "Order cancelled successfully", NewStatus = OrderStatus.Cancelled }; } private async Task<OrderProcessResponse> UpdateOrderAsync(int orderId, IDictionary<string, string> metadata) { // Implementation for updating order details return new OrderProcessResponse { OrderId = orderId, Success = true, Message = "Order updated successfully", NewStatus = OrderStatus.Processing }; } private static OrderItemModel MapToOrderItem(global::ECommerce.Grpc.OrderItem grpcItem) { return new OrderItemModel { ProductId = grpcItem.ProductId, Quantity = grpcItem.Quantity, UnitPrice = (decimal)grpcItem.UnitPrice }; } private static OrderResponse MapToOrderResponse(Order order) { var response = new OrderResponse { OrderId = order.Id, UserId = order.UserId, TotalAmount = (double)order.TotalAmount, ShippingAddress = order.ShippingAddress, Status = MapToOrderStatus(order.Status), CreatedAt = Timestamp.FromDateTime(order.CreatedAt), UpdatedAt = Timestamp.FromDateTime(order.UpdatedAt) }; response.Items.AddRange(order.Items.Select(item => new global::ECommerce.Grpc.OrderItem { ProductId = item.ProductId, Quantity = item.Quantity, UnitPrice = (double)item.UnitPrice })); return response; } private static OrderStatus MapToOrderStatus(OrderStatusEnum status) { return status switch { OrderStatusEnum.Pending => OrderStatus.Pending, OrderStatusEnum.Confirmed => OrderStatus.Confirmed, OrderStatusEnum.Processing => OrderStatus.Processing, OrderStatusEnum.Shipped => OrderStatus.Shipped, OrderStatusEnum.Delivered => OrderStatus.Delivered, OrderStatusEnum.Cancelled => OrderStatus.Cancelled, _ => OrderStatus.Pending }; } private static OrderStatusEnum MapToOrderStatus(OrderStatus status) { return status switch { OrderStatus.Pending => OrderStatusEnum.Pending, OrderStatus.Confirmed => OrderStatusEnum.Confirmed, OrderStatus.Processing => OrderStatusEnum.Processing, OrderStatus.Shipped => OrderStatusEnum.Shipped, OrderStatus.Delivered => OrderStatusEnum.Delivered, OrderStatus.Cancelled => OrderStatusEnum.Cancelled, _ => OrderStatusEnum.Pending }; } } // Supporting classes public class Order { public int Id { get; set; } public int UserId { get; set; } public List<OrderItemModel> Items { get; set; } = new(); public decimal TotalAmount => Items.Sum(item => item.UnitPrice * item.Quantity); public string ShippingAddress { get; set; } = string.Empty; public OrderStatusEnum Status { get; set; } public DateTime CreatedAt { get; set; } public DateTime UpdatedAt { get; set; } } public class OrderItemModel { public int ProductId { get; set; } public int Quantity { get; set; } public decimal UnitPrice { get; set; } } public enum OrderStatusEnum { Pending, Confirmed, Processing, Shipped, Delivered, Cancelled }
8. gRPC Streaming Patterns <a name="grpc-streaming"></a>
Client-Side Streaming Implementation
// Client-side streaming example for bulk operations public class BulkUploadGrpcService : BulkUploadService.BulkUploadServiceBase { private readonly IProductService _productService; private readonly ILogger<BulkUploadGrpcService> _logger; public BulkUploadGrpcService( IProductService productService, ILogger<BulkUploadGrpcService> logger) { _productService = productService; _logger = logger; } public override async Task<BulkUploadResponse> UploadProducts( IAsyncStreamReader<ProductUploadRequest> requestStream, ServerCallContext context) { _logger.LogInformation("Starting bulk product upload"); var results = new List<ProductUploadResult>(); var successCount = 0; var errorCount = 0; await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken)) { try { _logger.LogDebug("Processing product: {ProductName}", request.Name); var product = new Product { Name = request.Name, Description = request.Description, Price = (decimal)request.Price, StockQuantity = request.StockQuantity, CategoryId = request.CategoryId }; var createdProduct = await _productService.CreateProductAsync(product); successCount++; results.Add(new ProductUploadResult { ProductId = createdProduct.Id, Name = createdProduct.Name, Success = true, Message = "Product created successfully" }); _logger.LogDebug("Successfully created product: {ProductName}", request.Name); } catch (Exception ex) { errorCount++; results.Add(new ProductUploadResult { ProductId = 0, Name = request.Name, Success = false, Message = $"Error: {ex.Message}" }); _logger.LogError(ex, "Failed to create product: {ProductName}", request.Name); } } _logger.LogInformation( "Bulk upload completed: {SuccessCount} successful, {ErrorCount} errors", successCount, errorCount); var response = new BulkUploadResponse { TotalProcessed = successCount + errorCount, Successful = successCount, Failed = errorCount }; response.Results.AddRange(results); return response; } } // Protocol Buffers for bulk upload service BulkUploadService { rpc UploadProducts (stream ProductUploadRequest) returns (BulkUploadResponse); } message ProductUploadRequest { string name = 1; string description = 2; double price = 3; int32 stock_quantity = 4; string category_id = 5; } message BulkUploadResponse { int32 total_processed = 1; int32 successful = 2; int32 failed = 3; repeated ProductUploadResult results = 4; } message ProductUploadResult { int32 product_id = 1; string name = 2; bool success = 3; string message = 4; }
Bidirectional Streaming for Real-Time Collaboration
public class CollaborationGrpcService : CollaborationService.CollaborationServiceBase { private readonly ICollaborationSessionManager _sessionManager; private readonly ILogger<CollaborationGrpcService> _logger; public CollaborationGrpcService( ICollaborationSessionManager sessionManager, ILogger<CollaborationGrpcService> logger) { _sessionManager = sessionManager; _logger = logger; } public override async Task Collaborate( IAsyncStreamReader<CollaborationMessage> requestStream, IServerStreamWriter<CollaborationMessage> responseStream, ServerCallContext context) { var sessionId = GetSessionIdFromContext(context); var userId = GetUserIdFromContext(context); _logger.LogInformation( "Starting collaboration session {SessionId} for user {UserId}", sessionId, userId); // Join the collaboration session await _sessionManager.JoinSessionAsync(sessionId, userId, responseStream); try { // Process incoming messages from client await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken)) { _logger.LogDebug( "Received collaboration message from user {UserId} in session {SessionId}", userId, sessionId); // Broadcast to other participants in the session await _sessionManager.BroadcastToSessionAsync( sessionId, userId, message, context.CancellationToken); } } catch (OperationCanceledException) { _logger.LogInformation( "Collaboration session {SessionId} cancelled for user {UserId}", sessionId, userId); } finally { // Leave the session await _sessionManager.LeaveSessionAsync(sessionId, userId); _logger.LogInformation( "User {UserId} left collaboration session {SessionId}", userId, sessionId); } } private string GetSessionIdFromContext(ServerCallContext context) { return context.RequestHeaders.FirstOrDefault(h => h.Key == "session-id")?.Value ?? throw new RpcException(new Status(StatusCode.InvalidArgument, "Session ID required")); } private string GetUserIdFromContext(ServerCallContext context) { return context.RequestHeaders.FirstOrDefault(h => h.Key == "user-id")?.Value ?? throw new RpcException(new Status(StatusCode.InvalidArgument, "User ID required")); } } // Collaboration session manager public interface ICollaborationSessionManager { Task JoinSessionAsync(string sessionId, string userId, IServerStreamWriter<CollaborationMessage> stream); Task LeaveSessionAsync(string sessionId, string userId); Task BroadcastToSessionAsync(string sessionId, string fromUserId, CollaborationMessage message, CancellationToken cancellationToken); } public class CollaborationSessionManager : ICollaborationSessionManager { private readonly ConcurrentDictionary<string, CollaborationSession> _sessions = new(); private readonly ILogger<CollaborationSessionManager> _logger; public CollaborationSessionManager(ILogger<CollaborationSessionManager> logger) { _logger = logger; } public async Task JoinSessionAsync(string sessionId, string userId, IServerStreamWriter<CollaborationMessage> stream) { var session = _sessions.GetOrAdd(sessionId, id => new CollaborationSession(id)); await session.AddParticipantAsync(userId, stream); // Notify other participants var joinMessage = new CollaborationMessage { Type = MessageType.UserJoined, SenderId = "system", SessionId = sessionId, Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), Content = $"{userId} joined the session", Data = { { "userId", userId } } }; await session.BroadcastAsync(joinMessage, userId); _logger.LogInformation("User {UserId} joined session {SessionId}", userId, sessionId); } public Task LeaveSessionAsync(string sessionId, string userId) { if (_sessions.TryGetValue(sessionId, out var session)) { session.RemoveParticipant(userId); // Notify other participants var leaveMessage = new CollaborationMessage { Type = MessageType.UserLeft, SenderId = "system", SessionId = sessionId, Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), Content = $"{userId} left the session", Data = { { "userId", userId } } }; _ = session.BroadcastAsync(leaveMessage, userId); // Fire and forget // Remove empty sessions if (session.ParticipantCount == 0) { _sessions.TryRemove(sessionId, out _); } _logger.LogInformation("User {UserId} left session {SessionId}", userId, sessionId); } return Task.CompletedTask; } public async Task BroadcastToSessionAsync(string sessionId, string fromUserId, CollaborationMessage message, CancellationToken cancellationToken) { if (_sessions.TryGetValue(sessionId, out var session)) { await session.BroadcastAsync(message, fromUserId, cancellationToken); } } } public class CollaborationSession { private readonly ConcurrentDictionary<string, IServerStreamWriter<CollaborationMessage>> _participants = new(); public string SessionId { get; } public int ParticipantCount => _participants.Count; public CollaborationSession(string sessionId) { SessionId = sessionId; } public Task AddParticipantAsync(string userId, IServerStreamWriter<CollaborationMessage> stream) { _participants[userId] = stream; return Task.CompletedTask; } public void RemoveParticipant(string userId) { _participants.TryRemove(userId, out _); } public async Task BroadcastAsync(CollaborationMessage message, string excludeUserId = "", CancellationToken cancellationToken = default) { var tasks = _participants .Where(p => p.Key != excludeUserId) .Select(async participant => { try { await participant.Value.WriteAsync(message); } catch (Exception ex) { // Handle disconnected clients Console.WriteLine($"Error sending to {participant.Key}: {ex.Message}"); } }); await Task.WhenAll(tasks); } }
9. Real-World Case Study: Live Dashboard <a name="case-study"></a>
Real-Time Analytics Dashboard
// Dashboard hub for real-time analytics public interface IDashboardClient { Task ReceiveMetrics(DashboardMetrics metrics); Task ReceiveAlert(Alert alert); Task SystemStatusChanged(SystemStatus status); Task UserActivity(UserActivityEvent activity); } public class DashboardHub : Hub<IDashboardClient> { private readonly IDashboardService _dashboardService; private readonly ILogger<DashboardHub> _logger; public DashboardHub( IDashboardService dashboardService, ILogger<DashboardHub> logger) { _dashboardService = dashboardService; _logger = logger; } public async Task SubscribeToMetrics(string dashboardId, string[] metricTypes) { await Groups.AddToGroupAsync(Context.ConnectionId, $"dashboard_{dashboardId}"); // Send current metrics var metrics = await _dashboardService.GetCurrentMetricsAsync(dashboardId, metricTypes); await Clients.Caller.ReceiveMetrics(metrics); _logger.LogInformation( "User subscribed to dashboard {DashboardId} with metrics: {MetricTypes}", dashboardId, string.Join(", ", metricTypes)); } public async Task UnsubscribeFromMetrics(string dashboardId) { await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"dashboard_{dashboardId}"); _logger.LogInformation("User unsubscribed from dashboard {DashboardId}", dashboardId); } public async Task<UserActivityEvent> TrackUserActivity(string dashboardId, string activityType, string details) { var activity = new UserActivityEvent { DashboardId = dashboardId, UserId = GetUserId(), ActivityType = activityType, Details = details, Timestamp = DateTime.UtcNow }; await _dashboardService.RecordUserActivityAsync(activity); // Broadcast to other dashboard viewers (excluding the sender) await Clients.OthersInGroup($"dashboard_{dashboardId}").UserActivity(activity); return activity; } public override async Task OnConnectedAsync() { var userId = GetUserId(); var dashboardId = GetDashboardIdFromQuery(); if (!string.IsNullOrEmpty(dashboardId)) { await Groups.AddToGroupAsync(Context.ConnectionId, $"dashboard_{dashboardId}"); _logger.LogInformation( "User {UserId} connected to dashboard {DashboardId}", userId, dashboardId); } await base.OnConnectedAsync(); } public override async Task OnDisconnectedAsync(Exception? exception) { var userId = GetUserId(); var dashboardId = GetDashboardIdFromQuery(); if (!string.IsNullOrEmpty(dashboardId)) { await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"dashboard_{dashboardId}"); _logger.LogInformation( "User {UserId} disconnected from dashboard {DashboardId}", userId, dashboardId); } await base.OnDisconnectedAsync(exception); } private string GetUserId() { return Context.User?.Identity?.Name ?? "anonymous"; } private string GetDashboardIdFromQuery() { var httpContext = Context.GetHttpContext(); return httpContext?.Request.Query["dashboardId"].ToString() ?? string.Empty; } } // Dashboard service for metrics collection public class DashboardService : IDashboardService { private readonly IHubContext<DashboardHub, IDashboardClient> _hubContext; private readonly IMetricsCollector _metricsCollector; private readonly IAlertService _alertService; private readonly ILogger<DashboardService> _logger; public DashboardService( IHubContext<DashboardHub, IDashboardClient> hubContext, IMetricsCollector metricsCollector, IAlertService alertService, ILogger<DashboardService> logger) { _hubContext = hubContext; _metricsCollector = metricsCollector; _alertService = alertService; _logger = logger; } public async Task<DashboardMetrics> GetCurrentMetricsAsync(string dashboardId, string[] metricTypes) { var metrics = new DashboardMetrics { DashboardId = dashboardId, Timestamp = DateTime.UtcNow }; foreach (var metricType in metricTypes) { var metric = await _metricsCollector.GetMetricAsync(metricType, dashboardId); if (metric != null) { metrics.Metrics.Add(metric); } } return metrics; } public async Task StartMetricsStreaming(string dashboardId, string[] metricTypes, TimeSpan interval) { _logger.LogInformation("Starting metrics streaming for dashboard {DashboardId}", dashboardId); var timer = new PeriodicTimer(interval); while (await timer.WaitForNextTickAsync()) { try { var metrics = await GetCurrentMetricsAsync(dashboardId, metricTypes); await _hubContext.Clients.Group($"dashboard_{dashboardId}").ReceiveMetrics(metrics); // Check for alerts await CheckAndSendAlertsAsync(dashboardId, metrics); } catch (Exception ex) { _logger.LogError(ex, "Error streaming metrics for dashboard {DashboardId}", dashboardId); } } } public async Task RecordUserActivityAsync(UserActivityEvent activity) { // Store activity in database // ... _logger.LogDebug( "Recorded user activity: {ActivityType} for dashboard {DashboardId}", activity.ActivityType, activity.DashboardId); } public async Task SendAlertAsync(string dashboardId, Alert alert) { await _hubContext.Clients.Group($"dashboard_{dashboardId}").ReceiveAlert(alert); _logger.LogWarning( "Alert sent to dashboard {DashboardId}: {AlertMessage}", dashboardId, alert.Message); } private async Task CheckAndSendAlertsAsync(string dashboardId, DashboardMetrics metrics) { foreach (var metric in metrics.Metrics) { var alert = await _alertService.CheckMetricForAlertAsync(metric, dashboardId); if (alert != null) { await SendAlertAsync(dashboardId, alert); } } } } // Metrics models public class DashboardMetrics { public string DashboardId { get; set; } = string.Empty; public DateTime Timestamp { get; set; } public List<Metric> Metrics { get; set; } = new(); } public class Metric { public string Type { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; public double Value { get; set; } public string Unit { get; set; } = string.Empty; public Dictionary<string, string> Tags { get; set; } = new(); } public class Alert { public string Id { get; set; } = Guid.NewGuid().ToString(); public string DashboardId { get; set; } = string.Empty; public string Type { get; set; } = string.Empty; // "warning", "error", "info" public string Message { get; set; } = string.Empty; public DateTime Timestamp { get; set; } public Dictionary<string, object> Data { get; set; } = new(); } public class UserActivityEvent { public string DashboardId { get; set; } = string.Empty; public string UserId { get; set; } = string.Empty; public string ActivityType { get; set; } = string.Empty; public string Details { get; set; } = string.Empty; public DateTime Timestamp { get; set; } } public class SystemStatus { public string Component { get; set; } = string.Empty; public string Status { get; set; } = string.Empty; // "healthy", "degraded", "down" public string Message { get; set; } = string.Empty; public DateTime LastChecked { get; set; } }
gRPC Dashboard Service
// gRPC service for dashboard data public class DashboardGrpcService : DashboardService.DashboardServiceBase { private readonly IDashboardDataService _dataService; private readonly ILogger<DashboardGrpcService> _logger; public DashboardGrpcService( IDashboardDataService dataService, ILogger<DashboardGrpcService> logger) { _dataService = dataService; _logger = logger; } public override async Task GetRealTimeMetrics( DashboardMetricsRequest request, IServerStreamWriter<DashboardMetricsResponse> responseStream, ServerCallContext context) { _logger.LogInformation( "Starting real-time metrics stream for dashboard {DashboardId}", request.DashboardId); var cancellationToken = context.CancellationToken; var metricsChannel = _dataService.GetRealTimeMetricsStream( request.DashboardId, request.MetricTypes.ToArray(), cancellationToken); try { await foreach (var metrics in metricsChannel.ReadAllAsync(cancellationToken)) { if (cancellationToken.IsCancellationRequested) break; var response = new DashboardMetricsResponse { DashboardId = metrics.DashboardId, Timestamp = Timestamp.FromDateTime(metrics.Timestamp) }; response.Metrics.AddRange(metrics.Metrics.Select(m => new MetricData { Type = m.Type, Name = m.Name, Value = m.Value, Unit = m.Unit })); await responseStream.WriteAsync(response); } } catch (OperationCanceledException) { _logger.LogInformation("Metrics stream cancelled for dashboard {DashboardId}", request.DashboardId); } _logger.LogInformation("Metrics stream completed for dashboard {DashboardId}", request.DashboardId); } public override async Task StreamAlerts( AlertStreamRequest request, IServerStreamWriter<AlertResponse> responseStream, ServerCallContext context) { _logger.LogInformation("Starting alert stream for dashboard {DashboardId}", request.DashboardId); var cancellationToken = context.CancellationToken; var alertChannel = _dataService.GetAlertStream(request.DashboardId, cancellationToken); try { await foreach (var alert in alertChannel.ReadAllAsync(cancellationToken)) { if (cancellationToken.IsCancellationRequested) break; var response = new AlertResponse { AlertId = alert.Id, Type = alert.Type, Message = alert.Message, Timestamp = Timestamp.FromDateTime(alert.Timestamp) }; // Add alert data foreach (var (key, value) in alert.Data) { response.Data.Add(key, value.ToString() ?? string.Empty); } await responseStream.WriteAsync(response); } } catch (OperationCanceledException) { _logger.LogInformation("Alert stream cancelled for dashboard {DashboardId}", request.DashboardId); } _logger.LogInformation("Alert stream completed for dashboard {DashboardId}", request.DashboardId); } public override async Task<DashboardSummary> GetDashboardSummary( DashboardSummaryRequest request, ServerCallContext context) { _logger.LogInformation("Getting summary for dashboard {DashboardId}", request.DashboardId); var summary = await _dataService.GetDashboardSummaryAsync( request.DashboardId, request.FromDate?.ToDateTime(), request.ToDate?.ToDateTime()); return new DashboardSummary { DashboardId = summary.DashboardId, TotalUsers = summary.TotalUsers, ActiveUsers = summary.ActiveUsers, TotalAlerts = summary.TotalAlerts, SystemHealth = summary.SystemHealth, LastUpdated = Timestamp.FromDateTime(summary.LastUpdated) }; } }
10. Security and Authentication <a name="security"></a>
SignalR Authentication and Authorization
// Custom authentication for SignalR public class CustomUserIdProvider : IUserIdProvider { public string GetUserId(HubConnectionContext connection) { // Get user ID from claims return connection.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value ?? connection.ConnectionId; } } // Register in Program.cs builder.Services.AddSingleton<IUserIdProvider, CustomUserIdProvider>(); // Authorization in hubs [Authorize] public class SecureChatHub : Hub { private readonly ILogger<SecureChatHub> _logger; public SecureChatHub(ILogger<SecureChatHub> logger) { _logger = logger; } [Authorize(Roles = "Admin,Moderator")] public async Task DeleteMessage(string messageId) { var user = Context.User; var userName = user?.Identity?.Name ?? "unknown"; _logger.LogInformation("User {UserName} deleted message {MessageId}", userName, messageId); await Clients.All.SendAsync("MessageDeleted", messageId, userName); } [Authorize(Policy = "ChatAccess")] public async Task SendMessage(string message) { var user = Context.User; var userName = user?.Identity?.Name ?? "unknown"; await Clients.All.SendAsync("ReceiveMessage", userName, message, DateTime.UtcNow); } public override async Task OnConnectedAsync() { var user = Context.User; var userName = user?.Identity?.Name ?? "unknown"; _logger.LogInformation("User {UserName} connected to secure chat", userName); await base.OnConnectedAsync(); } } // JWT authentication for SignalR public class JwtTokenQueryStringAuthHandler : AuthenticationHandler<JwtBearerOptions> { public JwtTokenQueryStringAuthHandler( IOptionsMonitor<JwtBearerOptions> options, ILoggerFactory logger, UrlEncoder encoder, ISystemClock clock) : base(options, logger, encoder, clock) { } protected override async Task<AuthenticateResult> HandleAuthenticateAsync() { if (!Context.WebSockets.IsWebSocketRequest && !Request.Query.ContainsKey("access_token")) { return AuthenticateResult.NoResult(); } var token = Request.Query["access_token"]; if (string.IsNullOrEmpty(token)) { return AuthenticateResult.Fail("Token is required"); } // Set the token in the Authorization header so JwtBearerHandler can process it Request.Headers["Authorization"] = $"Bearer {token}"; return await Context.AuthenticateAsync(Scheme.Name); } }
gRPC Authentication and Interceptors
// gRPC authentication interceptor public class AuthInterceptor : Interceptor { private readonly ILogger<AuthInterceptor> _logger; public AuthInterceptor(ILogger<AuthInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { // Extract and validate JWT token var token = GetTokenFromContext(context); if (string.IsNullOrEmpty(token)) { throw new RpcException(new Status(StatusCode.Unauthenticated, "Token required")); } var principal = await ValidateTokenAsync(token); if (principal == null) { throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid token")); } // Set user in context for use in service methods context.UserState["user"] = principal; return await continuation(request, context); } private string? GetTokenFromContext(ServerCallContext context) { var headers = context.RequestHeaders; var authHeader = headers.FirstOrDefault(h => h.Key == "authorization"); if (authHeader != null) { return authHeader.Value?.StartsWith("Bearer ") == true ? authHeader.Value.Substring(7) : null; } return null; } private async Task<ClaimsPrincipal?> ValidateTokenAsync(string token) { try { var handler = new JwtSecurityTokenHandler(); var jwtToken = handler.ReadJwtToken(token); // Validate token (in real app, use proper validation) var claims = jwtToken.Claims; var identity = new ClaimsIdentity(claims, "JWT"); return new ClaimsPrincipal(identity); } catch (Exception ex) { _logger.LogError(ex, "Token validation failed"); return null; } } } // Role-based authorization for gRPC [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)] public class AuthorizeGrpcAttribute : Attribute { public string[] Roles { get; set; } = Array.Empty<string>(); public string Policy { get; set; } = string.Empty; } public class AuthorizationInterceptor : Interceptor { private readonly ILogger<AuthorizationInterceptor> _logger; public AuthorizationInterceptor(ILogger<AuthorizationInterceptor> logger) { _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { // Check for AuthorizeGrpc attribute var method = context.GetMethodDescriptor(); var attribute = GetAuthorizeAttribute(method); if (attribute != null) { var user = context.GetHttpContext().User; if (!user.Identity?.IsAuthenticated == true) { throw new RpcException(new Status(StatusCode.Unauthenticated, "Authentication required")); } // Check roles if (attribute.Roles.Any() && !attribute.Roles.Any(user.IsInRole)) { throw new RpcException(new Status(StatusCode.PermissionDenied, "Insufficient permissions")); } // Check policy (you would implement policy checking here) if (!string.IsNullOrEmpty(attribute.Policy)) { // Implement policy checking } } return await continuation(request, context); } private AuthorizeGrpcAttribute? GetAuthorizeAttribute(IMethod method) { // This is simplified - in real implementation, you'd use reflection // to get attributes from the service method return null; } } // Secure gRPC service with authentication public class SecureProductGrpcService : ProductService.ProductServiceBase { private readonly IProductService _productService; private readonly ILogger<SecureProductGrpcService> _logger; public SecureProductGrpcService( IProductService productService, ILogger<SecureProductGrpcService> logger) { _productService = productService; _logger = logger; } public override async Task<ProductResponse> CreateProduct( CreateProductRequest request, ServerCallContext context) { var user = context.GetHttpContext().User; var userName = user.Identity?.Name ?? "unknown"; _logger.LogInformation("User {UserName} creating product: {ProductName}", userName, request.Name); // Check if user has permission to create products if (!user.IsInRole("Admin") && !user.IsInRole("ProductManager")) { throw new RpcException( new Status(StatusCode.PermissionDenied, "Insufficient permissions to create products")); } var product = new Product { Name = request.Name, Description = request.Description, Price = (decimal)request.Price, StockQuantity = request.StockQuantity, CategoryId = request.CategoryId, CreatedBy = userName }; var createdProduct = await _productService.CreateProductAsync(product); return MapToProductResponse(createdProduct); } [AuthorizeGrpc(Roles = "Admin")] public override async Task<Empty> DeleteProduct( DeleteProductRequest request, ServerCallContext context) { _logger.LogInformation("Admin user deleting product {ProductId}", request.ProductId); var success = await _productService.DeleteProductAsync(request.ProductId); if (!success) { throw new RpcException( new Status(StatusCode.NotFound, $"Product {request.ProductId} not found")); } return new Empty(); } private static ProductResponse MapToProductResponse(Product product) { return new ProductResponse { ProductId = product.Id, Name = product.Name, Description = product.Description, Price = (double)product.Price, StockQuantity = product.StockQuantity, CategoryId = product.CategoryId, CreatedAt = Timestamp.FromDateTime(product.CreatedAt), UpdatedAt = Timestamp.FromDateTime(product.UpdatedAt) }; } }
11. Performance Optimization <a name="performance"></a>
SignalR Performance Tuning
// Optimized SignalR configuration builder.Services.AddSignalR(options => { options.EnableDetailedErrors = false; // Disable in production options.KeepAliveInterval = TimeSpan.FromSeconds(15); options.ClientTimeoutInterval = TimeSpan.FromSeconds(30); options.MaximumReceiveMessageSize = 64 * 1024; // 64KB options.StreamBufferCapacity = 10; // Use MessagePack for better performance options.AddMessagePackProtocol(); }) .AddMessagePackProtocol(options => { options.SerializerOptions = MessagePackSerializerOptions.Standard .WithResolver(StandardResolverAllowPrivate.Instance) .WithCompression(MessagePackCompression.Lz4BlockArray); }); // Connection management for performance public class ConnectionManager { private readonly ConcurrentDictionary<string, ConnectionInfo> _connections = new(); private readonly ILogger<ConnectionManager> _logger; public ConnectionManager(ILogger<ConnectionManager> logger) { _logger = logger; } public void AddConnection(string connectionId, string userId) { _connections[connectionId] = new ConnectionInfo { ConnectionId = connectionId, UserId = userId, ConnectedAt = DateTime.UtcNow, LastActivity = DateTime.UtcNow }; _logger.LogDebug("Connection added: {ConnectionId} for user {UserId}", connectionId, userId); } public void UpdateActivity(string connectionId) { if (_connections.TryGetValue(connectionId, out var info)) { info.LastActivity = DateTime.UtcNow; } } public void RemoveConnection(string connectionId) { _connections.TryRemove(connectionId, out _); _logger.LogDebug("Connection removed: {ConnectionId}", connectionId); } public void CleanupInactiveConnections(TimeSpan maxInactivity) { var cutoff = DateTime.UtcNow - maxInactivity; var inactiveConnections = _connections.Values .Where(c => c.LastActivity < cutoff) .ToList(); foreach (var connection in inactiveConnections) { _connections.TryRemove(connection.ConnectionId, out _); _logger.LogInformation("Removed inactive connection: {ConnectionId}", connection.ConnectionId); } } public int GetActiveConnectionCount() { return _connections.Count; } public IEnumerable<string> GetUserConnectionIds(string userId) { return _connections.Values .Where(c => c.UserId == userId) .Select(c => c.ConnectionId); } } public class ConnectionInfo { public string ConnectionId { get; set; } = string.Empty; public string UserId { get; set; } = string.Empty; public DateTime ConnectedAt { get; set; } public DateTime LastActivity { get; set; } }
gRPC Performance Optimization
// Optimized gRPC configuration builder.Services.AddGrpc(options => { options.EnableDetailedErrors = false; // Disable in production options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB options.CompressionProviders = new List<ICompressionProvider> { new GzipCompressionProvider(CompressionLevel.Fastest) }; options.ResponseCompressionAlgorithm = "gzip"; options.ResponseCompressionLevel = CompressionLevel.Fastest; // Add performance interceptors options.Interceptors.Add<MetricsInterceptor>(); options.Interceptors.Add<CachingInterceptor>(); }); // Connection pooling for gRPC clients public class GrpcConnectionPool { private readonly ConcurrentDictionary<string, ChannelBase> _channels = new(); private readonly ILogger<GrpcConnectionPool> _logger; public GrpcConnectionPool(ILogger<GrpcConnectionPool> logger) { _logger = logger; } public ChannelBase GetChannel(string address) { return _channels.GetOrAdd(address, addr => { _logger.LogInformation("Creating gRPC channel to {Address}", addr); return new Channel(addr, ChannelCredentials.Insecure, new ChannelOptions { MaxSendMessageSize = 16 * 1024 * 1024, MaxReceiveMessageSize = 16 * 1024 * 1024, MaxRetryAttempts = 3, MaxRetryBufferSize = 16 * 1024 * 1024, MaxRetryBufferPerCallSize = 1 * 1024 * 1024 }); }); } public async Task ShutdownAsync() { foreach (var channel in _channels.Values) { await channel.ShutdownAsync(); } _channels.Clear(); _logger.LogInformation("All gRPC channels shutdown"); } } // Performance monitoring interceptor public class MetricsInterceptor : Interceptor { private readonly IMetricsCollector _metrics; private readonly ILogger<MetricsInterceptor> _logger; public MetricsInterceptor(IMetricsCollector metrics, ILogger<MetricsInterceptor> logger) { _metrics = metrics; _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { var stopwatch = Stopwatch.StartNew(); var methodName = context.Method; try { var response = await continuation(request, context); stopwatch.Stop(); // Record success metrics await _metrics.RecordGrpcCallAsync(methodName, stopwatch.ElapsedMilliseconds, true); return response; } catch (Exception ex) { stopwatch.Stop(); // Record error metrics await _metrics.RecordGrpcCallAsync(methodName, stopwatch.ElapsedMilliseconds, false); _logger.LogError(ex, "gRPC call {Method} failed", methodName); throw; } } public override async Task ServerStreamingServerHandler<TRequest, TResponse>( TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, ServerStreamingServerMethod<TRequest, TResponse> continuation) { var stopwatch = Stopwatch.StartNew(); var methodName = context.Method; try { await continuation(request, responseStream, context); stopwatch.Stop(); await _metrics.RecordGrpcCallAsync(methodName, stopwatch.ElapsedMilliseconds, true); } catch (Exception ex) { stopwatch.Stop(); await _metrics.RecordGrpcCallAsync(methodName, stopwatch.ElapsedMilliseconds, false); _logger.LogError(ex, "gRPC streaming call {Method} failed", methodName); throw; } } } // Response caching interceptor public class CachingInterceptor : Interceptor { private readonly IMemoryCache _cache; private readonly ILogger<CachingInterceptor> _logger; public CachingInterceptor(IMemoryCache cache, ILogger<CachingInterceptor> logger) { _cache = cache; _logger = logger; } public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { // Only cache GET-equivalent methods if (!IsCacheableMethod(context.Method)) { return await continuation(request, context); } var cacheKey = GenerateCacheKey(context.Method, request); if (_cache.TryGetValue<TResponse>(cacheKey, out var cachedResponse)) { _logger.LogDebug("Cache hit for {Method}", context.Method); return cachedResponse!; } var response = await continuation(request, context); // Cache the response var cacheOptions = new MemoryCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5), Size = 1 // Relative size for cache management }; _cache.Set(cacheKey, response, cacheOptions); _logger.LogDebug("Cached response for {Method}", context.Method); return response; } private bool IsCacheableMethod(string methodName) { var cacheableMethods = new[] { "GetProduct", "GetProducts", "GetOrder" }; return cacheableMethods.Any(m => methodName.Contains(m)); } private string GenerateCacheKey(string methodName, object request) { var serialized = JsonSerializer.Serialize(request); var hash = Convert.ToBase64String(SHA256.HashData(Encoding.UTF8.GetBytes(serialized))); return $"{methodName}:{hash}"; } }
12. Testing and Debugging <a name="testing"></a>
SignalR Testing Utilities
// Test hub context for unit testing public class TestHubContext<THub, T> : IHubContext<THub, T> where THub : Hub<T> where T : class { public TestHubContext() { Clients = new TestHubClients<T>(); Groups = new TestGroupManager(); } public IHubClients<T> Clients { get; } public IGroupManager Groups { get; } // Additional properties for testing public List<ClientMessage> SentMessages => ((TestHubClients<T>)Clients).SentMessages; } public class TestHubClients<T> : IHubClients<T> { public List<ClientMessage> SentMessages { get; } = new(); public T All => CreateProxy("All"); public T Caller => CreateProxy("Caller"); public T Others => CreateProxy("Others"); public T Client(string connectionId) => CreateProxy($"Client:{connectionId}"); public T Clients(IReadOnlyList<string> connectionIds) => CreateProxy($"Clients:{string.Join(",", connectionIds)}"); public T Group(string groupName) => CreateProxy($"Group:{groupName}"); public T Groups(IReadOnlyList<string> groupNames) => CreateProxy($"Groups:{string.Join(",", groupNames)}"); public T User(string userId) => CreateProxy($"User:{userId}"); public T Users(IReadOnlyList<string> userIds) => CreateProxy($"Users:{string.Join(",", userIds)}"); private T CreateProxy(string target) { var proxy = new Mock<T>(); // Record all method calls var methods = typeof(T).GetMethods(); foreach (var method in methods) { var parameters = method.GetParameters(); var returnType = method.ReturnType; proxy.Setup(m => m.GetType().GetMethod(method.Name, parameters)) .Returns(() => { return method.Invoke(proxy.Object, parameters.Select(p => It.IsAny<object>()).ToArray()); }); // For async methods if (returnType == typeof(Task)) { proxy.Setup(GetMethodExpression(method, parameters)) .Returns(() => Task.CompletedTask) .Callback<object[]>(args => { SentMessages.Add(new ClientMessage { Target = target, Method = method.Name, Arguments = args }); }); } else if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(Task<>)) { var resultType = returnType.GetGenericArguments()[0]; var taskType = typeof(Task<>).MakeGenericType(resultType); var completedTask = taskType.GetMethod("FromResult")?.Invoke(null, new[] { CreateDefault(resultType) }); proxy.Setup(GetMethodExpression(method, parameters)) .Returns(() => (Task)completedTask!) .Callback<object[]>(args => { SentMessages.Add(new ClientMessage { Target = target, Method = method.Name, Arguments = args }); }); } } return proxy.Object; } private static object CreateDefault(Type type) { return type.IsValueType ? Activator.CreateInstance(type)! : null!; } private static Expression<Func<T, object>> GetMethodExpression(System.Reflection.MethodInfo method, ParameterInfo[] parameters) { // This is simplified - you'd need to build proper expression trees throw new NotImplementedException(); } } public class ClientMessage { public string Target { get; set; } = string.Empty; public string Method { get; set; } = string.Empty; public object[] Arguments { get; set; } = Array.Empty<object>(); } // Unit tests for SignalR hubs public class ChatHubTests { private readonly TestHubContext<ChatHub, IChatClient> _hubContext; private readonly ChatHub _hub; private readonly Mock<IChatService> _chatServiceMock; public ChatHubTests() { _hubContext = new TestHubContext<ChatHub, IChatClient>(); _chatServiceMock = new Mock<IChatService>(); _hub = new ChatHub(_chatServiceMock.Object) { Context = new TestHubCallerContext(), Clients = _hubContext.Clients, Groups = _hubContext.Groups }; } [Fact] public async Task SendMessage_ShouldBroadcastToAllClients() { // Arrange var user = "testuser"; var message = "Hello, world!"; // Act await _hub.SendMessage(user, message); // Assert var sentMessage = _hubContext.SentMessages.FirstOrDefault(m => m.Method == "ReceiveMessage"); Assert.NotNull(sentMessage); Assert.Equal("All", sentMessage.Target); Assert.Equal(user, sentMessage.Arguments[0]); Assert.Equal(message, sentMessage.Arguments[1]); } [Fact] public async Task JoinChat_ShouldAddUserToGroup() { // Arrange var userName = "testuser"; // Act await _hub.JoinChat(userName); // Assert var groupMessage = _hubContext.SentMessages.FirstOrDefault(m => m.Method == "UserJoined"); Assert.NotNull(groupMessage); Assert.Equal("All", groupMessage.Target); Assert.Equal(userName, groupMessage.Arguments[0]); } } // Integration tests for SignalR public class SignalRIntegrationTests : IClassFixture<WebApplicationFactory<Program>> { private readonly WebApplicationFactory<Program> _factory; public SignalRIntegrationTests(WebApplicationFactory<Program> factory) { _factory = factory; } [Fact] public async Task CanConnectToChatHub() { // Arrange var client = _factory.CreateClient(); var connection = new HubConnectionBuilder() .WithUrl($"{client.BaseAddress}chathub", options => { options.HttpMessageHandlerFactory = _ => _factory.Server.CreateHandler(); }) .Build(); var connected = false; connection.Closed += ex => Task.CompletedTask; // Act & Assert try { await connection.StartAsync(); connected = true; } finally { if (connected) { await connection.StopAsync(); } } Assert.True(connected); } }
gRPC Testing
// gRPC service unit tests public class ProductGrpcServiceTests { private readonly Mock<IProductService> _productServiceMock; private readonly Mock<ILogger<ProductGrpcService>> _loggerMock; private readonly ProductGrpcService _service; private readonly TestServerCallContext _context; public ProductGrpcServiceTests() { _productServiceMock = new Mock<IProductService>(); _loggerMock = new Mock<ILogger<ProductGrpcService>>(); _service = new ProductGrpcService(_productServiceMock.Object, _loggerMock.Object); _context = new TestServerCallContext(); } [Fact] public async Task GetProduct_WithValidId_ReturnsProduct() { // Arrange var productId = 1; var product = new Product { Id = productId, Name = "Test Product", Price = 9.99m }; _productServiceMock.Setup(x => x.GetProductAsync(productId)) .ReturnsAsync(product); var request = new GetProductRequest { ProductId = productId }; // Act var response = await _service.GetProduct(request, _context); // Assert Assert.NotNull(response); Assert.Equal(productId, response.ProductId); Assert.Equal("Test Product", response.Name); Assert.Equal(9.99, response.Price); } [Fact] public async Task GetProduct_WithInvalidId_ThrowsNotFound() { // Arrange var productId = 999; _productServiceMock.Setup(x => x.GetProductAsync(productId)) .ReturnsAsync((Product)null!); var request = new GetProductRequest { ProductId = productId }; // Act & Assert var exception = await Assert.ThrowsAsync<RpcException>(() => _service.GetProduct(request, _context)); Assert.Equal(StatusCode.NotFound, exception.StatusCode); } } // Test server call context public class TestServerCallContext : ServerCallContext { private readonly Metadata _requestHeaders; private readonly CancellationToken _cancellationToken; private readonly Metadata _responseTrailers; private readonly AuthContext _authContext; private readonly Dictionary<object, object> _userState; private WriteOptions? _writeOptions; public TestServerCallContext() { _requestHeaders = new Metadata(); _cancellationToken = new CancellationToken(); _responseTrailers = new Metadata(); _authContext = new AuthContext(string.Empty, new Dictionary<string, List<AuthProperty>>()); _userState = new Dictionary<object, object>(); } protected override string MethodCore => "TestMethod"; protected override string HostCore => "localhost"; protected override string PeerCore => "127.0.0.1:50051"; protected override DateTime DeadlineCore => DateTime.MaxValue; protected override Metadata RequestHeadersCore => _requestHeaders; protected override CancellationToken CancellationTokenCore => _cancellationToken; protected override Metadata ResponseTrailersCore => _responseTrailers; protected override Status StatusCore { get; set; } protected override WriteOptions WriteOptionsCore { get => _writeOptions!; set => _writeOptions = value; } protected override AuthContext AuthContextCore => _authContext; protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options) { throw new NotImplementedException(); } protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) { throw new NotImplementedException(); } protected override IDictionary<object, object> UserStateCore => _userState; } // Integration tests for gRPC public class GrpcIntegrationTests : IClassFixture<GrpcTestFixture<Program>> { private readonly GrpcTestFixture<Program> _fixture; public GrpcIntegrationTests(GrpcTestFixture<Program> fixture) { _fixture = fixture; } [Fact] public async Task GetProduct_ReturnsProduct() { // Arrange var client = new ProductService.ProductServiceClient(_fixture.Channel); // Act var response = await client.GetProductAsync(new GetProductRequest { ProductId = 1 }); // Assert Assert.NotNull(response); Assert.Equal(1, response.ProductId); Assert.NotEmpty(response.Name); } } public class GrpcTestFixture<TStartup> : IDisposable where TStartup : class { private readonly WebApplicationFactory<TStartup> _factory; private readonly IHost _host; public GrpcChannel Channel { get; } public GrpcTestFixture() { _factory = new WebApplicationFactory<TStartup>(); var client = _factory.CreateDefaultClient(new ResponseVersionHandler()); Channel = GrpcChannel.ForAddress(client.BaseAddress!, new GrpcChannelOptions { HttpClient = client }); } public void Dispose() { Channel?.Dispose(); _factory?.Dispose(); } private class ResponseVersionHandler : DelegatingHandler { protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { var response = await base.SendAsync(request, cancellationToken); response.Version = request.Version; return response; } } }
13. Production Deployment <a name="deployment"></a>
Docker Configuration
# Dockerfile for SignalR + gRPC application FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base WORKDIR /app EXPOSE 80 EXPOSE 443 EXPOSE 5000 EXPOSE 5001 FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build WORKDIR /src COPY ["ECommerce.Server/ECommerce.Server.csproj", "ECommerce.Server/"] RUN dotnet restore "ECommerce.Server/ECommerce.Server.csproj" COPY . . WORKDIR "/src/ECommerce.Server" RUN dotnet build "ECommerce.Server.csproj" -c Release -o /app/build FROM build AS publish RUN dotnet publish "ECommerce.Server.csproj" -c Release -o /app/publish FROM base AS final WORKDIR /app COPY --from=publish /app/publish . ENTRYPOINT ["dotnet", "ECommerce.Server.dll"]
Kubernetes Configuration
# kubernetes/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: ecommerce-server labels: app: ecommerce-server spec: replicas: 3 selector: matchLabels: app: ecommerce-server template: metadata: labels: app: ecommerce-server spec: containers: - name: ecommerce-server image: ecommerce/server:latest ports: - containerPort: 80 - containerPort: 443 - containerPort: 5000 # gRPC - containerPort: 5001 # gRPC with TLS env: - name: ASPNETCORE_ENVIRONMENT value: "Production" - name: ConnectionStrings__DefaultConnection valueFrom: secretKeyRef: name: db-secret key: connection-string - name: Azure__SignalR__ConnectionString valueFrom: secretKeyRef: name: azure-secret key: signalr-connection-string resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health port: 80 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /health/ready port: 80 initialDelaySeconds: 5 periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: ecommerce-service spec: selector: app: ecommerce-server ports: - name: http port: 80 targetPort: 80 - name: https port: 443 targetPort: 443 - name: grpc port: 5000 targetPort: 5000 - name: grpc-tls port: 5001 targetPort: 5001 type: LoadBalancer
Health Checks and Monitoring
// Comprehensive health checks builder.Services.AddHealthChecks() .AddCheck<SignalRHealthCheck>("signalr", tags: new[] { "live", "ready" }) .AddCheck<GrpcHealthCheck>("grpc", tags: new[] { "live", "ready" }) .AddSqlServer( builder.Configuration.GetConnectionString("DefaultConnection"), name: "database", tags: new[] { "ready" }) .AddRedis( builder.Configuration.GetConnectionString("Redis"), name: "redis", tags: new[] { "ready" }) .AddAzureSignalR( builder.Configuration.GetConnectionString("AzureSignalR"), name: "azure-signalr", tags: new[] { "ready" }) .AddApplicationInsightsPublisher(); // Custom health checks public class SignalRHealthCheck : IHealthCheck { private readonly IHubContext<ChatHub> _hubContext; private readonly ILogger<SignalRHealthCheck> _logger; public SignalRHealthCheck(IHubContext<ChatHub> hubContext, ILogger<SignalRHealthCheck> logger) { _hubContext = hubContext; _logger = logger; } public async Task<HealthCheckResult> CheckHealthAsync( HealthCheckContext context, CancellationToken cancellationToken = default) { try { // Test SignalR connectivity by sending a test message await _hubContext.Clients.All.SendAsync("HealthCheck", DateTime.UtcNow); return HealthCheckResult.Healthy("SignalR is responding"); } catch (Exception ex) { _logger.LogError(ex, "SignalR health check failed"); return HealthCheckResult.Unhealthy("SignalR is not responding", ex); } } } public class GrpcHealthCheck : IHealthCheck { private readonly ProductService.ProductServiceClient _client; private readonly ILogger<GrpcHealthCheck> _logger; public GrpcHealthCheck(ProductService.ProductServiceClient client, ILogger<GrpcHealthCheck> logger) { _client = client; _logger = logger; } public async Task<HealthCheckResult> CheckHealthAsync( HealthCheckContext context, CancellationToken cancellationToken = default) { try { // Test gRPC service with a simple call var response = await _client.GetProductAsync( new GetProductRequest { ProductId = 1 }, cancellationToken: cancellationToken); return HealthCheckResult.Healthy("gRPC services are responding"); } catch (Exception ex) { _logger.LogError(ex, "gRPC health check failed"); return HealthCheckResult.Unhealthy("gRPC services are not responding", ex); } } } // Health check endpoints app.MapHealthChecks("/health", new HealthCheckOptions { Predicate = check => check.Tags.Contains("live"), ResponseWriter = WriteHealthCheckResponse }); app.MapHealthChecks("/health/ready", new HealthCheckOptions { Predicate = check => check.Tags.Contains("ready"), ResponseWriter = WriteHealthCheckResponse }); private static Task WriteHealthCheckResponse(HttpContext context, HealthReport result) { context.Response.ContentType = "application/json; charset=utf-8"; var options = new JsonWriterOptions { Indented = true }; using var stream = new MemoryStream(); using (var writer = new Utf8JsonWriter(stream, options)) { writer.WriteStartObject(); writer.WriteString("status", result.Status.ToString()); writer.WriteString("totalDuration", result.TotalDuration.ToString()); writer.WriteStartObject("entries"); foreach (var entry in result.Entries) { writer.WriteStartObject(entry.Key); writer.WriteString("status", entry.Value.Status.ToString()); writer.WriteString("description", entry.Value.Description); writer.WriteString("duration", entry.Value.Duration.ToString()); if (entry.Value.Exception != null) { writer.WriteString("exception", entry.Value.Exception.Message); } writer.WriteStartObject("data"); foreach (var item in entry.Value.Data) { writer.WriteString(item.Key, item.Value?.ToString()); } writer.WriteEndObject(); writer.WriteEndObject(); } writer.WriteEndObject(); writer.WriteEndObject(); } var json = Encoding.UTF8.GetString(stream.ToArray()); return context.Response.WriteAsync(json); }
14. Conclusion <a name="conclusion"></a>
In this comprehensive guide, we've explored the powerful real-time communication capabilities of ASP.NET Core through SignalR and gRPC. These technologies enable you to build highly interactive, responsive, and performant applications that meet modern user expectations.
Key Takeaways
SignalR Excellence: Perfect for browser-based real-time features like chat, notifications, and live updates
gRPC Performance: Ideal for high-performance microservices communication with built-in streaming
Hybrid Approach: Combine both technologies for comprehensive real-time capabilities
Production Ready: Implement proper security, scaling, and monitoring for enterprise applications
When to Choose Each Technology
Choose SignalR for:
Web browser clients
Real-time user interfaces
Chat applications
Live notifications
Collaborative features
Choose gRPC for:
Microservices communication
High-performance APIs
Mobile applications
Internal service-to-service calls
Streaming large datasets
Best Practices Summary
Use strongly-typed hubs for better IntelliSense and type safety
Implement proper error handling and reconnection logic
Use message packing for better performance in SignalR
Implement proper authentication and authorization for both technologies
Use interceptors for cross-cutting concerns in gRPC
Monitor performance and set up proper health checks
Plan for scaling with backplanes and connection management
Continuing Your Journey
Real-time communication is a vast field with many advanced patterns and techniques. Continue exploring:
Advanced SignalR patterns like filters and custom protocols
gRPC advanced features like deadlines, cancellation, and complex streaming
Integration with message brokers like RabbitMQ or Azure Service Bus
Real-time data synchronization patterns
Advanced security scenarios like mutual TLS and certificate authentication
By mastering SignalR and gRPC, you'll be well-equipped to build the next generation of real-time, interactive applications that delight users and meet modern business requirements.
Powered By: FreeLearning365.com
.png)
0 Comments
thanks for your comments!