userModel = new UserModel();
        $this->chatMessageModel = new ChatMessageModel();
        $this->activeViewerModel = new ActiveViewerModel();
    }

    /**
     * Handle an SSE connection for real-time chat updates.
     *
     * Validates the request, emits the event-stream headers, then loops until
     * the client disconnects or shouldTerminateConnection() returns true.
     * Inside the loop it sends a heartbeat every 30 seconds, polls for new
     * messages and the viewer count every 2 seconds, and purges inactive
     * viewers on every fifth heartbeat.
     *
     * On validation failure the request ends with HTTP 403 via die().
     *
     * NOTE(review): $_SESSION is written here and kept for the whole
     * stream. With PHP's default file-based session handler the session stays
     * locked until the script ends, which can block other requests from the
     * same client - confirm whether session_write_close() should be called
     * before entering the loop.
     *
     * @param mixed $userId Identifier of the connecting user, checked by
     *                      validateSSEConnection().
     * @return void
     */
    public function handleSSE($userId)
    {
        // Validate user ID
        if (!$this->validateSSEConnection($userId)) {
            http_response_code(403);
            die("Invalid connection");
        }

        // Set headers for SSE
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Access-Control-Allow-Origin: *');
        header('Access-Control-Allow-Methods: GET');
        header('Connection: keep-alive');

        // Disable compression and output buffering so events reach the client immediately
        if (function_exists('apache_setenv')) {
            apache_setenv('no-gzip', '1');
        }
        ini_set('zlib.output_compression', '0');
        ini_set('output_buffering', '0');
        ob_implicit_flush(1);

        // Discard any output buffers that are already open
        while (ob_get_level()) {
            ob_end_clean();
        }

        // Send initial connection event
        $this->sendEvent('connection', [
            'status' => 'connected',
            'user_id' => $userId,
            'timestamp' => time()
        ]);

        // Get last message ID for incremental updates
        $lastMessageId = $this->getLastMessageIdForUser($userId);

        // Keep the start time in a local as well: the session copy is removed
        // during cleanup, but the duration is still needed for the close log.
        $startTime = time();

        // Store connection info for heartbeat
        $_SESSION['sse_connected'] = true;
        $_SESSION['sse_user_id'] = $userId;
        $_SESSION['sse_start_time'] = $startTime;

        Security::logSecurityEvent('sse_connection_opened', ['user_id' => $userId]);

        $heartbeatInterval = 30;      // seconds between heartbeat events
        $checkInterval = 2;           // seconds between new-message polls
        $cleanupEveryHeartbeats = 5;  // purge inactive viewers every 5 heartbeats (~150 seconds)

        try {
            // Update user's last seen
            if ($userId) {
                $this->userModel->updateLastSeen($userId);
            }

            $lastHeartbeat = time();
            $lastCheck = time();
            $heartbeatCount = 0;

            while (!connection_aborted()) {
                $currentTime = time();

                // Send heartbeat
                if ($currentTime - $lastHeartbeat >= $heartbeatInterval) {
                    $this->sendEvent('heartbeat', [
                        'timestamp' => $currentTime,
                        'uptime' => $currentTime - $startTime
                    ]);
                    $lastHeartbeat = $currentTime;

                    // Update user's active status during heartbeat
                    if ($userId) {
                        $this->userModel->updateLastSeen($userId);
                    }

                    // Count heartbeats, not loop iterations, so the cleanup
                    // really runs once per $cleanupEveryHeartbeats heartbeats.
                    if (++$heartbeatCount % $cleanupEveryHeartbeats === 0) {
                        $this->activeViewerModel->cleanupInactive();
                    }
                }

                // Check for new messages
                if ($currentTime - $lastCheck >= $checkInterval) {
                    $newMessages = $this->chatMessageModel->getMessagesAfterId($lastMessageId);

                    if (!empty($newMessages)) {
                        // Send new messages
                        $this->sendEvent('new_messages', [
                            'messages' => $newMessages,
                            'count' => count($newMessages)
                        ]);

                        // Update last message ID from the most recent message
                        $lastMessage = end($newMessages);
                        $lastMessageId = $lastMessage['id'];
                    }

                    // Send viewer count updates periodically
                    $viewerCount = $this->activeViewerModel->getActiveCount();
                    $this->sendEvent('viewer_count_update', [
                        'count' => $viewerCount
                    ]);

                    $lastCheck = $currentTime;
                }

                // Check for PHP timeouts or memory issues
                if ($this->shouldTerminateConnection()) {
                    break;
                }

                // Sleep to avoid consuming too much CPU
                usleep(500000); // 0.5 seconds
            }
        } catch (Exception $e) {
            Security::logSecurityEvent('sse_connection_error', [
                'user_id' => $userId,
                'error' => $e->getMessage()
            ]);

            $this->sendEvent('error', [
                'message' => 'Connection error occurred',
                'code' => 'CONNECTION_ERROR'
            ]);
        }

        // Send disconnect event
        $this->sendEvent('disconnect', [
            'reason' => 'connection_closed',
            'timestamp' => time()
        ]);

        // Compute the duration from the local start time; it stays defined even
        // when the loop never ran and after the session keys are removed.
        $duration = time() - $startTime;

        // Clean up connection state
        unset($_SESSION['sse_connected'], $_SESSION['sse_user_id'], $_SESSION['sse_start_time']);

        Security::logSecurityEvent('sse_connection_closed', ['user_id' => $userId, 'duration' => $duration]);
    }

    /**
     * Send a server-sent event.
     *
     * Writes an "event:" line and a "data:" line carrying a JSON object with
     * the keys event, data and timestamp, followed by a blank line, then
     * flushes the output so the client receives it immediately.
     *
     * @param string $eventType Event name written on the "event:" line.
     * @param mixed  $data      Payload placed under the "data" key of the JSON body.
     * @return void
     */
    private function sendEvent($eventType, $data)
    {
        $eventData = json_encode([
            'event' => $eventType,
            'data' => $data,
            'timestamp' => time()
        ]);

        echo "event: {$eventType}\n";
        echo "data: {$eventData}\n\n";

        // Force output
        if (ob_get_level()) {
            ob_flush();
        }
        flush();
    }

    /**
     *
Validate SSE connection request.
     *
     * The request is rejected when the user ID fails
     * Validation::validateUserId(), when a non-empty "csrf" query parameter
     * fails Security::validateCSRFToken(), or when Security::checkRateLimit()
     * refuses the client IP for the 'sse_connection' bucket.
     *
     * NOTE(review): the CSRF token is only checked when one is supplied; a
     * request without a "csrf" parameter passes this step - confirm that is
     * intended.
     *
     * @param mixed $userId Identifier supplied by the caller.
     * @return bool True when the connection may proceed.
     */
    private function validateSSEConnection($userId)
    {
        // Check if user ID is valid format
        if (!Validation::validateUserId($userId)['valid']) {
            return false;
        }

        // Verify CSRF token if provided in URL
        $csrfToken = $_GET['csrf'] ?? '';
        if (!empty($csrfToken) && !Security::validateCSRFToken($csrfToken)) {
            return false;
        }

        // Check rate limiting for SSE connections
        // (presumably 5 connections per 60 seconds per IP - confirm against Security::checkRateLimit)
        if (!Security::checkRateLimit(Security::getClientIP(), 'sse_connection', 5, 60)) {
            return false;
        }

        return true;
    }

    /**
     * Get the last message ID that a user has seen.
     *
     * Takes the "last_id" query parameter if present, otherwise the value
     * remembered in the session. The candidate is accepted only when it is a
     * positive integer; anything else (missing, zero, non-numeric, array)
     * falls back to the ID of the most recent message, or 0 when there are no
     * messages. The result is stored in the session.
     *
     * Assumes message IDs are positive integers - TODO confirm against
     * ChatMessageModel.
     *
     * @param mixed $userId Currently unused by this method.
     * @return mixed Message ID from which incremental updates should start.
     */
    private function getLastMessageIdForUser($userId)
    {
        // Get from GET parameter or session
        $candidate = $_GET['last_id'] ?? $_SESSION['sse_last_message_id'] ?? null;

        // The query string is untrusted input: accept positive integers only.
        $lastId = filter_var($candidate, FILTER_VALIDATE_INT, ['options' => ['min_range' => 1]]);

        if ($lastId === false) {
            // No usable ID: start from the most recent message so only newer ones are streamed
            $recentMessages = $this->chatMessageModel->getRecent(1);
            $lastId = !empty($recentMessages) ? $recentMessages[0]['id'] : 0;
        }

        $_SESSION['sse_last_message_id'] = $lastId;
        return $lastId;
    }

    /**
     * Check if connection should be terminated.
     *
     * Returns true when allocated memory exceeds 32 MB, when more than 600
     * seconds have passed since $_SESSION['sse_start_time'], or when an
     * authenticated admin's login is older than the configured
     * 'admin.session_timeout' (default 3600 seconds).
     *
     * @return bool True when the streaming loop should stop.
     */
    private function shouldTerminateConnection()
    {
        $memoryLimitBytes = 32 * 1024 * 1024;
        $maxConnectionSeconds = 600;

        // Check memory usage (terminate if > 32MB)
        if (memory_get_usage(true) > $memoryLimitBytes) {
            return true;
        }

        // Check execution time (terminate after 10 minutes)
        $executionTime = time() - $_SESSION['sse_start_time'];
        if ($executionTime > $maxConnectionSeconds) {
            return true;
        }

        // Check if user is still authenticated (if admin)
        if (Security::isAdminAuthenticated()) {
            $timeout = Config::get('admin.session_timeout', 3600);
            $loginTime = $_SESSION['admin_login_time'] ?? 0;
            if (time() - $loginTime > $timeout) {
                return true;
            }
        }

        return false;
    }

    /**
     * Handle broadcasting a new message to any registered listeners
     * (This would be used in a more advanced implementation with process communication)
     *
     * As written, this stores the message through the chat message model and
     * logs a 'message_broadcast' security event; connected SSE clients pick
     * the message up through their own polling.
     *
     * @param array $messageData Message fields passed to the model's create();
     *                           'user_id' is logged when present.
     * @return mixed The new message ID on success, false otherwise.
     */
    public function broadcastMessage($messageData)
    {
        // Store message in database
        $messageId = $this->chatMessageModel->create($messageData);

        if (!$messageId) {
            return false;
        }

        // Log message creation; tolerate payloads that carry no user_id
        Security::logSecurityEvent('message_broadcast', [
            'message_id' => $messageId,
            'user_id' => $messageData['user_id'] ?? null
        ]);

        return $messageId;
    }
}