Class: SessionChannel
Inherits: ApplicationCable::Channel
- Object
  - ActionCable::Channel::Base
    - ApplicationCable::Channel
      - SessionChannel
Defined in: app/channels/session_channel.rb
Overview
Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
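As an illustration of how a client might talk to this channel, the sketch below builds the two JSON frames a raw WebSocket client would send: one to subscribe and one to invoke `speak`. It assumes the standard Action Cable JSON framing (`command`, `identifier`, `data`); the `session_id` value of 42 and the message text are made up for the example.

```ruby
require "json"

# Action Cable identifies a subscription by a JSON-encoded identifier string.
# The session_id value is illustrative only.
identifier = JSON.generate({ "channel" => "SessionChannel", "session_id" => 42 })

# Frame that opens the subscription.
subscribe_frame = JSON.generate({ "command" => "subscribe", "identifier" => identifier })

# Frame that invokes the speak action; "data" is itself a JSON string
# whose "action" key names the channel method to call.
speak_frame = JSON.generate({
  "command" => "message",
  "identifier" => identifier,
  "data" => JSON.generate({ "action" => "speak", "content" => "Hello" })
})

puts subscribe_frame
puts speak_frame
```

Client libraries normally build these frames for you; the sketch only shows what travels over the socket.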
Constant Summary
- DEFAULT_LIST_LIMIT = 10
- MAX_LIST_LIMIT = 50
Instance Method Summary
- #change_view_mode(data) ⇒ Object
  Changes the session’s view mode and re-broadcasts the viewport.
- #create_session(_data) ⇒ Object
  Creates a new session and switches the channel stream to it.
- #list_sessions(data) ⇒ Object
  Returns recent sessions with metadata for the session picker UI.
- #recall_pending(data) ⇒ Object
  Recalls the most recent pending message for editing.
- #receive(data) ⇒ Object
  Receives messages from clients and broadcasts them to all session subscribers.
- #save_token(data) ⇒ Object
  Validates and saves an Anthropic subscription token to encrypted credentials.
- #speak(data) ⇒ Object
  Processes user input: persists the message and enqueues LLM processing.
- #subscribed ⇒ Object
  Subscribes the client to the session-specific stream.
- #switch_session(data) ⇒ Object
  Switches the channel stream to an existing session.
Instance Method Details
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.
    # File 'app/channels/session_channel.rb', line 145
    def change_view_mode(data)
      mode = data["view_mode"].to_s
      return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

      session = Session.find(@current_session_id)
      session.update!(view_mode: mode)

      ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
      (session) # call name missing from the source listing
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
    # File 'app/channels/session_channel.rb', line 106
    def create_session(_data)
      session = Session.create!
      switch_to_session(session.id)
    end
#list_sessions(data) ⇒ Object
Returns recent sessions with metadata for the session picker UI.
    # File 'app/channels/session_channel.rb', line 88
    def list_sessions(data)
      limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
      sessions = Session.recent(limit)
      # A call between `.` and `.group` is missing from the source listing.
      counts = Event.where(session_id: sessions.select(:id))..group(:session_id).count

      result = sessions.map do |session|
        {
          id: session.id,
          created_at: session.created_at.iso8601,
          updated_at: session.updated_at.iso8601,
          message_count: counts[session.id] || 0
        }
      end
      transmit({"action" => "sessions_list", "sessions" => result})
    end
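The limit handling in `list_sessions` can be tried in isolation. The sketch below copies the expression from the listing into a hypothetical helper, `effective_limit`, which is not part of the channel, to show how missing, oversized, and non-numeric values are normalized.

```ruby
DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

# Hypothetical helper that mirrors the limit expression in list_sessions.
def effective_limit(data)
  (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
end

effective_limit({})                    # => 10 (default applies)
effective_limit({ "limit" => 25 })     # => 25
effective_limit({ "limit" => 500 })    # => 50 (capped at MAX_LIST_LIMIT)
effective_limit({ "limit" => "abc" })  # => 1  ("abc".to_i is 0, raised to 1)
effective_limit({ "limit" => -3 })     # => 1
```

Because of the clamp, a client can never request an empty or unbounded list, whatever it sends.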
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing. Deletes the pending event and broadcasts the recall so all clients remove it.
    # File 'app/channels/session_channel.rb', line 69
    def recall_pending(data)
      event_id = data["event_id"].to_i
      return if event_id <= 0

      event = Event.find_by(
        id: event_id,
        session_id: @current_session_id,
        event_type: "user_message",
        status: Event::PENDING_STATUS
      )
      return unless event

      event.destroy!
      ActionCable.server.broadcast(stream_name, {"action" => "user_message_recalled", "event_id" => event_id})
    end
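The guard conditions in `recall_pending` are easier to see against an in-memory stand-in. The sketch below is not the channel code: `PendingEvent`, `PENDING`, the `"sent"` status, and `recall` are hypothetical names, and the real value of `Event::PENDING_STATUS` is not shown on this page. It returns the payload that would be broadcast, or `nil` when the request is ignored.

```ruby
# Hypothetical in-memory stand-in for the Event model.
PendingEvent = Struct.new(:id, :session_id, :event_type, :status)

# Assumed value; the real Event::PENDING_STATUS is not shown on this page.
PENDING = "pending"

def recall(events, current_session_id, data)
  event_id = data["event_id"].to_i
  return nil if event_id <= 0 # missing or non-numeric ids become 0

  # Only a pending user message in the current session qualifies.
  event = events.find do |e|
    e.id == event_id && e.session_id == current_session_id &&
      e.event_type == "user_message" && e.status == PENDING
  end
  return nil unless event

  events.delete(event)
  { "action" => "user_message_recalled", "event_id" => event_id }
end

events = [
  PendingEvent.new(1, 7, "user_message", PENDING),
  PendingEvent.new(2, 7, "user_message", "sent"),
  PendingEvent.new(3, 8, "user_message", PENDING)
]

first         = recall(events, 7, { "event_id" => "1" }) # payload; event 1 removed
not_pending   = recall(events, 7, { "event_id" => 2 })   # nil: status is not pending
other_session = recall(events, 7, { "event_id" => 3 })   # nil: belongs to session 8
missing_id    = recall(events, 7, {})                    # nil: nil.to_i is 0
```

Scoping the lookup to the current session means a client cannot recall a message from a session it is not streaming.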
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
    # File 'app/channels/session_channel.rb', line 41
    def receive(data)
      ActionCable.server.broadcast(stream_name, data)
    end
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted credentials. Format-validated and API-validated before storage. The token never enters the LLM context window — it flows directly from WebSocket to encrypted credentials.
    # File 'app/channels/session_channel.rb', line 129
    def save_token(data)
      token = data["token"].to_s.strip

      Providers::Anthropic.validate_token_format!(token)
      Providers::Anthropic.validate_token_api!(token)
      write_anthropic_token(token)

      transmit({"action" => "token_saved"})
    rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
      # The call after `error.` is missing from the source listing.
      transmit({"action" => "token_error", "message" => error.})
    end
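The validate-then-store ordering described above can be sketched without Rails. Everything in this example is a stand-in: the two error classes, `save_token_sketch`, and the three lambdas are hypothetical, and the real format and API checks in `Providers::Anthropic` are not shown on this page. The point is only that storage is reached when both checks pass, and that either failure turns into a `token_error` reply.

```ruby
# Stand-ins for the provider's error classes.
class TokenFormatError < StandardError; end
class AuthenticationError < StandardError; end

# format_check, api_check, and store are hypothetical callables that
# replace the provider validation and the credentials write.
def save_token_sketch(data, format_check:, api_check:, store:)
  token = data["token"].to_s.strip
  format_check.call(token) # cheap local check first
  api_check.call(token)    # remote check only for well-formed tokens
  store.call(token)        # reached only when both checks pass
  { "action" => "token_saved" }
rescue TokenFormatError, AuthenticationError => error
  { "action" => "token_error", "message" => error.message }
end

stored = []
checks = {
  format_check: ->(t) { raise TokenFormatError, "Token is empty" if t.empty? },
  api_check: ->(t) { raise AuthenticationError, "Token rejected" unless t == "valid-token" },
  store: ->(t) { stored << t }
}

ok    = save_token_sketch({ "token" => "  valid-token " }, **checks)
bad   = save_token_sketch({ "token" => "nope" }, **checks)
blank = save_token_sketch({}, **checks)
```

Here only the first call stores anything; the other two return error payloads and leave `stored` untouched.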
#speak(data) ⇒ Object
Processes user input: persists the message and enqueues LLM processing. When the session is actively processing an agent request, the message is queued as “pending” and picked up after the current loop completes.
    # File 'app/channels/session_channel.rb', line 50
    def speak(data)
      content = data["content"].to_s.strip
      return if content.empty?

      session = Session.find_by(id: @current_session_id)
      return unless session

      if session.processing?
        Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id, status: Event::PENDING_STATUS))
      else
        Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id))
        AgentRequestJob.perform_later(@current_session_id)
      end
    end
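The branching in `speak` can be modeled with plain data structures. In the sketch below, `route_message` is a hypothetical function, `emitted` stands in for the event bus, `jobs` stands in for the job queue, and `"pending"` is an assumed value for `Event::PENDING_STATUS`.

```ruby
# Hypothetical model of speak: `emitted` collects events, `jobs` collects
# the session ids that would be handed to AgentRequestJob.
def route_message(content, session_id:, processing:, emitted:, jobs:)
  text = content.to_s.strip
  return :ignored if text.empty? # blank input is dropped silently

  if processing
    # Agent loop is busy: record the message as pending, enqueue nothing.
    emitted << { content: text, session_id: session_id, status: "pending" }
    :queued
  else
    # Idle session: record the message and start processing.
    emitted << { content: text, session_id: session_id }
    jobs << session_id
    :dispatched
  end
end

emitted = []
jobs = []

idle_result  = route_message(" hi ", session_id: 7, processing: false, emitted: emitted, jobs: jobs)
busy_result  = route_message("more", session_id: 7, processing: true, emitted: emitted, jobs: jobs)
blank_result = route_message("   ", session_id: 7, processing: false, emitted: emitted, jobs: jobs)
```

After these three calls, `emitted` holds two messages and `jobs` holds a single entry, since only the message sent to the idle session starts a job.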
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.
Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.
    # File 'app/channels/session_channel.rb', line 26
    def subscribed
      @current_session_id = resolve_session_id
      stream_from stream_name

      session = Session.find_by(id: @current_session_id)
      return unless session

      transmit_session_changed(session)
      transmit_view_mode(session)
      transmit_history(session)
    end
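The private `resolve_session_id` helper is not listed on this page, so the following is only a guess at the resolution rule described above, written against an in-memory array of session ids ordered oldest to newest. The function name `resolve_session_id_sketch`, the array store, and the choice to pass positive ids through unchanged are all assumptions.

```ruby
# Hypothetical resolution rule; `sessions` is an array of ids, oldest first.
def resolve_session_id_sketch(requested, sessions)
  id = requested.to_i
  return id if id > 0 # assumption: a positive id is used as given

  sessions << 1 if sessions.empty? # create a first session when none exist
  sessions.last                    # otherwise fall back to the most recent
end

existing = [3, 5]
resolve_session_id_sketch(3, existing)    # => 3 (explicit id)
resolve_session_id_sketch(nil, existing)  # => 5 (omitted: most recent)
resolve_session_id_sketch("0", existing)  # => 5 (zero: most recent)

empty = []
resolve_session_id_sketch(nil, empty)     # => 1, and empty is now [1]
```

Under this reading the client never needs to know a session id in advance; it learns the resolved id from the `session_changed` signal.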
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
    # File 'app/channels/session_channel.rb', line 115
    def switch_session(data)
      target_id = data["session_id"].to_i
      return transmit_error("Session not found") unless target_id > 0

      switch_to_session(target_id)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end