iptv-stream-web/services/ChatServer.php
Vincent 41cd7a4fd8 Add comprehensive unit tests for Security, UserModel, and Validation utilities
- Implemented SecurityTest to validate token generation, CSRF protection, input sanitization, and rate limiting.
- Created UserModelTest to ensure correct database operations for user management, including creation, updating, banning, and fetching active users.
- Developed ValidationTest to verify input validation and sanitization for user IDs, nicknames, messages, and API requests.
- Introduced Security and Validation utility classes with methods for secure token generation, input sanitization, and comprehensive validation rules.
2025-09-30 21:22:28 -04:00

276 lines
8.9 KiB
PHP

<?php
/**
 * Real-time chat service using Server-Sent Events (SSE).
 *
 * handleSSE() turns the current HTTP response into a long-lived event stream
 * and pushes new chat messages, viewer counts and heartbeats to the client,
 * so the browser does not need to poll.
 *
 * NOTE(review): the stream reads and writes $_SESSION while it is open. If the
 * caller started a session with a locking save handler, other requests from
 * the same session will presumably block until the stream ends - confirm how
 * the session is opened and whether it can be closed before the loop starts.
 */
class ChatServer
{
    /** Seconds between two heartbeat events. */
    private const HEARTBEAT_INTERVAL = 30;

    /** Seconds between two polls for new messages / viewer counts. */
    private const MESSAGE_CHECK_INTERVAL = 2;

    /** Inactive viewers are cleaned up once every this many heartbeats. */
    private const CLEANUP_EVERY_HEARTBEATS = 5;

    /** Pause between two loop iterations, in microseconds (0.5 s). */
    private const LOOP_SLEEP_MICROSECONDS = 500000;

    /** Maximum lifetime of one stream, in seconds (10 minutes). */
    private const MAX_CONNECTION_SECONDS = 600;

    /** The stream is closed once the process uses more memory than this (32 MB). */
    private const MAX_MEMORY_BYTES = 32 * 1024 * 1024;

    private $userModel;
    private $chatMessageModel;
    private $activeViewerModel;

    /** @var int Unix timestamp at which the current stream was opened (0 when no stream is open). */
    private $sseStartTime = 0;

    public function __construct()
    {
        $this->userModel = new UserModel();
        $this->chatMessageModel = new ChatMessageModel();
        $this->activeViewerModel = new ActiveViewerModel();
    }

    /**
     * Handle an SSE connection for real-time chat updates.
     *
     * Validates the request, switches the response into event-stream mode and
     * then blocks in a polling loop until the client disconnects or
     * shouldTerminateConnection() asks for the stream to be closed. Responds
     * with HTTP 403 and terminates the script when validation fails.
     *
     * @param mixed $userId Identifier of the connecting user; checked by validateSSEConnection().
     */
    public function handleSSE($userId)
    {
        if (!$this->validateSSEConnection($userId)) {
            http_response_code(403);
            die("Invalid connection");
        }

        $this->prepareEventStream();

        // By default PHP terminates the script on the first write after the
        // client has gone away, which would skip the connection_aborted()
        // check and all of the cleanup below. Keep running so we can exit
        // the loop ourselves.
        $previousAbortSetting = ignore_user_abort(true);

        $this->sendEvent('connection', [
            'status' => 'connected',
            'user_id' => $userId,
            'timestamp' => time(),
        ]);

        $lastMessageId = $this->getLastMessageIdForUser($userId);

        // The start time is kept on the instance as well, so that it is still
        // available after the session keys have been removed.
        $this->sseStartTime = time();
        $_SESSION['sse_connected'] = true;
        $_SESSION['sse_user_id'] = $userId;
        $_SESSION['sse_start_time'] = $this->sseStartTime;

        Security::logSecurityEvent('sse_connection_opened', ['user_id' => $userId]);

        try {
            $this->runEventLoop($userId, $lastMessageId);
        } catch (Exception $e) {
            Security::logSecurityEvent('sse_connection_error', [
                'user_id' => $userId,
                'error' => $e->getMessage(),
            ]);
            $this->sendEvent('error', [
                'message' => 'Connection error occurred',
                'code' => 'CONNECTION_ERROR',
            ]);
        } finally {
            $this->sendEvent('disconnect', [
                'reason' => 'connection_closed',
                'timestamp' => time(),
            ]);

            // Compute the duration BEFORE the session keys are removed.
            $duration = time() - $this->sseStartTime;

            unset($_SESSION['sse_connected'], $_SESSION['sse_user_id'], $_SESSION['sse_start_time']);
            $this->sseStartTime = 0;
            ignore_user_abort((bool) $previousAbortSetting);

            Security::logSecurityEvent('sse_connection_closed', [
                'user_id' => $userId,
                'duration' => $duration,
            ]);
        }
    }

    /**
     * Send the SSE response headers and switch off every layer of output
     * buffering/compression that this script can control.
     */
    private function prepareEventStream(): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        // NOTE(review): the wildcard origin lets any site open this stream - confirm that is intended.
        header('Access-Control-Allow-Origin: *');
        header('Access-Control-Allow-Methods: GET');
        header('Connection: keep-alive');

        if (function_exists('apache_setenv')) {
            apache_setenv('no-gzip', '1');
        }
        ini_set('zlib.output_compression', '0');
        ini_set('output_buffering', '0');
        ob_implicit_flush(true);

        // Drop any buffered output. Stop as soon as a buffer refuses to be
        // removed, otherwise this loop would never end.
        while (ob_get_level() > 0) {
            if (!ob_end_clean()) {
                break;
            }
        }
    }

    /**
     * Poll for updates and push them to the client until the connection ends.
     *
     * @param mixed $userId        User whose "last seen" marker is refreshed; skipped when falsy.
     * @param int   $lastMessageId Only messages with a higher ID are delivered.
     */
    private function runEventLoop($userId, int $lastMessageId): void
    {
        if ($userId) {
            $this->userModel->updateLastSeen($userId);
        }

        $lastHeartbeat = time();
        $lastCheck = time();
        $heartbeatCount = 0;

        while (!connection_aborted()) {
            $currentTime = time();

            if ($currentTime - $lastHeartbeat >= self::HEARTBEAT_INTERVAL) {
                $this->sendEvent('heartbeat', [
                    'timestamp' => $currentTime,
                    'uptime' => $currentTime - $this->sseStartTime,
                ]);
                $lastHeartbeat = $currentTime;

                if ($userId) {
                    $this->userModel->updateLastSeen($userId);
                }

                // Counted per heartbeat (not per loop pass), so with the
                // defaults the cleanup runs every 5 * 30 = 150 seconds.
                if (++$heartbeatCount % self::CLEANUP_EVERY_HEARTBEATS === 0) {
                    $this->activeViewerModel->cleanupInactive();
                }
            }

            if ($currentTime - $lastCheck >= self::MESSAGE_CHECK_INTERVAL) {
                $lastMessageId = $this->pushUpdates($lastMessageId);
                $lastCheck = $currentTime;
            }

            if ($this->shouldTerminateConnection()) {
                break;
            }

            // Sleep to avoid spinning the CPU between polls.
            usleep(self::LOOP_SLEEP_MICROSECONDS);
        }
    }

    /**
     * Deliver messages newer than $lastMessageId plus the current viewer count.
     *
     * @param int $lastMessageId ID of the last message already delivered.
     * @return int ID of the last message delivered after this call.
     */
    private function pushUpdates(int $lastMessageId): int
    {
        $newMessages = $this->chatMessageModel->getMessagesAfterId($lastMessageId);

        if (!empty($newMessages)) {
            $this->sendEvent('new_messages', [
                'messages' => $newMessages,
                'count' => count($newMessages),
            ]);

            // The last element is treated as the most recent message.
            $lastMessage = end($newMessages);
            $lastMessageId = (int) $lastMessage['id'];

            // Keep the session cursor in step with what was delivered, so a
            // reconnect without last_id does not replay these messages.
            $_SESSION['sse_last_message_id'] = $lastMessageId;
        }

        $this->sendEvent('viewer_count_update', [
            'count' => $this->activeViewerModel->getActiveCount(),
        ]);

        return $lastMessageId;
    }

    /**
     * Write one server-sent event to the response and flush it.
     *
     * The payload is a JSON object with the keys "event", "data" and
     * "timestamp". Values that cannot be encoded (for example strings with
     * invalid UTF-8) are replaced by null instead of voiding the whole event.
     *
     * @param string $eventType Event name; CR/LF are stripped so it cannot break the stream framing.
     * @param array  $data      Event payload.
     */
    private function sendEvent(string $eventType, array $data): void
    {
        $eventType = str_replace(["\r", "\n"], '', $eventType);

        $eventData = json_encode([
            'event' => $eventType,
            'data' => $data,
            'timestamp' => time(),
        ], JSON_PARTIAL_OUTPUT_ON_ERROR);

        if ($eventData === false) {
            // Nothing usable could be encoded; an empty "data:" line would
            // only give the client something it cannot parse.
            return;
        }

        echo "event: {$eventType}\n";
        echo "data: {$eventData}\n\n";

        // Force the event out to the client immediately.
        if (ob_get_level()) {
            ob_flush();
        }
        flush();
    }

    /**
     * Validate an SSE connection request.
     *
     * The request is accepted when the user ID passes Validation::validateUserId(),
     * the optional "csrf" query parameter (if present) is a valid token, and the
     * client IP is within the rate limit passed to Security::checkRateLimit().
     *
     * @param mixed $userId User ID supplied by the caller.
     * @return bool True when the connection may proceed.
     */
    private function validateSSEConnection($userId): bool
    {
        if (!Validation::validateUserId($userId)['valid']) {
            return false;
        }

        // The token is optional, but when one is sent it has to be a valid string.
        $csrfToken = $_GET['csrf'] ?? '';
        if (!is_string($csrfToken)) {
            return false;
        }
        if ($csrfToken !== '' && !Security::validateCSRFToken($csrfToken)) {
            return false;
        }

        if (!Security::checkRateLimit(Security::getClientIP(), 'sse_connection', 5, 60)) {
            return false;
        }

        return true;
    }

    /**
     * Determine the message ID after which the stream should start.
     *
     * Uses the "last_id" query parameter, then the value remembered in the
     * session. When neither holds a positive integer, the ID of the most
     * recent stored message is used (0 if there are no messages), so that
     * only messages created from now on are delivered.
     *
     * @param mixed $userId Currently unused.
     * @return int Message ID; also stored in $_SESSION['sse_last_message_id'].
     */
    private function getLastMessageIdForUser($userId): int
    {
        $candidate = $_GET['last_id'] ?? $_SESSION['sse_last_message_id'] ?? null;

        // Request data is untrusted: accept positive integers only
        // (arrays, text and negative numbers all yield false here).
        $lastId = filter_var($candidate, FILTER_VALIDATE_INT, ['options' => ['min_range' => 1]]);

        if ($lastId === false) {
            $recentMessages = $this->chatMessageModel->getRecent(1);
            $lastId = !empty($recentMessages) ? (int) $recentMessages[0]['id'] : 0;
        }

        $_SESSION['sse_last_message_id'] = $lastId;

        return $lastId;
    }

    /**
     * Check whether the current stream should be closed by the server.
     *
     * @return bool True when memory use exceeds MAX_MEMORY_BYTES, the stream
     *              is older than MAX_CONNECTION_SECONDS, or an authenticated
     *              admin's login is older than the configured session timeout.
     */
    private function shouldTerminateConnection(): bool
    {
        if (memory_get_usage(true) > self::MAX_MEMORY_BYTES) {
            return true;
        }

        if (time() - $this->sseStartTime > self::MAX_CONNECTION_SECONDS) {
            return true;
        }

        if (Security::isAdminAuthenticated()) {
            $timeout = Config::get('admin.session_timeout', 3600);
            $loginTime = $_SESSION['admin_login_time'] ?? 0;
            if (time() - $loginTime > $timeout) {
                return true;
            }
        }

        return false;
    }

    /**
     * Store a new chat message and log the event.
     *
     * Open streams pick the message up on their next poll; nothing is pushed
     * to them directly from here.
     *
     * @param array $messageData Message fields handed to ChatMessageModel::create().
     * @return mixed The new message ID, or false when the model returned a falsy ID.
     */
    public function broadcastMessage($messageData)
    {
        $messageId = $this->chatMessageModel->create($messageData);

        if (!$messageId) {
            return false;
        }

        Security::logSecurityEvent('message_broadcast', [
            'message_id' => $messageId,
            'user_id' => $messageData['user_id'] ?? null,
        ]);

        return $messageId;
    }
}