Class: SessionChannel
- Inherits: ApplicationCable::Channel
  - Object
  - ActionCable::Channel::Base
  - ApplicationCable::Channel
  - SessionChannel
- Defined in: app/channels/session_channel.rb
Overview
Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
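As a rough illustration of how a client might open this subscription, the sketch below builds the standard Action Cable `subscribe` frame. The helper name `subscribe_frame` is hypothetical and not part of this codebase; the only assumption about the channel is that it reads `session_id` from the subscription identifier, as `#subscribed` does.

```ruby
require "json"

# Hypothetical helper: builds the Action Cable "subscribe" frame for SessionChannel.
# In the Action Cable wire protocol the identifier is itself a JSON-encoded string,
# and its keys become the channel's `params` on the server.
def subscribe_frame(session_id)
  identifier = JSON.generate({ "channel" => "SessionChannel", "session_id" => session_id })
  JSON.generate({ "command" => "subscribe", "identifier" => identifier })
end

puts subscribe_frame(42)
```

A client would send this string over the WebSocket connection; the chat history then arrives as ordinary messages on the same subscription.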
Constant Summary
- DEFAULT_LIST_LIMIT = 10
- MAX_LIST_LIMIT = 50
Instance Method Summary
- #create_session(_data) ⇒ Object
  Creates a new session and switches the channel stream to it.
- #list_sessions(data) ⇒ Object
  Returns recent sessions with metadata for the session picker UI.
- #receive(data) ⇒ Object
  Receives messages from clients and broadcasts them to all session subscribers.
- #speak(data) ⇒ Object
  Processes user input: persists the message and enqueues LLM processing.
- #subscribed ⇒ Object
  Subscribes the client to the session-specific stream.
- #switch_session(data) ⇒ Object
  Switches the channel stream to an existing session.
Instance Method Details
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
    # File 'app/channels/session_channel.rb', line 70
    def create_session(_data)
      session = Session.create!
      switch_to_session(session.id)
    end
#list_sessions(data) ⇒ Object
Returns recent sessions with metadata for the session picker UI. The requested limit falls back to DEFAULT_LIST_LIMIT when absent and is clamped to the range 1..MAX_LIST_LIMIT.
    # File 'app/channels/session_channel.rb', line 52
    def list_sessions(data)
      limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
      sessions = Session.recent(limit)
      counts = Event.where(session_id: sessions.select(:id)).group(:session_id).count

      result = sessions.map do |session|
        {
          id: session.id,
          created_at: session.created_at.iso8601,
          updated_at: session.updated_at.iso8601,
          message_count: counts[session.id] || 0
        }
      end
      transmit({"action" => "sessions_list", "sessions" => result})
    end
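The limit handling in the first line of the method can be tried in isolation. The sketch below copies that expression into a standalone method; `effective_limit` is a hypothetical name used only for illustration, and the constants are redeclared with the values from the Constant Summary.

```ruby
DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

# Hypothetical helper mirroring the limit expression in #list_sessions.
def effective_limit(data)
  (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
end

effective_limit({})                    # => 10 (missing key falls back to the default)
effective_limit({ "limit" => 25 })     # => 25
effective_limit({ "limit" => "500" })  # => 50 (clamped to MAX_LIST_LIMIT)
effective_limit({ "limit" => 0 })      # => 1  (clamped to the lower bound)
effective_limit({ "limit" => "abc" })  # => 1  ("abc".to_i is 0, then clamped)
```

Because of the clamp, a malformed or hostile limit cannot request more than MAX_LIST_LIMIT sessions or fewer than one.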
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
    # File 'app/channels/session_channel.rb', line 34
    def receive(data)
      ActionCable.server.broadcast(stream_name, data)
    end
#speak(data) ⇒ Object
Processes user input: persists the message and enqueues LLM processing. Blank input, and input for a session that no longer exists, is silently ignored.
    # File 'app/channels/session_channel.rb', line 41
    def speak(data)
      content = data["content"].to_s.strip
      return if content.empty? || !Session.exists?(@current_session_id)

      Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id))
      AgentRequestJob.perform_later(@current_session_id)
    end
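The input normalization at the top of `#speak` is plain Ruby and can be checked on its own. In this sketch `normalized_content` is a hypothetical helper wrapping the same expression; it does not touch the session lookup, the event bus, or the job queue.

```ruby
# Hypothetical helper mirroring the first line of #speak.
# nil.to_s is "", so a missing or null "content" key is treated as empty input.
def normalized_content(data)
  data["content"].to_s.strip
end

normalized_content({ "content" => "  hello \n" })    # => "hello"
normalized_content({ "content" => nil })             # => ""
normalized_content({})                               # => ""
normalized_content({ "content" => "   " }).empty?    # => true
```

Any payload for which the result is empty causes `#speak` to return early, before anything is persisted or enqueued.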
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. Rejects the subscription if no valid session_id is provided. Transmits chat history to the subscribing client after confirmation.
    # File 'app/channels/session_channel.rb', line 21
    def subscribed
      @current_session_id = params[:session_id].to_i
      if @current_session_id > 0
        stream_from stream_name
        transmit_history
      else
        reject
      end
    end
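The validity check relies on how `to_i` treats unexpected values. The sketch below isolates that check; `valid_session_id?` is a hypothetical name, not a method of the channel.

```ruby
# Hypothetical helper mirroring the `params[:session_id].to_i > 0` check in #subscribed.
def valid_session_id?(raw)
  raw.to_i > 0
end

valid_session_id?("42")    # => true
valid_session_id?(42)      # => true
valid_session_id?(nil)     # => false (nil.to_i is 0)
valid_session_id?("abc")   # => false ("abc".to_i is 0)
valid_session_id?("-3")    # => false
valid_session_id?("7abc")  # => true  (to_i parses the leading digits, giving 7)
```

Note that the check only confirms the value is a positive integer; as written, `#subscribed` does not verify that a session with that id exists.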
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
    # File 'app/channels/session_channel.rb', line 79
    def switch_session(data)
      target_id = data["session_id"].to_i
      return transmit_error("Session not found") unless target_id > 0

      switch_to_session(target_id)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end
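For reference, a client invokes channel actions such as `switch_session` by sending an Action Cable `message` frame whose `data` carries the action name. The sketch below shows one way to build such a frame; `perform_frame` is a hypothetical helper, and the identifier is assumed to be the same one used when subscribing.

```ruby
require "json"

# Hypothetical helper: builds an Action Cable "message" frame that calls a channel action.
# Both the identifier and the data are JSON-encoded strings nested inside the outer JSON.
# The identifier string must match the one sent in the original "subscribe" frame.
def perform_frame(subscribed_session_id, action, payload = {})
  identifier = JSON.generate({ "channel" => "SessionChannel", "session_id" => subscribed_session_id })
  data = JSON.generate(payload.merge("action" => action))
  JSON.generate({ "command" => "message", "identifier" => identifier, "data" => data })
end

# Ask the channel (subscribed with session 1) to switch to session 2.
puts perform_frame(1, "switch_session", { "session_id" => 2 })
```

The same helper covers the other actions, for example `perform_frame(1, "list_sessions", { "limit" => 20 })` or `perform_frame(1, "speak", { "content" => "hello" })`.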