Class: SessionChannel

Inherits:
ApplicationCable::Channel
Defined in:
app/channels/session_channel.rb

Overview

Streams events for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts events through this channel, and any number of clients (TUI, web, API) can subscribe.

On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.

Examples:

Client subscribes to a session

App.cable.subscriptions.create({ channel: "SessionChannel", session_id: 42 })
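
For clients that do not use the Action Cable JavaScript library (a TUI, for example), the same subscription can be written as raw Action Cable protocol frames. The following is a minimal sketch that only builds the JSON text; the helper names (subscription_identifier, subscribe_frame, action_frame) are illustrative and not part of this codebase, and the WebSocket transport is left out.

```ruby
require "json"

# Action Cable identifies a subscription by a JSON-encoded string.
# That string is then embedded in the JSON command frame.
def subscription_identifier(session_id = nil)
  params = {"channel" => "SessionChannel"}
  # Omitting session_id lets the server resolve the session itself.
  params["session_id"] = session_id if session_id
  JSON.generate(params)
end

# Frame sent once to open the subscription.
def subscribe_frame(identifier)
  JSON.generate({"command" => "subscribe", "identifier" => identifier})
end

# Frame that invokes a channel action such as "speak" or "list_sessions".
# The action name travels inside the JSON-encoded "data" string.
def action_frame(identifier, action, payload = {})
  data = JSON.generate(payload.merge("action" => action))
  JSON.generate({"command" => "message", "identifier" => identifier, "data" => data})
end

identifier = subscription_identifier(42)
puts subscribe_frame(identifier)
puts action_frame(identifier, "speak", {"content" => "Hello"})
```

Each frame would be sent as a text message over the WebSocket connection to the cable endpoint.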

Constant Summary

DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

Instance Method Details

#change_view_mode(data) ⇒ Object

Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.

Parameters:

  • data (Hash)

    must include “view_mode” (one of Session::VIEW_MODES)



# File 'app/channels/session_channel.rb', line 145

def change_view_mode(data)
  mode = data["view_mode"].to_s
  return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

  session = Session.find(@current_session_id)
  session.update!(view_mode: mode)

  ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
  broadcast_viewport(session)
rescue ActiveRecord::RecordNotFound
  transmit_error("Session not found")
end

#create_session(_data) ⇒ Object

Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.



# File 'app/channels/session_channel.rb', line 106

def create_session(_data)
  session = Session.create!
  switch_to_session(session.id)
end

#list_sessions(data) ⇒ Object

Transmits a list of recent sessions, with metadata, to the requesting client for use in a session picker UI.

Parameters:

  • data (Hash)

    optional “limit” (default 10, max 50)



# File 'app/channels/session_channel.rb', line 88

def list_sessions(data)
  limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
  sessions = Session.recent(limit)
  counts = Event.where(session_id: sessions.select(:id)).llm_messages.group(:session_id).count

  result = sessions.map do |session|
    {
      id: session.id,
      created_at: session.created_at.iso8601,
      updated_at: session.updated_at.iso8601,
      message_count: counts[session.id] || 0
    }
  end
  transmit({"action" => "sessions_list", "sessions" => result})
end
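
The limit handling above can be tried in isolation. This sketch copies the clamping expression into a standalone helper; the name resolve_limit and its keyword defaults are illustrative, mirroring DEFAULT_LIST_LIMIT and MAX_LIST_LIMIT.

```ruby
# Standalone copy of the limit expression from list_sessions.
# default and max mirror DEFAULT_LIST_LIMIT and MAX_LIST_LIMIT.
def resolve_limit(data, default: 10, max: 50)
  (data["limit"] || default).to_i.clamp(1, max)
end

p resolve_limit({})                  # => 10 (no limit given)
p resolve_limit({"limit" => "500"})  # => 50 (capped at the maximum)
p resolve_limit({"limit" => 0})      # => 1  (raised to the minimum)
p resolve_limit({"limit" => "abc"})  # => 1  ("abc".to_i is 0)
```

Because to_i never raises on a String, malformed input degrades to the minimum of 1 instead of producing an error.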

#recall_pending(data) ⇒ Object

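Recalls a pending user message, identified by event_id, for editing. Deletes the pending event and broadcasts the recall so all clients remove it. If the event does not exist, belongs to another session, or is no longer pending, nothing happens.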

Parameters:

  • data (Hash)

    must include “event_id” (positive integer)



# File 'app/channels/session_channel.rb', line 69

def recall_pending(data)
  event_id = data["event_id"].to_i
  return if event_id <= 0

  event = Event.find_by(
    id: event_id,
    session_id: @current_session_id,
    event_type: "user_message",
    status: Event::PENDING_STATUS
  )
  return unless event

  event.destroy!
  ActionCable.server.broadcast(stream_name, {"action" => "user_message_recalled", "event_id" => event_id})
end
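
The lookup conditions can be illustrated without a database. The sketch below replaces the Event table with an array of hashes and returns the broadcast payload instead of sending it; the hash layout and the "pending" status string are assumptions (the real value is Event::PENDING_STATUS, which is not shown on this page).

```ruby
# In-memory stand-in for recall_pending. "events" is a hypothetical array
# of hashes; the "pending" status string is assumed, not confirmed.
def recall_pending(events, current_session_id, data)
  event_id = data["event_id"].to_i
  return nil if event_id <= 0

  # All four conditions must hold, as in the find_by call above.
  event = events.find do |e|
    e[:id] == event_id &&
      e[:session_id] == current_session_id &&
      e[:event_type] == "user_message" &&
      e[:status] == "pending"
  end
  return nil unless event

  events.delete(event)
  # Payload that the real method broadcasts to every subscriber.
  {"action" => "user_message_recalled", "event_id" => event_id}
end

events = [
  {id: 1, session_id: 7, event_type: "user_message", status: nil},
  {id: 2, session_id: 7, event_type: "user_message", status: "pending"}
]
p recall_pending(events, 7, {"event_id" => 1})    # already processed: nil
p recall_pending(events, 7, {"event_id" => "2"})  # pending: returns the payload
```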

#receive(data) ⇒ Object

Receives messages from clients and broadcasts them to all session subscribers.

Parameters:

  • data (Hash)

    arbitrary message payload



# File 'app/channels/session_channel.rb', line 41

def receive(data)
  ActionCable.server.broadcast(stream_name, data)
end

#save_token(data) ⇒ Object

Validates and saves an Anthropic subscription token to encrypted credentials. Format-validated and API-validated before storage. The token never enters the LLM context window — it flows directly from WebSocket to encrypted credentials.

Parameters:

  • data (Hash)

    must include “token” (Anthropic subscription token string)



# File 'app/channels/session_channel.rb', line 129

def save_token(data)
  token = data["token"].to_s.strip

  Providers::Anthropic.validate_token_format!(token)
  Providers::Anthropic.validate_token_api!(token)
  write_anthropic_token(token)

  transmit({"action" => "token_saved"})
rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
  transmit({"action" => "token_error", "message" => error.message})
end
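
The validate-then-store ordering can be sketched with stand-in collaborators. Everything injected here is hypothetical: the validators are plain lambdas, ArgumentError stands in for the provider's error classes, and the "non-empty" rule is a placeholder, not the actual Anthropic token format.

```ruby
# Sketch of the save_token control flow with injected, hypothetical parts.
# Validators run in order; the store is reached only if none of them raise.
def save_token(data, validators:, store:)
  token = data["token"].to_s.strip
  validators.each { |check| check.call(token) }
  store.call(token)
  {"action" => "token_saved"}
rescue ArgumentError => error
  {"action" => "token_error", "message" => error.message}
end

saved = []
format_check = ->(token) { raise ArgumentError, "Token is empty" if token.empty? }
api_check = ->(_token) { true } # placeholder for the remote API validation

p save_token({"token" => "  example  "}, validators: [format_check, api_check], store: ->(t) { saved << t })
p save_token({"token" => "   "}, validators: [format_check, api_check], store: ->(t) { saved << t })
p saved # only the stripped, validated token was stored
```

The response payload carries only a status or an error message, so the token itself is never echoed back to the client.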

#speak(data) ⇒ Object

Processes user input: persists the message and enqueues LLM processing. When the session is actively processing an agent request, the message is queued as “pending” and picked up after the current loop completes.

Parameters:

  • data (Hash)

    must include “content” with the user’s message text



# File 'app/channels/session_channel.rb', line 50

def speak(data)
  content = data["content"].to_s.strip
  return if content.empty?

  session = Session.find_by(id: @current_session_id)
  return unless session

  if session.processing?
    Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id, status: Event::PENDING_STATUS))
  else
    Events::Bus.emit(Events::UserMessage.new(content: content, session_id: @current_session_id))
    AgentRequestJob.perform_later(@current_session_id)
  end
end
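
The branch above reduces to a small decision: queue the message as pending while the agent is busy, otherwise emit it and start a job. The helper below is a hypothetical pure-Ruby rendering of that decision; route_message, its return hash, and the "pending" status string are illustrative and not part of the channel.

```ruby
# Hypothetical decision helper mirroring the branch in speak.
# Returns nil for blank input, otherwise a description of what would happen.
def route_message(data, processing:)
  content = data["content"].to_s.strip
  return nil if content.empty?

  if processing
    # Agent loop is running: store as pending, do not enqueue another job.
    {content: content, status: "pending", enqueue_job: false}
  else
    # Idle session: emit the message and start LLM processing.
    {content: content, status: nil, enqueue_job: true}
  end
end

p route_message({"content" => "  hello "}, processing: false)
p route_message({"content" => "follow-up"}, processing: true)
p route_message({"content" => "   "}, processing: false) # blank input is ignored
```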

#subscribed ⇒ Object

Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.

Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.

Parameters:

  • params (Hash)

    optional :session_id (positive integer)



# File 'app/channels/session_channel.rb', line 26

def subscribed
  @current_session_id = resolve_session_id
  stream_from stream_name

  session = Session.find_by(id: @current_session_id)
  return unless session

  transmit_session_changed(session)
  transmit_view_mode(session)
  transmit_history(session)
end

#switch_session(data) ⇒ Object

Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.

Parameters:

  • data (Hash)

    must include “session_id” (positive integer)



# File 'app/channels/session_channel.rb', line 115

def switch_session(data)
  target_id = data["session_id"].to_i
  return transmit_error("Session not found") unless target_id > 0

  switch_to_session(target_id)
rescue ActiveRecord::RecordNotFound
  transmit_error("Session not found")
end