Class: SessionChannel
- Inherits:
-
ApplicationCable::Channel
- Object
- ActionCable::Channel::Base
- ApplicationCable::Channel
- SessionChannel
- Defined in:
- app/channels/session_channel.rb
Overview
Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
Constant Summary collapse
- DEFAULT_LIST_LIMIT =
10- MAX_LIST_LIMIT =
50
Instance Method Summary collapse
-
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport.
-
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it.
-
#interrupt_execution(_data) ⇒ Object
Requests interruption of the current tool execution.
-
#list_sessions(data) ⇒ Object
Returns recent root sessions with nested child metadata for session picker UI.
-
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing.
-
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
-
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted credentials.
-
#speak(data) ⇒ Object
Processes user input.
-
#subscribed ⇒ Object
Subscribes the client to the session-specific stream.
-
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session.
Instance Method Details
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.
164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'app/channels/session_channel.rb', line 164 def change_view_mode(data) mode = data["view_mode"].to_s return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode) session = Session.find(@current_session_id) session.update!(view_mode: mode) ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode}) (session) rescue ActiveRecord::RecordNotFound transmit_error("Session not found") end |
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
117 118 119 120 |
# File 'app/channels/session_channel.rb', line 117 def create_session(_data) session = Session.create! switch_to_session(session.id) end |
#interrupt_execution(_data) ⇒ Object
Requests interruption of the current tool execution. Sets a flag on the session that the LLM client checks between tool calls. Remaining tools receive synthetic “Stopped by user” results to satisfy the API’s tool_use/tool_result pairing requirement.
Atomic: a single UPDATE with WHERE avoids the read-then-write race where the session could finish processing between the SELECT and UPDATE. No-op if the session isn’t currently processing.
95 96 97 98 |
# File 'app/channels/session_channel.rb', line 95 def interrupt_execution(_data) Session.where(id: @current_session_id, processing: true) .update_all(interrupt_requested: true) end |
#list_sessions(data) ⇒ Object
Returns recent root sessions with nested child metadata for session picker UI. Filters to root sessions only (no parent_session_id). Child sessions are nested under their parent with name and status information.
105 106 107 108 109 110 111 112 113 |
# File 'app/channels/session_channel.rb', line 105 def list_sessions(data) limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT) sessions = Session.root_sessions.recent(limit).includes(:child_sessions) all_ids = sessions.flat_map { |session| [session.id] + session.child_sessions.map(&:id) } counts = Event.where(session_id: all_ids)..group(:session_id).count result = sessions.map { |session| serialize_session_with_children(session, counts) } transmit({"action" => "sessions_list", "sessions" => result}) end |
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing. Deletes the pending event and broadcasts the recall so all clients remove it.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'app/channels/session_channel.rb', line 69 def recall_pending(data) event_id = data["event_id"].to_i return if event_id <= 0 event = Event.find_by( id: event_id, session_id: @current_session_id, event_type: "user_message", status: Event::PENDING_STATUS ) return unless event event.destroy! ActionCable.server.broadcast(stream_name, {"action" => "user_message_recalled", "event_id" => event_id}) end |
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
41 42 43 |
# File 'app/channels/session_channel.rb', line 41 def receive(data) ActionCable.server.broadcast(stream_name, data) end |
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted credentials. Format-validated and API-validated before storage. The token never enters the LLM context window — it flows directly from WebSocket to encrypted credentials.
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'app/channels/session_channel.rb', line 140 def save_token(data) token = data["token"].to_s.strip Providers::Anthropic.validate_token_format!(token) warning = begin Providers::Anthropic.validate_token_api!(token) nil rescue Providers::Anthropic::TransientError => transient # Token format is valid but API is temporarily unavailable (500, timeout, etc.). # Save the token to break the prompt loop — it will work once the API recovers. "Token saved but could not be verified — #{transient.}" end write_anthropic_token(token) transmit({"action" => "token_saved", "warning" => warning}.compact) rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error transmit({"action" => "token_error", "message" => error.}) end |
#speak(data) ⇒ Object
Processes user input. For idle sessions, persists the event immediately so the message appears in the TUI without waiting for the background job, then schedules AgentRequestJob for LLM delivery. If delivery fails, the job deletes the event and emits a Events::BounceBack.
For busy sessions, emits a pending Events::UserMessage that queues until the current agent loop completes.
55 56 57 58 59 60 61 62 63 |
# File 'app/channels/session_channel.rb', line 55 def speak(data) content = data["content"].to_s.strip return if content.empty? session = Session.find_by(id: @current_session_id) return unless session session.(content, bounce_back: true) end |
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.
Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'app/channels/session_channel.rb', line 26 def subscribed @current_session_id = resolve_session_id stream_from stream_name session = Session.find_by(id: @current_session_id) return unless session transmit_session_changed(session) transmit_view_mode(session) transmit_history(session) end |
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
126 127 128 129 130 131 132 133 |
# File 'app/channels/session_channel.rb', line 126 def switch_session(data) target_id = data["session_id"].to_i return transmit_error("Session not found") unless target_id > 0 switch_to_session(target_id) rescue ActiveRecord::RecordNotFound transmit_error("Session not found") end |