Class: SessionChannel
- Inherits: ApplicationCable::Channel
  - Object
  - ActionCable::Channel::Base
  - ApplicationCable::Channel
  - SessionChannel
- Defined in: app/channels/session_channel.rb
Overview
Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
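A client outside a browser (for example the TUI) would talk to this channel with plain Action Cable WebSocket frames. The following is a minimal sketch of building those frames. The "command", "identifier" and "data" envelope is the standard Action Cable wire format; the "session_id" parameter name and the helper names are assumptions for illustration, not taken from the source.

```ruby
require "json"

# Builds the subscription identifier. "channel" is the standard Action Cable key;
# "session_id" as the parameter name is an assumption based on the overview.
def session_identifier(session_id = nil)
  params = {"channel" => "SessionChannel"}
  params["session_id"] = session_id if session_id
  JSON.generate(params)
end

# Frame that subscribes the connection to the channel.
def subscribe_frame(session_id = nil)
  JSON.generate({"command" => "subscribe", "identifier" => session_identifier(session_id)})
end

# Frame that invokes a channel action such as "speak" or "list_sessions".
# The identifier must match the one used when subscribing.
def action_frame(action, payload = {}, session_id: nil)
  JSON.generate({
    "command" => "message",
    "identifier" => session_identifier(session_id),
    "data" => JSON.generate(payload.merge("action" => action))
  })
end

puts subscribe_frame(42)
puts action_frame("speak", {"content" => "hello"}, session_id: 42)
```

Note that the identifier and the data payload are themselves JSON strings nested inside the outer JSON frame.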
Constant Summary
- DEFAULT_LIST_LIMIT = 10
- MAX_LIST_LIMIT = 50
Instance Method Summary
- #change_view_mode(data) ⇒ Object
  Changes the session’s view mode and re-broadcasts the viewport.
- #create_session(_data) ⇒ Object
  Creates a new session and switches the channel stream to it.
- #list_sessions(data) ⇒ Object
  Returns recent root sessions with nested child metadata for session picker UI.
- #recall_pending(data) ⇒ Object
  Recalls the most recent pending message for editing.
- #receive(data) ⇒ Object
  Receives messages from clients and broadcasts them to all session subscribers.
- #save_token(data) ⇒ Object
  Validates and saves an Anthropic subscription token to encrypted credentials.
- #speak(data) ⇒ Object
  Processes user input: persists the message and enqueues LLM processing.
- #subscribed ⇒ Object
  Subscribes the client to the session-specific stream.
- #switch_session(data) ⇒ Object
  Switches the channel stream to an existing session.
Instance Method Details
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.
    # File 'app/channels/session_channel.rb', line 141

    def change_view_mode(data)
      mode = data["view_mode"].to_s
      return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

      session = Session.find(@current_session_id)
      session.update!(view_mode: mode)

      ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
      (session) # method name missing in the source; per the description, this call re-broadcasts the viewport
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
    # File 'app/channels/session_channel.rb', line 102

    def create_session(_data)
      session = Session.create!
      switch_to_session(session.id)
    end
#list_sessions(data) ⇒ Object
Returns recent root sessions with nested child metadata for session picker UI. Filters to root sessions only (no parent_session_id). Child sessions are nested under their parent with name and status information.
    # File 'app/channels/session_channel.rb', line 90

    def list_sessions(data)
      limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
      sessions = Session.root_sessions.recent(limit).includes(:child_sessions)

      all_ids = sessions.flat_map { |session| [session.id] + session.child_sessions.map(&:id) }
      # The scope name between the two dots is missing in the source.
      counts = Event.where(session_id: all_ids)..group(:session_id).count

      result = sessions.map { |session| serialize_session_with_children(session, counts) }
      transmit({"action" => "sessions_list", "sessions" => result})
    end
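The limit handling in list_sessions can be tried in isolation. This sketch copies the clamping expression and the two constants from the listing above; the method name effective_limit is made up for the example.

```ruby
DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

# Same expression as in list_sessions: fall back to the default when no limit
# is sent, coerce to an integer, then clamp into 1..MAX_LIST_LIMIT.
def effective_limit(data)
  (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
end

effective_limit({})                 # => 10
effective_limit({"limit" => 25})    # => 25
effective_limit({"limit" => 500})   # => 50
effective_limit({"limit" => 0})     # => 1
effective_limit({"limit" => "abc"}) # => 1, because "abc".to_i is 0
```

Because of the clamp, a client cannot request an unbounded list or a non-positive limit; malformed values degrade to a list of one session rather than raising.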
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing. Deletes the pending event and broadcasts the recall so all clients remove it.
    # File 'app/channels/session_channel.rb', line 69

    def recall_pending(data)
      event_id = data["event_id"].to_i
      return if event_id <= 0

      event = Event.find_by(
        id: event_id,
        session_id: @current_session_id,
        event_type: "user_message",
        status: Event::PENDING_STATUS
      )
      return unless event

      event.destroy!
      ActionCable.server.broadcast(stream_name, {"action" => "user_message_recalled", "event_id" => event_id})
    end
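On the receiving side, every client on the stream is expected to drop the recalled message when the broadcast arrives. A rough sketch of that handling follows; the PendingBuffer class is hypothetical and only the "user_message_recalled" action and "event_id" key come from the listing above.

```ruby
# Hypothetical client-side buffer of pending messages, keyed by event id.
class PendingBuffer
  def initialize
    @events = {}
  end

  def add(event_id, content)
    @events[event_id] = content
  end

  # Removes the recalled event and returns its content (so the client that
  # asked for the recall can put it back into the input field), or nil when
  # the message is some other action or the id is unknown.
  def handle(message)
    return unless message["action"] == "user_message_recalled"

    @events.delete(message["event_id"])
  end

  def ids
    @events.keys
  end
end

buffer = PendingBuffer.new
buffer.add(7, "draft question")
buffer.handle({"action" => "user_message_recalled", "event_id" => 7}) # => "draft question"
buffer.ids                                                             # => []
```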
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
    # File 'app/channels/session_channel.rb', line 41

    def receive(data)
      ActionCable.server.broadcast(stream_name, data)
    end
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted credentials. Format-validated and API-validated before storage. The token never enters the LLM context window — it flows directly from WebSocket to encrypted credentials.
    # File 'app/channels/session_channel.rb', line 125

    def save_token(data)
      token = data["token"].to_s.strip

      Providers::Anthropic.validate_token_format!(token)
      Providers::Anthropic.validate_token_api!(token)
      write_anthropic_token(token)

      transmit({"action" => "token_saved"})
    rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
      transmit({"action" => "token_error", "message" => error.message})
    end
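The validate-then-store ordering can be illustrated without Rails. In this sketch the two error classes, the save_token_flow method and the injected lambdas are stand-ins invented for the example; the real format and API checks live in Providers::Anthropic and are not shown in this page.

```ruby
# Stand-ins for Providers::Anthropic::TokenFormatError / AuthenticationError.
class TokenFormatError < StandardError; end
class AuthenticationError < StandardError; end

# Runs both checks before storing; any validation failure short-circuits
# so the store is never called with an unverified token.
def save_token_flow(raw, format_check:, api_check:, store:)
  token = raw.to_s.strip
  format_check.call(token)
  api_check.call(token)
  store.call(token)
  {"action" => "token_saved"}
rescue TokenFormatError, AuthenticationError => error
  {"action" => "token_error", "message" => error.message}
end

stored = []
# Hypothetical format rule: the token must not be blank.
format_check = ->(token) { raise TokenFormatError, "token is blank" if token.empty? }
api_check = ->(_token) {} # pretend the remote check succeeded
store = ->(token) { stored << token }

save_token_flow(" abc ", format_check: format_check, api_check: api_check, store: store)
# => {"action" => "token_saved"}
save_token_flow("   ", format_check: format_check, api_check: api_check, store: store)
# => {"action" => "token_error", "message" => "token is blank"}
```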
#speak(data) ⇒ Object
Processes user input: persists the message and enqueues LLM processing. When the session is actively processing an agent request, the message is queued as “pending” and picked up after the current loop completes.
    # File 'app/channels/session_channel.rb', line 50

    def speak(data)
      content = data["content"].to_s.strip
      return if content.empty?

      session = Session.find_by(id: @current_session_id)
      return unless session

      if session.processing?
        Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id, status: Event::PENDING_STATUS))
      else
        Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id))
        AgentRequestJob.perform_later(@current_session_id)
      end
    end
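The branching in speak reduces to a small decision table. The sketch below models it as a pure function that returns the side effects as data; plan_speak and the :pending symbol are illustrative stand-ins (the actual value of Event::PENDING_STATUS is not shown in this page).

```ruby
# Returns the side effects speak would trigger, as plain hashes.
def plan_speak(raw_content, processing:)
  content = raw_content.to_s.strip
  return [] if content.empty? # blank input is ignored entirely

  if processing
    # Agent loop is busy: emit the message as pending, do not enqueue a job.
    [{emit: :user_message, content: content, status: :pending}]
  else
    # Idle session: emit the message and start LLM processing.
    [{emit: :user_message, content: content}, {enqueue: :agent_request}]
  end
end

plan_speak("  hi  ", processing: false)
# => [{emit: :user_message, content: "hi"}, {enqueue: :agent_request}]
plan_speak("hi", processing: true)
# => [{emit: :user_message, content: "hi", status: :pending}]
plan_speak("   ", processing: false)
# => []
```

The point of the pending branch is that no second job is enqueued while one is running; the queued message is picked up when the current loop completes.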
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.
Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.
    # File 'app/channels/session_channel.rb', line 26

    def subscribed
      @current_session_id = resolve_session_id
      stream_from stream_name

      session = Session.find_by(id: @current_session_id)
      return unless session

      transmit_session_changed(session)
      transmit_view_mode(session)
      transmit_history(session)
    end
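The body of resolve_session_id is not part of this page. Going only by the description above, the resolution rule might look like the following sketch, which uses an in-memory array in place of the Session table. The method signature, the hash layout and the use of created_at for "most recent" are all assumptions.

```ruby
# sessions: array of hashes like {id: 1, created_at: Time}, standing in for
# the Session table. Returns the id the client should be subscribed to.
def resolve_session_id(requested, sessions)
  id = requested.to_i
  return id if id > 0 # an explicit positive id is used as given (assumption)

  latest = sessions.max_by { |session| session[:created_at] }
  return latest[:id] if latest

  # No sessions exist yet: create one so the client always has a stream.
  new_session = {id: 1, created_at: Time.now}
  sessions << new_session
  new_session[:id]
end

existing = [
  {id: 1, created_at: Time.at(0)},
  {id: 2, created_at: Time.at(10)}
]
resolve_session_id(1, existing)   # => 1
resolve_session_id(nil, existing) # => 2
resolve_session_id(0, [])         # => 1 (a new session is created)
```

Whatever the real implementation, the client should treat the session ID in the session_changed signal as authoritative rather than the one it requested.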
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
    # File 'app/channels/session_channel.rb', line 111

    def switch_session(data)
      target_id = data["session_id"].to_i
      return transmit_error("Session not found") unless target_id > 0

      switch_to_session(target_id)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end