Class: SessionChannel
- Inherits: ApplicationCable::Channel
  - Object
    - ActionCable::Channel::Base
      - ApplicationCable::Channel
        - SessionChannel
- Defined in: app/channels/session_channel.rb
Overview
Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
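As a rough illustration of how a client might talk to this channel, the sketch below builds the JSON frames that Action Cable's WebSocket protocol uses for subscribing and for invoking a channel action such as `speak`. The helper names (`channel_identifier`, `subscribe_frame`, `perform_frame`) are hypothetical, and sending `session_id` as a subscription parameter is an assumption; this page does not show how the channel reads it.

```ruby
require "json"

# Hypothetical helpers for illustration; not part of SessionChannel.
# Action Cable identifies a subscription by a JSON-encoded identifier string.
def channel_identifier(session_id: nil)
  params = {"channel" => "SessionChannel"}
  # Assumption: the session id travels as a subscription parameter.
  params["session_id"] = session_id if session_id
  JSON.generate(params)
end

# Frame that asks the server to create the subscription.
def subscribe_frame(session_id: nil)
  JSON.generate({"command" => "subscribe", "identifier" => channel_identifier(session_id: session_id)})
end

# Frame that invokes a channel action; "data" is itself a JSON string
# whose "action" key selects the method to call.
def perform_frame(action, payload = {}, session_id: nil)
  JSON.generate({
    "command" => "message",
    "identifier" => channel_identifier(session_id: session_id),
    "data" => JSON.generate(payload.merge("action" => action))
  })
end

puts subscribe_frame(session_id: 42)
puts perform_frame("speak", {"content" => "hello"}, session_id: 42)
```

The same `perform_frame` shape would apply to the other actions listed below, with the payload keys each method reads from `data`.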
Constant Summary
- DEFAULT_LIST_LIMIT = 10
- MAX_LIST_LIMIT = 50
Instance Method Summary
- #change_view_mode(data) ⇒ Object
  Changes the session’s view mode and re-broadcasts the viewport.
- #create_session(_data) ⇒ Object
  Creates a new session and switches the channel stream to it.
- #interrupt_execution(_data) ⇒ Object
  Requests interruption of the current tool execution.
- #list_sessions(data) ⇒ Object
  Returns recent root sessions with nested child metadata for the session picker UI.
- #recall_pending(data) ⇒ Object
  Recalls the most recent pending message for editing.
- #receive(data) ⇒ Object
  Receives messages from clients and broadcasts them to all session subscribers.
- #save_token(data) ⇒ Object
  Validates and saves an Anthropic subscription token to encrypted credentials.
- #speak(data) ⇒ Object
  Processes user input.
- #subscribed ⇒ Object
  Subscribes the client to the session-specific stream.
- #switch_session(data) ⇒ Object
  Switches the channel stream to an existing session.
Instance Method Details
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.
    # File 'app/channels/session_channel.rb', line 168

    def change_view_mode(data)
      mode = data["view_mode"].to_s
      return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

      session = Session.find(@current_session_id)
      session.update!(view_mode: mode)

      ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
      (session)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
    # File 'app/channels/session_channel.rb', line 121

    def create_session(_data)
      session = Session.create!
      switch_to_session(session.id)
    end
#interrupt_execution(_data) ⇒ Object
Requests interruption of the current tool execution. Sets a flag on the session that the LLM client checks between tool calls. Remaining tools receive synthetic “Stopped by user” results to satisfy the API’s tool_use/tool_result pairing requirement.
Atomic: a single UPDATE with WHERE avoids the read-then-write race where the session could finish processing between the SELECT and UPDATE. No-op if the session isn’t currently processing.
    # File 'app/channels/session_channel.rb', line 99

    def interrupt_execution(_data)
      Session.where(id: @current_session_id, processing: true)
        .update_all(interrupt_requested: true)
    end
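The side that reads the flag lives in the LLM client and is not shown on this page. The following is a minimal sketch of how such a loop could behave, assuming a hypothetical `run_tools` helper, a callable that reports whether an interrupt has been requested, and result hashes shaped like Anthropic `tool_result` blocks. It is an illustration of the pairing idea, not the project's implementation.

```ruby
# Hypothetical sketch; the real LLM client is not shown in this documentation.
# tool_calls:  array of hashes with "id" and "name".
# interrupted: callable returning true once an interrupt has been requested.
# The block executes one tool and returns its output.
def run_tools(tool_calls, interrupted:)
  stopped = false
  tool_calls.map do |call|
    # Check the flag between tool calls; once set, skip every remaining tool.
    stopped ||= interrupted.call
    content = stopped ? "Stopped by user" : yield(call)
    # Every tool_use still gets a matching tool_result.
    {"type" => "tool_result", "tool_use_id" => call["id"], "content" => content}
  end
end

calls = [
  {"id" => "a", "name" => "read"},
  {"id" => "b", "name" => "write"},
  {"id" => "c", "name" => "shell"}
]
executed = []
# Simulate an interrupt arriving after the first tool has run.
results = run_tools(calls, interrupted: -> { executed.size >= 1 }) do |call|
  executed << call["name"]
  "ok"
end

results.each { |result| puts "#{result["tool_use_id"]}: #{result["content"]}" }
```

Only the first tool runs here; the other two receive the synthetic result, so the number of results still equals the number of tool calls.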
#list_sessions(data) ⇒ Object
Returns recent root sessions with nested child metadata for the session picker UI. Filters to root sessions only (those with no parent_session_id). Child sessions are nested under their parent with name and status information.
    # File 'app/channels/session_channel.rb', line 109

    def list_sessions(data)
      limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
      sessions = Session.root_sessions.recent(limit).includes(:child_sessions)
      all_ids = sessions.flat_map { |session| [session.id] + session.child_sessions.map(&:id) }
      counts = Event.where(session_id: all_ids)..group(:session_id).count
      result = sessions.map { |session| serialize_session_with_children(session, counts) }

      transmit({"action" => "sessions_list", "sessions" => result})
    end
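The limit expression accepts missing, string, and out-of-range values. A small standalone sketch of the same expression shows the cases; `effective_limit` is a hypothetical helper, and its keyword defaults stand in for DEFAULT_LIST_LIMIT and MAX_LIST_LIMIT.

```ruby
# Hypothetical helper mirroring the limit expression in list_sessions.
# default and max stand in for DEFAULT_LIST_LIMIT (10) and MAX_LIST_LIMIT (50).
def effective_limit(data, default: 10, max: 50)
  (data["limit"] || default).to_i.clamp(1, max)
end

p effective_limit({})                  # missing limit falls back to the default
p effective_limit({"limit" => "25"})   # numeric strings are converted
p effective_limit({"limit" => 500})    # capped at the maximum
p effective_limit({"limit" => "abc"})  # "abc".to_i is 0, which is raised to 1
p effective_limit({"limit" => -3})     # negative values are raised to 1
```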
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing. Deletes the pending event and broadcasts the recall so all clients remove it.
    # File 'app/channels/session_channel.rb', line 73

    def recall_pending(data)
      event_id = data["event_id"].to_i
      return if event_id <= 0

      event = Event.find_by(
        id: event_id,
        session_id: @current_session_id,
        event_type: "user_message",
        status: Event::PENDING_STATUS
      )
      return unless event

      event.destroy!
      ActionCable.server.broadcast(stream_name, {"action" => "user_message_recalled", "event_id" => event_id})
    end
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
    # File 'app/channels/session_channel.rb', line 41

    def receive(data)
      ActionCable.server.broadcast(stream_name, data)
    end
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted credentials. The token is checked for format and then validated against the API before storage. If the API is temporarily unavailable, the token is saved anyway and the response carries a warning. The token never enters the LLM context window; it flows directly from the WebSocket to encrypted credentials.
    # File 'app/channels/session_channel.rb', line 144

    def save_token(data)
      token = data["token"].to_s.strip

      Providers::Anthropic.validate_token_format!(token)

      warning = begin
        Providers::Anthropic.validate_token_api!(token)
        nil
      rescue Providers::Anthropic::TransientError => transient
        # Token format is valid but API is temporarily unavailable (500, timeout, etc.).
        # Save the token to break the prompt loop — it will work once the API recovers.
        "Token saved but could not be verified — #{transient.}"
      end

      write_anthropic_token(token)
      transmit({"action" => "token_saved", "warning" => warning}.compact)
    rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
      transmit({"action" => "token_error", "message" => error.})
    end
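The three outcomes (saved, saved with a warning, rejected) can be traced in a standalone sketch. Everything here is a stand-in: the error classes imitate the `Providers::Anthropic` ones, `save_token_flow` is a hypothetical function that returns the payload instead of transmitting it, and the validators and the store are plain lambdas.

```ruby
# Hypothetical stand-ins for the Providers::Anthropic error classes.
class TokenFormatError < StandardError; end
class AuthenticationError < StandardError; end
class TransientError < StandardError; end

# Returns the payload that would be sent to the client.
# format_check, api_check, and store are callables supplied by the caller.
def save_token_flow(token, format_check:, api_check:, store:)
  format_check.call(token)

  warning = begin
    api_check.call(token)
    nil
  rescue TransientError => e
    # The API could not be reached, so keep the token and report a warning.
    "Token saved but could not be verified: #{e.message}"
  end

  store.call(token)
  # compact drops the "warning" key when there is nothing to report.
  {"action" => "token_saved", "warning" => warning}.compact
rescue TokenFormatError, AuthenticationError => e
  # Rejected tokens are never stored.
  {"action" => "token_error", "message" => e.message}
end

saved = []
store = ->(token) { saved << token }
ok = ->(_token) { true }
flaky = ->(_token) { raise TransientError, "API timeout" }
denied = ->(_token) { raise AuthenticationError, "invalid token" }

p save_token_flow("tok", format_check: ok, api_check: ok, store: store)
p save_token_flow("tok", format_check: ok, api_check: flaky, store: store)
p save_token_flow("tok", format_check: ok, api_check: denied, store: store)
```

In this sketch the store is called in the first two cases only, which matches the ordering in the method above: storage happens after validation and is skipped when a format or authentication error is raised.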
#speak(data) ⇒ Object
Processes user input. For idle sessions, persists the event immediately so the message appears in the TUI without waiting for the background job, then schedules AgentRequestJob for LLM delivery. If delivery fails, the job deletes the event and emits an Events::BounceBack.
For busy sessions, emits a pending Events::UserMessage that queues until the current agent loop completes.
    # File 'app/channels/session_channel.rb', line 54

    def speak(data)
      content = data["content"].to_s.strip
      return if content.empty?

      session = Session.find_by(id: @current_session_id)
      return unless session

      if session.processing?
        Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id, status: Event::PENDING_STATUS))
      else
        event = session.create_user_event(content)
        AgentRequestJob.perform_later(session.id, event_id: event.id)
      end
    end
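The branching can be summarised in a standalone sketch that returns a label instead of touching the database or the job queue. `route_input` and its return values are hypothetical and exist only to make the two paths visible.

```ruby
# Hypothetical summary of the decision made in speak; no persistence or jobs.
def route_input(raw, processing:)
  content = raw.to_s.strip
  # Blank input is dropped before any session lookup.
  return :ignored if content.empty?

  if processing
    # Busy session: the message waits as a pending event.
    [:queue_pending, content]
  else
    # Idle session: persist right away, then schedule the background job.
    [:persist_and_schedule, content]
  end
end

p route_input("  hi ", processing: false)
p route_input("next", processing: true)
p route_input("   ", processing: true)
```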
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.
Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.
    # File 'app/channels/session_channel.rb', line 26

    def subscribed
      @current_session_id = resolve_session_id
      stream_from stream_name

      session = Session.find_by(id: @current_session_id)
      return unless session

      transmit_session_changed(session)
      transmit_view_mode(session)
      transmit_history(session)
    end
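`resolve_session_id` is not shown on this page. Based only on the description above, its behaviour might look like the following sketch, which uses a hypothetical in-memory `SessionStore` in place of the Session model and takes the requested id as an argument. Whether the real method checks that a positive id exists is unknown; here that check is left to the later `find_by`.

```ruby
# Hypothetical in-memory stand-in for the Session model.
# Ids grow with creation order, so the last id is the most recent session.
class SessionStore
  def initialize
    @ids = []
  end

  def most_recent
    @ids.last
  end

  def create
    id = (@ids.last || 0) + 1
    @ids << id
    id
  end
end

# Assumed behaviour: a positive id is used as given; otherwise fall back to
# the most recent session, creating one if none exist.
def resolve_session_id(requested, store)
  id = requested.to_i # nil and "" both become 0
  return id if id.positive?

  store.most_recent || store.create
end

store = SessionStore.new
p resolve_session_id(nil, store)  # no sessions yet, so one is created
p resolve_session_id(0, store)    # zero resolves to the most recent session
p resolve_session_id("7", store)  # an explicit id is passed through
```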
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
    # File 'app/channels/session_channel.rb', line 130

    def switch_session(data)
      target_id = data["session_id"].to_i
      return transmit_error("Session not found") unless target_id > 0

      switch_to_session(target_id)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end