Class: TUI::MessageStore

Inherits:
Object
  • Object
show all
Defined in:
lib/tui/message_store.rb

Overview

Thread-safe in-memory store for chat entries displayed in the TUI. Replaces Events::Subscribers::MessageCollector in the WebSocket-based TUI, with no dependency on Rails or the Events module.

Accepts Action Cable message payloads and stores typed entries:

  • ‘:rendered, data:, message_type:, id:` for messages with structured decorator output

  • ‘:message, role:, content:, id:` for user/agent messages (fallback)

  • ‘:tool_counter, calls:, responses:` for tool activity

Structured data takes priority when available. Messages with nil rendered content fall back to existing behavior: tool messages aggregate into counters, conversation messages store role and content.

Entries with message IDs are maintained in ID order (ascending) regardless of arrival order, preventing misordering from race conditions between live broadcasts and viewport replays. Duplicate IDs are deduplicated by updating the existing entry.

Tool counters aggregate per agent turn: a new counter starts when a tool_call arrives after a conversation entry. Consecutive tool messages increment the same counter until the next conversation message breaks the chain.

When a message arrives with ‘“action” => “update”` and a known `“id”`, the existing entry is replaced in-place, preserving display order.

Constant Summary collapse

MESSAGE_TYPES =
%w[user_message agent_message].freeze
ROLE_MAP =
{
  "user_message" => "user",
  "agent_message" => "assistant"
}.freeze

Instance Method Summary collapse

Constructor Details

#initializeMessageStore

Returns a new instance of MessageStore.



38
39
40
41
42
43
44
45
46
# File 'lib/tui/message_store.rb', line 38

def initialize
  @entries = []
  @entries_by_id = {}
  @pending_entries = []
  @pending_by_id = {}
  @mutex = Mutex.new
  @version = 0
  @token_economy = default_token_economy
end

Instance Method Details

#add_pending(pending_message_id, content) ⇒ void

This method returns an undefined value.

Adds a pending message to the separate pending list. Pending messages always render after real messages.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

  • content (String)

    message text



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/tui/message_store.rb', line 149

def add_pending(pending_message_id, content)
  @mutex.synchronize do
    entry = {
      type: :rendered,
      data: {"role" => "user", "content" => content, "status" => "pending"},
      message_type: "user_message",
      pending_message_id: pending_message_id
    }
    old = @pending_by_id[pending_message_id]
    @pending_entries.delete(old) if old
    @pending_entries << entry
    @pending_by_id[pending_message_id] = entry
    @version += 1
  end
end

#clearvoid

This method returns an undefined value.

Removes all entries. Called on view mode change and session switch to prepare for re-decorated viewport messages from the server. Resets token economy totals since we’re starting fresh.



132
133
134
135
136
137
138
139
140
141
# File 'lib/tui/message_store.rb', line 132

def clear
  @mutex.synchronize do
    @entries = []
    @entries_by_id = {}
    @pending_entries = []
    @pending_by_id = {}
    @token_economy = default_token_economy
    @version += 1
  end
end

#last_pending_user_messageHash?

Returns the last pending user message for recall editing.

Returns:

  • (Hash, nil)

    ‘Integer, content: String` or nil



183
184
185
186
187
188
189
190
# File 'lib/tui/message_store.rb', line 183

def last_pending_user_message
  @mutex.synchronize do
    entry = @pending_entries.last
    return nil unless entry

    {pending_message_id: entry[:pending_message_id], content: entry.dig(:data, "content")}
  end
end

#messagesArray<Hash>

Returns thread-safe copy of stored entries (pending messages at the end).

Returns:

  • (Array<Hash>)

    thread-safe copy of stored entries (pending messages at the end)



57
58
59
# File 'lib/tui/message_store.rb', line 57

def messages
  @mutex.synchronize { @entries.dup + @pending_entries.dup }
end

#process_event(event_data) ⇒ Boolean

Processes a raw event payload from the WebSocket channel. Uses structured decorator data when available; falls back to role/content extraction for messages and tool counter aggregation.

Events with ‘“action” => “update”` and a matching `“id”` replace the existing entry’s data in-place rather than appending.

Extracts api_metrics when present and accumulates token economy data.

Parameters:

  • event_data (Hash)

    Action Cable event payload with “type”, “content”, and optionally “rendered” (hash of mode => lines), “id”, “action”, “api_metrics”

Returns:

  • (Boolean)

    true if the event type was recognized and handled



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/tui/message_store.rb', line 102

def process_event(event_data)
  message_id = event_data["id"]

  # Track API metrics for token economy HUD (only on create, not update)
  if event_data["action"] != "update"
    accumulate_api_metrics(event_data["api_metrics"])
  end

  if event_data["action"] == "update" && message_id
    return update_existing(event_data, message_id)
  end

  rendered = extract_rendered(event_data)

  if rendered
    record_rendered(rendered, message_type: event_data["type"], id: message_id)
  else
    case event_data["type"]
    when "tool_call" then record_tool_call
    when "tool_response" then record_tool_response
    when *MESSAGE_TYPES then record_message(event_data)
    else false
    end
  end
end

#remove_by_id(message_id) ⇒ Boolean

Removes an entry by its message ID. Used when a pending message is recalled for editing or deleted by another client.

Parameters:

  • message_id (Integer)

    database ID of the message to remove

Returns:

  • (Boolean)

    true if the entry was found and removed



197
198
199
200
201
202
203
204
205
206
# File 'lib/tui/message_store.rb', line 197

def remove_by_id(message_id)
  @mutex.synchronize do
    entry = @entries_by_id.delete(message_id)
    return false unless entry

    @entries.delete(entry)
    @version += 1
    true
  end
end

#remove_by_ids(message_ids) ⇒ Integer

Removes entries by their message IDs. Used when the brain reports that messages have left the LLM’s viewport (context window eviction). Acquires the mutex once for the entire batch.

Parameters:

  • message_ids (Array<Integer>)

    database IDs of messages to remove

Returns:

  • (Integer)

    count of entries actually removed



214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/tui/message_store.rb', line 214

def remove_by_ids(message_ids)
  @mutex.synchronize do
    removed = 0
    message_ids.each do |message_id|
      entry = @entries_by_id.delete(message_id)
      next unless entry

      @entries.delete(entry)
      removed += 1
    end
    @version += 1 if removed > 0
    removed
  end
end

#remove_pending(pending_message_id) ⇒ Boolean

Removes a pending message by its PendingMessage ID.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

Returns:

  • (Boolean)

    true if found and removed



169
170
171
172
173
174
175
176
177
178
# File 'lib/tui/message_store.rb', line 169

def remove_pending(pending_message_id)
  @mutex.synchronize do
    entry = @pending_by_id.delete(pending_message_id)
    return false unless entry

    @pending_entries.delete(entry)
    @version += 1
    true
  end
end

#sizeInteger

Returns number of stored entries including pending (no array copy).

Returns:

  • (Integer)

    number of stored entries including pending (no array copy)



62
63
64
# File 'lib/tui/message_store.rb', line 62

def size
  @mutex.synchronize { @entries.size + @pending_entries.size }
end

#token_economyHash

Returns aggregated token economy data for HUD display. Includes running totals, cache hit rate, and latest rate limit snapshot.

Returns:

  • (Hash)

    token economy stats:

    • :input_tokens [Integer] total input tokens across all calls

    • :output_tokens [Integer] total output tokens

    • :cache_read_input_tokens [Integer] total cached token reads

    • :cache_creation_input_tokens [Integer] total cache writes

    • :call_count [Integer] number of API calls tracked

    • :cache_hit_rate [Float] percentage of input served from cache (0.0-1.0)

    • :rate_limits [Hash, nil] latest rate limit values from API



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/tui/message_store.rb', line 77

def token_economy
  @mutex.synchronize do
    stats = @token_economy.dup
    total_input = stats[:input_tokens] + stats[:cache_read_input_tokens] + stats[:cache_creation_input_tokens]
    stats[:cache_hit_rate] = if total_input > 0
      stats[:cache_read_input_tokens].to_f / total_input
    else
      0.0
    end
    stats
  end
end

#versionInteger

Monotonically increasing counter that bumps on every mutation. Consumers compare this to a cached value to detect changes without copying the full entries array on every frame.

Returns:

  • (Integer)


52
53
54
# File 'lib/tui/message_store.rb', line 52

def version
  @mutex.synchronize { @version }
end