Class: TUI::CableClient

Inherits:
Object
  • Object
Defined in:
lib/tui/cable_client.rb

Overview

Action Cable WebSocket client for connecting the TUI to the brain server. Runs the WebSocket connection in a background thread and exposes a thread-safe message queue for the TUI render loop to drain.

Implements the actioncable-v1-json protocol: subscribe to a SessionChannel, receive event broadcasts, and send user input via the speak action.
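As a rough illustration of the wire format, the sketch below builds the two client-to-server frames that the actioncable-v1-json protocol uses: a subscribe command and a message command carrying an action. The helper names (channel_identifier, subscribe_frame, action_frame) and the "session_id" identifier parameter are hypothetical and chosen only for this example; the real client's private helpers are not shown on this page.

```ruby
require "json"

# Channel identifier. Action Cable expects it as a JSON-encoded string.
# The "session_id" parameter name is an assumption for illustration.
def channel_identifier(session_id = nil)
  params = {"channel" => "SessionChannel"}
  params["session_id"] = session_id if session_id
  JSON.generate(params)
end

# Frame that subscribes to the channel named by the identifier.
def subscribe_frame(identifier)
  JSON.generate({"command" => "subscribe", "identifier" => identifier})
end

# Frame that invokes a channel action such as "speak".
# The action name travels inside the JSON-encoded "data" string.
def action_frame(identifier, action, payload = {})
  data = JSON.generate(payload.merge("action" => action))
  JSON.generate({"command" => "message", "identifier" => identifier, "data" => data})
end

identifier = channel_identifier(42)
puts subscribe_frame(identifier)
puts action_frame(identifier, "speak", {"content" => "Hello!"})
```

Note that both "identifier" and "data" are JSON strings nested inside the outer JSON object, which is why each is encoded separately.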

Automatically reconnects with exponential backoff when the connection drops unexpectedly. Detects stale connections via Action Cable ping heartbeat monitoring.
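The exact backoff formula is not shown on this page. A minimal sketch, assuming the delay doubles on each attempt and is clamped at the cap, with defaults mirroring BACKOFF_BASE (1.0), BACKOFF_CAP (30.0), and MAX_RECONNECT_ATTEMPTS (10), could look like this. The method name backoff_delay is hypothetical, and the real implementation may differ (for example, by adding jitter).

```ruby
# Assumed formula: base * 2^(attempt - 1), clamped at cap.
# attempt is 1-based; defaults mirror BACKOFF_BASE and BACKOFF_CAP.
def backoff_delay(attempt, base: 1.0, cap: 30.0)
  [base * (2**(attempt - 1)), cap].min
end

# Delays for a full run of 10 attempts (mirrors MAX_RECONNECT_ATTEMPTS).
schedule = (1..10).map { |attempt| backoff_delay(attempt) }
puts schedule.inspect
puts "total wait: #{schedule.sum} seconds"
```

Under these assumptions the delay reaches the cap on the sixth attempt and stays there for the remaining attempts.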

Examples:

client = TUI::CableClient.new(host: "localhost:42134")
client.connect
client.speak("Hello!")
messages = client.drain_messages
client.disconnect

Constant Summary

DISCONNECT_TIMEOUT =

seconds to wait for WebSocket thread to finish

2
POLL_INTERVAL =

seconds between connection status checks

0.1
CONNECTION_TIMEOUT =

seconds to wait for the connecting state to advance

10
MAX_RECONNECT_ATTEMPTS =
10
BACKOFF_BASE =

initial backoff delay in seconds

1.0
BACKOFF_CAP =

maximum backoff delay

30.0
PING_STALE_THRESHOLD =

seconds without ping before connection is stale

6.0
MSG_TYPE_CONNECTION =

Message types queued for the TUI render loop via @message_queue

"connection"
STATUS_SUBSCRIBING =

Connection status values sent as MSG_TYPE_CONNECTION messages. These are message-level concepts for the TUI, distinct from the internal @status state machine (:disconnected, :connecting, etc.).

"subscribing"
STATUS_SUBSCRIBED =
"subscribed"
STATUS_REJECTED =
"rejected"
STATUS_DISCONNECTED =
"disconnected"
STATUS_RECONNECTING =
"reconnecting"
STATUS_FAILED =
"failed"
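To illustrate how PING_STALE_THRESHOLD might be applied, the sketch below recognises an Action Cable ping frame and checks whether the last ping is older than the threshold. Action Cable servers send a ping roughly every 3 seconds by default, so 6.0 seconds would correspond to about two missed pings; that reading is an assumption, not something this page states. The helper names ping_frame?, ping_stale?, and monotonic_now are hypothetical.

```ruby
require "json"

# Action Cable ping frames look like {"type":"ping","message":<timestamp>}.
def ping_frame?(raw)
  JSON.parse(raw)["type"] == "ping"
end

# Returns true when no ping has arrived within the threshold.
# A nil last_ping_at means no ping has been seen yet, which this
# sketch treats as "not stale".
def ping_stale?(last_ping_at, now:, threshold: 6.0)
  return false if last_ping_at.nil?
  (now - last_ping_at) > threshold
end

# A monotonic clock avoids false positives when wall-clock time jumps.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

last_ping_at = monotonic_now
puts ping_stale?(last_ping_at, now: last_ping_at + 2.0)
puts ping_stale?(last_ping_at, now: last_ping_at + 7.5)
```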

Constructor Details

#initialize(host:, session_id: nil) ⇒ CableClient

Returns a new instance of CableClient.

Parameters:

  • host (String)

    brain server address (e.g. “localhost:42134”)

  • session_id (Integer, nil) (defaults to: nil)

    session to subscribe to (nil for server-side resolution)



# File 'lib/tui/cable_client.rb', line 63

def initialize(host:, session_id: nil)
  @host = host
  @session_id = session_id
  @subscribed_session_id = session_id
  @status = :disconnected
  @message_queue = Thread::Queue.new
  @mutex = Mutex.new
  @ws = nil
  @ws_thread = nil
  @intentional_disconnect = false
  @reconnect_attempt = 0
  @last_ping_at = nil
  @connection_generation = 0
end

Instance Attribute Details

#host ⇒ String (readonly)

Returns brain server host:port.

Returns:

  • (String)

    brain server host:port



# File 'lib/tui/cable_client.rb', line 48

def host
  @host
end

#reconnect_attempt ⇒ Integer (readonly)

Returns current reconnection attempt (0 when connected).

Returns:

  • (Integer)

    current reconnection attempt (0 when connected)



# File 'lib/tui/cable_client.rb', line 59

def reconnect_attempt
  @reconnect_attempt
end

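#session_id ⇒ Integer, nil (readonly)

Returns current session ID, or nil if the client was constructed without one and #update_session_id has not been called yet.

Returns:

  • (Integer, nil)

    current session ID, or nil while no session ID has been set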



# File 'lib/tui/cable_client.rb', line 51

def session_id
  @session_id
end

#status ⇒ Symbol (readonly)

Returns connection status (:disconnected, :connecting, :connected, :subscribed, :reconnecting). Note: the “subscribing” concept exists only as a message-level status (see STATUS_SUBSCRIBING) queued for the TUI, not as an internal state.

Returns:

  • (Symbol)

    connection status: one of :disconnected, :connecting, :connected, :subscribed, or :reconnecting



# File 'lib/tui/cable_client.rb', line 56

def status
  @status
end

Instance Method Details

#change_view_mode(mode) ⇒ void

This method returns an undefined value.

Requests the brain to change the session’s view mode. The server broadcasts view_mode_changed to all clients on the session, followed by the re-decorated viewport.

Parameters:

  • mode (String)

    one of “basic”, “verbose”, “debug”



# File 'lib/tui/cable_client.rb', line 125

def change_view_mode(mode)
  send_action("change_view_mode", {"view_mode" => mode})
end
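The method shown above forwards mode without checking it. If a caller wants to reject unknown modes before sending, a small guard along these lines could be used. It is a sketch only: VIEW_MODES and view_mode_payload are hypothetical names, and the list of modes is taken from the parameter description above.

```ruby
# Modes listed in the parameter documentation for #change_view_mode.
VIEW_MODES = %w[basic verbose debug].freeze

# Builds the same payload shape that #change_view_mode sends,
# raising early when the mode is not one of the documented values.
def view_mode_payload(mode)
  unless VIEW_MODES.include?(mode)
    raise ArgumentError, "unknown view mode: #{mode.inspect}"
  end
  {"view_mode" => mode}
end

puts view_mode_payload("verbose").inspect
```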

#connect ⇒ Object

Opens the WebSocket connection in a background thread. The connection subscribes to the session channel automatically after receiving the Action Cable welcome message. Reconnects automatically on unexpected disconnection.



# File 'lib/tui/cable_client.rb', line 82

def connect
  @mutex.synchronize do
    @intentional_disconnect = false
    @status = :connecting
  end
  @ws_thread = Thread.new { run_websocket_loop }
end
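Because #connect returns immediately and the subscription completes on the background thread, a caller may want to wait until #status reaches :subscribed. The sketch below polls a status reader with a timeout; its defaults mirror CONNECTION_TIMEOUT (10) and POLL_INTERVAL (0.1), but wait_for_status and FakeClient are hypothetical and exist only to make the example runnable without a server.

```ruby
# Polls client.status until it equals target or the timeout elapses.
# Returns true on success, false on timeout.
def wait_for_status(client, target, timeout: 10, interval: 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until client.status == target
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

# Stand-in for TUI::CableClient so the sketch runs on its own.
FakeClient = Struct.new(:status)

client = FakeClient.new(:connecting)
# Simulate the background thread completing the subscription.
Thread.new { sleep 0.05; client.status = :subscribed }
puts wait_for_status(client, :subscribed, timeout: 1, interval: 0.01)
```

In a TUI it is usually preferable to keep rendering and react to the connection messages from #drain_messages instead of blocking like this; the polling form is mainly useful in scripts and tests.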

#create_session ⇒ Object

Requests the brain to create a new session and switch to it. The server responds with a session_changed message followed by history.



# File 'lib/tui/cable_client.rb', line 99

def create_session
  send_action("create_session", {})
end

#disconnect ⇒ Object

Closes the WebSocket connection and cleans up the background thread. Prevents automatic reconnection.



# File 'lib/tui/cable_client.rb', line 177

def disconnect
  @mutex.synchronize do
    @intentional_disconnect = true
    @status = :disconnected
  end
  @ws&.close
  @ws_thread&.join(DISCONNECT_TIMEOUT)
end
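The method above discards the result of Thread#join, so it does not report whether the background thread actually finished within DISCONNECT_TIMEOUT. Thread#join with a limit returns nil on timeout and the thread itself otherwise, which the following sketch uses to tell the two cases apart. The helper name finished_within? is hypothetical.

```ruby
# Joins the thread for up to `timeout` seconds and reports whether
# it finished. Thread#join returns nil when the limit expires.
def finished_within?(thread, timeout)
  !thread.join(timeout).nil?
end

quick = Thread.new { :done }
slow = Thread.new { sleep } # sleeps until killed

puts finished_within?(quick, 2)
puts finished_within?(slow, 0.05)

slow.kill # clean up the thread that never finished
```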

#drain_messages ⇒ Array<Hash>

Drains all pending messages from the queue (non-blocking). Call this from the TUI render loop to process incoming events.

Returns:

  • (Array<Hash>)

    messages received since last drain



# File 'lib/tui/cable_client.rb', line 165

def drain_messages
  messages = []
  loop do
    # Non-blocking pop: raises ThreadError once the queue is empty.
    messages << @message_queue.pop(true)
  rescue ThreadError
    break
  end
  messages
end
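The drain pattern can be tried in isolation. The sketch below applies the same non-blocking loop to a standalone Thread::Queue that a producer thread fills, standing in for the WebSocket thread. The drain helper and the message keys ("type", "seq") are hypothetical and not the actual message format.

```ruby
# Same non-blocking drain as the method above, against any queue.
def drain(queue)
  messages = []
  loop do
    messages << queue.pop(true) # raises ThreadError when empty
  rescue ThreadError
    break
  end
  messages
end

queue = Thread::Queue.new

# Producer stands in for the background WebSocket thread.
producer = Thread.new do
  3.times { |i| queue << {"type" => "event", "seq" => i} }
end
producer.join

first = drain(queue)  # everything queued so far, in arrival order
second = drain(queue) # nothing new has arrived since the last drain

puts first.length
puts second.empty?
```

Because the drain never blocks, a render loop can call it once per frame and simply receive an empty array when nothing new has arrived.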

#interrupt ⇒ void

This method returns an undefined value.

Requests interruption of the current tool execution. The server sets an interrupt flag that the LLM client checks between tool calls.



# File 'lib/tui/cable_client.rb', line 141

def interrupt
  send_action("interrupt_execution", {})
end

#list_sessions(limit: 10) ⇒ Object

Requests a list of recent sessions from the brain. The server responds with a sessions_list message.

Parameters:

  • limit (Integer) (defaults to: 10)

    max sessions to return (default 10)



# File 'lib/tui/cable_client.rb', line 115

def list_sessions(limit: 10)
  send_action("list_sessions", {"limit" => limit})
end

#recall_pending(event_id) ⇒ Object

Requests the brain to recall (delete) a pending message so the user can edit it before the LLM sees it.

Parameters:

  • event_id (Integer)

    database ID of the pending user_message event



# File 'lib/tui/cable_client.rb', line 133

def recall_pending(event_id)
  send_action("recall_pending", {"event_id" => event_id})
end

#save_token(token) ⇒ Object

Sends an Anthropic subscription token to the brain for validation and storage. The token flows directly from TUI input to encrypted credentials and never enters the LLM context window.

Parameters:

  • token (String)

    Anthropic subscription token (sk-ant-oat01-…)



# File 'lib/tui/cable_client.rb', line 150

def save_token(token)
  send_action("save_token", {"token" => token})
end

#speak(content) ⇒ Object

Sends user input to the brain for processing.

Parameters:

  • content (String)

    the user’s message text



# File 'lib/tui/cable_client.rb', line 93

def speak(content)
  send_action("speak", {"content" => content})
end

#switch_session(session_id) ⇒ Object

Requests the brain to switch to an existing session. The server responds with a session_changed message followed by history.

Parameters:

  • session_id (Integer)

    target session to resume



# File 'lib/tui/cable_client.rb', line 107

def switch_session(session_id)
  send_action("switch_session", {"session_id" => session_id})
end

#update_session_id(new_id) ⇒ Object

Updates the local session ID reference after a server-side session switch.

Parameters:

  • new_id (Integer)

    the new session ID



# File 'lib/tui/cable_client.rb', line 157

def update_session_id(new_id)
  @mutex.synchronize { @session_id = new_id }
end