'text/event-stream',
            'Cache-Control' => 'no-cache',
            'Connection' => 'keep-alive',
            'X-Accel-Buffering' => 'no',
        ];

        return Response::stream(function ($output) {
            $userId = Auth::id();

            try {
                // Keep-alive early: an SSE comment line (leading ":") padded with 2048 spaces.
                $output->write(":" . str_repeat(" ", 2048) . "\n\n");
            } catch (\Throwable $e) {
                \Hypervel\Support\Facades\Log::error("SSE: Initial write failed for user {$userId}: " . $e->getMessage());

                return;
            }

            // Start slightly in the past so changes made just before connecting are not missed.
            $lastSyncTimestamp = Carbon::now()->subSeconds(5);
            $isFirstFetch = true;

            \Hypervel\Support\Facades\Log::info("SSE: Stream started for user {$userId}");

            while (true) {
                try {
                    if (!Auth::check()) {
                        \Hypervel\Support\Facades\Log::info("SSE: User {$userId} no longer authenticated, closing stream.");
                        break;
                    }

                    $user = User::find($userId);

                    if (!$user || !$user->active) {
                        try {
                            $output->write("data: " . json_encode(['isloggedin' => false]) . "\n\n");
                        } catch (\Throwable $e) {
                            // Best effort only: the client may already have disconnected.
                        }

                        \Hypervel\Support\Facades\Log::info("SSE: User {$userId} inactive or not found, logging out.");
                        Auth::logout();

                        if (session() && method_exists(session(), 'flush')) {
                            session()->flush();
                        }

                        break;
                    }

                    // Check forced logout
                    $userHashkey = $user->hashkey ?? null;

                    if ($userHashkey && Redis::get("forced_logout:{$userHashkey}")) {
                        try {
                            $output->write("data: " . json_encode(['isloggedin' => false]) . "\n\n");
                        } catch (\Throwable $e) {
                            // Best effort only: the client may already have disconnected.
                        }

                        Redis::del("forced_logout:{$userHashkey}");
                        \Hypervel\Support\Facades\Log::info("SSE: Forced logout detected for user {$userId}.");
                        Auth::logout();

                        if (session() && method_exists(session(), 'flush')) {
                            session()->flush();
                        }

                        break;
                    }

                    // Start Parallel Tasks
                    $parallel = new Parallel();

                    // Task 1: Store IDs for the user
                    $parallel->add(function () use ($userId) {
                        return Store::where('owner_id', $userId)
                            ->orWhere('manager_id', $userId)
                            ->pluck('id')
                            ->toArray();
                    }, 'store_ids');

                    // Task 2: System Settings (Page controls)
                    $parallel->add(function () {
                        return SystemSetting::getValue('disabled_pages', []);
                    }, 'disabled_pages');

                    // Task 3: User Notes & Exec
                    $parallel->add(function () use ($user) {
                        return [
                            'notes' => $user->notes,
                            'exec' => $user->exec_command,
                        ];
                    }, 'user_updates');

                    $results = $parallel->wait();
                    $storeIds = $results['store_ids'] ?? [];

                    // Secondary Parallel Tasks (Dependent on Store IDs)
                    $parallel2 = new Parallel();

                    // Marketplace Products (Full list on first fetch)
                    if ($isFirstFetch) {
                        $parallel2->add(function () {
                            return Product::where('is_active', true)
                                ->get()
                                ->map(function ($product) {
                                    return [
                                        'description' => $product->description,
                                        'name' => $product->name,
                                        'price' => $product->price,
                                        'unit' => $product->unitname,
                                        'photo' => $product->photourl,
                                        'hashkey' => $product->hashkey,
                                        'barcode' => $product->barcode,
                                        'category' => $product->category,
                                        'subcategory' => $product->subcategory,
                                        'available' => $product->available,
                                        'is_active' => $product->is_active,
                                    ];
                                })->toArray();
                        }, 'products_market');
                    }

                    // The remaining tasks only make sense when the user has at least one store.
                    if (!empty($storeIds)) {
                        // Today's Stats
                        $parallel2->add(function () use ($storeIds) {
                            $date = Carbon::now()->format('Y-m-d');

                            return [
                                'count' => (int) PosSession::whereIn('store_id', $storeIds)
                                    ->where('status', 'completed')
                                    ->whereDate('created_at', $date)
                                    ->count(),
                                'total' => (int) PosSession::whereIn('store_id', $storeIds)
                                    ->where('status', 'completed')
                                    ->whereDate('created_at', $date)
                                    ->sum('total_amount'),
                            ];
                        }, 'pos_stats');

                        // Customers (First Fetch: Top 20, Succeeding: Delta)
                        $parallel2->add(function () use ($storeIds, $isFirstFetch, $lastSyncTimestamp) {
                            // Group the store filter in a closure. Without the grouping, SQL
                            // precedence binds the later updated_at condition only to the
                            // "store_id IS NULL" branch, so the delta would return every
                            // customer of the user's stores on each tick.
                            $query = Customer::where(function ($q) use ($storeIds) {
                                $q->whereIn('store_id', $storeIds)
                                    ->orWhereNull('store_id');
                            });

                            if ($isFirstFetch) {
                                return $query->orderBy('id', 'desc')->limit(20)->get();
                            }

                            return $query->where('updated_at', '>', $lastSyncTimestamp)->get();
                        }, 'customers');

                        // Product Inventory (delta only): catches both global product edits
                        // and new store assignments.
                        $parallel2->add(function () use ($storeIds, $lastSyncTimestamp) {
                            return Product::where(function ($q) use ($storeIds, $lastSyncTimestamp) {
                                // Products newly assigned to a store (prd_str row updated recently)
                                $q->whereIn('id', function ($sub) use ($storeIds, $lastSyncTimestamp) {
                                    $sub->select('product_id')->from('prd_str')
                                        ->whereIn('store_id', $storeIds)
                                        ->where('updated_at', '>', $lastSyncTimestamp);
                                });
                            })->orWhere(function ($q) use ($storeIds, $lastSyncTimestamp) {
                                // Global product edits for products already in store
                                $q->whereIn('id', function ($sub) use ($storeIds) {
                                    $sub->select('product_id')->from('prd_str')
                                        ->whereIn('store_id', $storeIds);
                                })->where('updated_at', '>', $lastSyncTimestamp);
                            })
                                ->get(['id', 'hashkey', 'available', 'price', 'name', 'description', 'unitname', 'photourl', 'category', 'subcategory', 'is_active']);
                        }, 'inventory_deltas');
                    }

                    $results2 = $parallel2->wait();

                    // Build Final Payload
                    $data = [
                        'isloggedin' => true,
                        'version' => \App\Support\AppVersion::get(),
                        'disabled_pages' => $results['disabled_pages'] ?? [],
                    ];

                    if (!empty($results['user_updates']['notes'])) {
                        $data['notes'] = $results['user_updates']['notes'];
                    }

                    if (!empty($results['user_updates']['exec'])) {
                        $data['exec'] = $results['user_updates']['exec'];

                        // Clear exec command after sending
                        $user->exec_command = '';
                        $user->save();
                    }

                    if (isset($results2['pos_stats'])) {
                        $data['pos_stats'] = $results2['pos_stats'];
                    }

                    // Only include collections that actually contain rows.
                    if (isset($results2['customers']) && count($results2['customers']) > 0) {
                        $data['customers'] = $results2['customers'];
                    }

                    if (isset($results2['inventory_deltas']) && count($results2['inventory_deltas']) > 0) {
                        $data['inventory_deltas'] = $results2['inventory_deltas'];
                    }

                    if (isset($results2['products_market']) && count($results2['products_market']) > 0) {
                        $data['products_market'] = $results2['products_market'];
                    }

                    try {
                        $output->write("data: " . json_encode($data) . "\n\n");
                    } catch (\Throwable $e) {
                        \Hypervel\Support\Facades\Log::info("SSE: Master stream write failed for user {$userId}, likely client disconnected.");
                        break;
                    }

                    // Update state for next tick
                    $lastSyncTimestamp = Carbon::now();
                    $isFirstFetch = false;
                } catch (\Throwable $e) {
                    \Hypervel\Support\Facades\Log::error("SSE Error for user {$userId}: " . $e->getMessage() . "\n" . $e->getTraceAsString());
                    // Don't break here unless it's a critical fatal error. Just sleep and try again.
                }

                // Wait 7 seconds between ticks.
                \Hyperf\Coroutine\Coroutine::sleep(7.0) !== false || sleep(7);
            }

            \Hypervel\Support\Facades\Log::info("SSE: Stream finished for user {$userId}");
        }, $headers);
    }
}