Class: TUI::MessageStore

Inherits:
Object
Defined in:
lib/tui/message_store.rb

Overview

Thread-safe in-memory store for chat entries displayed in the TUI. Replaces Events::Subscribers::MessageCollector in the WebSocket-based TUI, with no dependency on Rails or the Events module.

Accepts Action Cable message payloads and stores typed entries:

  • `{type: :rendered, data:, message_type:, id:}` for messages with structured decorator output

  • `{type: :message, role:, content:, id:}` for user/agent messages (fallback)

  • `{type: :tool_counter, calls:, responses:}` for tool activity

Structured data takes priority when available. Messages whose rendered content is nil use the fallback paths: tool messages aggregate into counters, and conversation messages store role and content.

Entries with message IDs are maintained in ID order (ascending) regardless of arrival order, preventing misordering from race conditions between live broadcasts and viewport replays. Duplicate IDs are deduplicated by updating the existing entry.
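The ordering and deduplication rule above can be illustrated with a binary-search insert. This is only a minimal sketch, assuming entries are hashes with an integer `:id`; `OrderedEntries`, `upsert`, and `ids` are hypothetical names and not part of TUI::MessageStore, whose private insertion logic is not shown on this page.

```ruby
# Hypothetical sketch: keeps entries sorted by ascending :id and
# deduplicates by updating the existing entry with the same ID.
class OrderedEntries
  def initialize
    @entries = []
    @by_id = {}
  end

  # Inserts a new entry at its sorted position, or updates the entry
  # that already carries the same ID. Returns the current list.
  def upsert(entry)
    id = entry.fetch(:id)
    if (existing = @by_id[id])
      # Duplicate ID: overwrite contents in place so position is preserved.
      existing.replace(entry)
    else
      # First position whose ID is greater than the new one (or the end).
      index = @entries.bsearch_index { |e| e[:id] > id } || @entries.size
      @entries.insert(index, entry)
      @by_id[id] = entry
    end
    @entries
  end

  def ids
    @entries.map { |e| e[:id] }
  end
end

ordered = OrderedEntries.new
ordered.upsert({id: 3, content: "c"})             # live broadcast arrives first
ordered.upsert({id: 1, content: "a"})             # viewport replay arrives late
ordered.upsert({id: 2, content: "b"})
ordered.upsert({id: 1, content: "a (replayed)"})  # duplicate ID, no new row
```

Whatever the arrival order, the list ends up in ascending ID order, and the replayed entry replaces the earlier copy instead of appearing twice.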

Tool counters aggregate per agent turn: a new counter starts when a tool_call arrives after a conversation entry. Consecutive tool messages increment the same counter until the next conversation message breaks the chain.
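The per-turn aggregation described above might look roughly like the following. It is a sketch under stated assumptions: `ToolCounterSketch` and `current_counter` are hypothetical names, and the choice to start a fresh counter when a tool response arrives with no preceding counter is an assumption, since the source only describes the tool_call case.

```ruby
# Hypothetical sketch of per-turn tool counter aggregation.
class ToolCounterSketch
  attr_reader :entries

  def initialize
    @entries = []
  end

  def record_tool_call
    current_counter[:calls] += 1
  end

  def record_tool_response
    current_counter[:responses] += 1
  end

  # A conversation message breaks the chain of consecutive tool messages.
  def record_message(role, content)
    @entries << {type: :message, role: role, content: content}
  end

  private

  # Reuses the trailing counter, or starts a new one when the last entry
  # is a conversation message or the list is empty (assumption for the
  # tool_response case).
  def current_counter
    last = @entries.last
    return last if last && last[:type] == :tool_counter

    counter = {type: :tool_counter, calls: 0, responses: 0}
    @entries << counter
    counter
  end
end

sketch = ToolCounterSketch.new
sketch.record_message("user", "list files")
sketch.record_tool_call
sketch.record_tool_response
sketch.record_tool_call
sketch.record_message("assistant", "done")
sketch.record_tool_call
```

After this sequence the list holds two separate counters, one per agent turn, with the first showing two calls and one response.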

When a message arrives with `"action" => "update"` and a known `"id"`, the existing entry is replaced in-place, preserving display order.

Constant Summary

MESSAGE_TYPES =
%w[user_message agent_message].freeze
ROLE_MAP =
{
  "user_message" => "user",
  "agent_message" => "assistant"
}.freeze

Constructor Details

#initialize ⇒ MessageStore

Returns a new instance of MessageStore.



# File 'lib/tui/message_store.rb', line 36

def initialize
  @entries = []
  @entries_by_id = {}
  @pending_entries = []
  @pending_by_id = {}
  @mutex = Mutex.new
  @version = 0
end

Instance Method Details

#add_pending(pending_message_id, content) ⇒ void

This method returns an undefined value.

Adds a pending message to the separate pending list. Pending messages always render after real messages.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

  • content (String)

    message text



# File 'lib/tui/message_store.rb', line 113

def add_pending(pending_message_id, content)
  @mutex.synchronize do
    entry = {
      type: :rendered,
      data: {"role" => "user", "content" => content, "status" => "pending"},
      message_type: "user_message",
      pending_message_id: pending_message_id
    }
    old = @pending_by_id[pending_message_id]
    @pending_entries.delete(old) if old
    @pending_entries << entry
    @pending_by_id[pending_message_id] = entry
    @version += 1
  end
end

#clear ⇒ void

This method returns an undefined value.

Removes all entries. Called on view mode change and session switch to prepare for re-decorated viewport messages from the server.



# File 'lib/tui/message_store.rb', line 97

def clear
  @mutex.synchronize do
    @entries = []
    @entries_by_id = {}
    @pending_entries = []
    @pending_by_id = {}
    @version += 1
  end
end

#last_pending_user_message ⇒ Hash?

Returns the last pending user message for recall editing.

Returns:

  • (Hash, nil)

    `{pending_message_id: Integer, content: String}` or nil



# File 'lib/tui/message_store.rb', line 147

def last_pending_user_message
  @mutex.synchronize do
    entry = @pending_entries.last
    return nil unless entry

    {pending_message_id: entry[:pending_message_id], content: entry.dig(:data, "content")}
  end
end

#messages ⇒ Array<Hash>

Returns a thread-safe copy of the stored entries, with pending messages at the end.

Returns:

  • (Array<Hash>)

    thread-safe copy of stored entries (pending messages at the end)



# File 'lib/tui/message_store.rb', line 54

def messages
  @mutex.synchronize { @entries.dup + @pending_entries.dup }
end

#process_event(event_data) ⇒ Boolean

Processes a raw event payload from the WebSocket channel. Uses structured decorator data when available; falls back to role/content extraction for messages and tool counter aggregation.

Events with `"action" => "update"` and a matching `"id"` replace the existing entry's data in-place rather than appending.

Parameters:

  • event_data (Hash)

    Action Cable event payload with "type", "content", and optionally "rendered" (hash of mode => lines), "id", "action"

Returns:

  • (Boolean)

    true if the event type was recognized and handled



# File 'lib/tui/message_store.rb', line 73

def process_event(event_data)
  message_id = event_data["id"]

  if event_data["action"] == "update" && message_id
    return update_existing(event_data, message_id)
  end

  rendered = extract_rendered(event_data)

  if rendered
    record_rendered(rendered, message_type: event_data["type"], id: message_id)
  else
    case event_data["type"]
    when "tool_call" then record_tool_call
    when "tool_response" then record_tool_response
    when *MESSAGE_TYPES then record_message(event_data)
    else false
    end
  end
end
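To make the branch order in the listing above concrete, the following sketch classifies a few sample payloads by the branch they would reach. `DispatchSketch.branch_for` is a hypothetical helper, not part of the class. It assumes "rendered" counts as present when it is a non-empty Hash, because the real `extract_rendered` is not shown on this page; the mode name "basic" and the "heartbeat" type are made up for illustration.

```ruby
module DispatchSketch
  MESSAGE_TYPES = %w[user_message agent_message].freeze

  module_function

  # Hypothetical helper: names the process_event branch a payload reaches.
  def branch_for(event_data)
    # Updates are checked first and need both the action and an ID.
    return :update if event_data["action"] == "update" && event_data["id"]

    # Assumption: structured data is usable when it is a non-empty Hash.
    rendered = event_data["rendered"]
    return :rendered if rendered.is_a?(Hash) && !rendered.empty?

    # Fallback dispatch on the event type, mirroring the case statement.
    case event_data["type"]
    when "tool_call" then :tool_call
    when "tool_response" then :tool_response
    when *MESSAGE_TYPES then :message
    else :unrecognized
    end
  end
end

# Sample payloads keyed by the branch each one is expected to reach.
payloads = {
  update: {"type" => "agent_message", "id" => 5, "action" => "update", "content" => "edited"},
  rendered: {"type" => "agent_message", "id" => 6, "rendered" => {"basic" => ["hello"]}},
  tool_call: {"type" => "tool_call"},
  message: {"type" => "user_message", "id" => 7, "content" => "hi"},
  unrecognized: {"type" => "heartbeat"}
}

results = payloads.transform_values { |payload| DispatchSketch.branch_for(payload) }
```

In the real method the `:unrecognized` case corresponds to the `else false` branch, which is how callers learn that an event type was not handled.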

#remove_by_id(message_id) ⇒ Boolean

Removes an entry by its message ID. Used when a pending message is recalled for editing or deleted by another client.

Parameters:

  • message_id (Integer)

    database ID of the message to remove

Returns:

  • (Boolean)

    true if the entry was found and removed



# File 'lib/tui/message_store.rb', line 161

def remove_by_id(message_id)
  @mutex.synchronize do
    entry = @entries_by_id.delete(message_id)
    return false unless entry

    @entries.delete(entry)
    @version += 1
    true
  end
end

#remove_by_ids(message_ids) ⇒ Integer

Removes entries by their message IDs. Used when the brain reports that messages have left the LLM’s viewport (context window eviction). Acquires the mutex once for the entire batch.

Parameters:

  • message_ids (Array<Integer>)

    database IDs of messages to remove

Returns:

  • (Integer)

    count of entries actually removed



# File 'lib/tui/message_store.rb', line 178

def remove_by_ids(message_ids)
  @mutex.synchronize do
    removed = 0
    message_ids.each do |message_id|
      entry = @entries_by_id.delete(message_id)
      next unless entry

      @entries.delete(entry)
      removed += 1
    end
    @version += 1 if removed > 0
    removed
  end
end

#remove_pending(pending_message_id) ⇒ Boolean

Removes a pending message by its PendingMessage ID.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

Returns:

  • (Boolean)

    true if found and removed



# File 'lib/tui/message_store.rb', line 133

def remove_pending(pending_message_id)
  @mutex.synchronize do
    entry = @pending_by_id.delete(pending_message_id)
    return false unless entry

    @pending_entries.delete(entry)
    @version += 1
    true
  end
end
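The pending-message methods can be combined into a recall-for-editing flow. The sketch below uses `PendingOnlyStore`, a hypothetical trimmed stand-in that copies only `add_pending`, `last_pending_user_message`, and `remove_pending` from the listings on this page so the example runs without the rest of the class.

```ruby
# Hypothetical stand-in holding only the pending-related methods,
# copied from the listings on this page.
class PendingOnlyStore
  def initialize
    @pending_entries = []
    @pending_by_id = {}
    @mutex = Mutex.new
    @version = 0
  end

  def add_pending(pending_message_id, content)
    @mutex.synchronize do
      entry = {
        type: :rendered,
        data: {"role" => "user", "content" => content, "status" => "pending"},
        message_type: "user_message",
        pending_message_id: pending_message_id
      }
      # Re-adding a known ID drops the old entry and appends the new one.
      old = @pending_by_id[pending_message_id]
      @pending_entries.delete(old) if old
      @pending_entries << entry
      @pending_by_id[pending_message_id] = entry
      @version += 1
    end
  end

  def last_pending_user_message
    @mutex.synchronize do
      entry = @pending_entries.last
      return nil unless entry

      {pending_message_id: entry[:pending_message_id], content: entry.dig(:data, "content")}
    end
  end

  def remove_pending(pending_message_id)
    @mutex.synchronize do
      entry = @pending_by_id.delete(pending_message_id)
      return false unless entry

      @pending_entries.delete(entry)
      @version += 1
      true
    end
  end
end

store = PendingOnlyStore.new
store.add_pending(7, "first draft")
store.add_pending(8, "second draft")

# Recall the newest pending message, then drop it from the pending list.
recalled = store.last_pending_user_message
removed = store.remove_pending(recalled[:pending_message_id])
remaining = store.last_pending_user_message
```

Recalling returns the most recently added pending message; once it is removed, the previous pending message becomes the next candidate for recall.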

#size ⇒ Integer

Returns the number of stored entries, including pending ones, without copying the arrays.

Returns:

  • (Integer)

    number of stored entries including pending (no array copy)



# File 'lib/tui/message_store.rb', line 59

def size
  @mutex.synchronize { @entries.size + @pending_entries.size }
end

#version ⇒ Integer

Monotonically increasing counter that bumps on every mutation. Consumers compare this to a cached value to detect changes without copying the full entries array on every frame.

Returns:

  • (Integer)


# File 'lib/tui/message_store.rb', line 49

def version
  @mutex.synchronize { @version }
end
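A consumer could use the version counter to skip copying entries on frames where nothing changed. The following is a sketch only: `FakeStore` is a hypothetical stand-in exposing `#version` and `#messages` in the same shape as the class documented here, and `CachedView` is a hypothetical render-side helper, not part of the TUI.

```ruby
# Hypothetical stand-in with the same #version / #messages surface.
class FakeStore
  def initialize
    @entries = []
    @version = 0
    @mutex = Mutex.new
  end

  def version
    @mutex.synchronize { @version }
  end

  def messages
    @mutex.synchronize { @entries.dup }
  end

  def add(entry)
    @mutex.synchronize do
      @entries << entry
      @version += 1
    end
  end
end

# Hypothetical consumer: copies entries only when the version has changed.
class CachedView
  attr_reader :copies

  def initialize(store)
    @store = store
    @cached_version = nil
    @cached_messages = []
    @copies = 0
  end

  def messages
    current = @store.version
    if current != @cached_version
      # A mutation landing between these two reads only causes one extra
      # refresh on the next call, never a stale cache that goes unnoticed.
      @cached_messages = @store.messages
      @cached_version = current
      @copies += 1
    end
    @cached_messages
  end
end

store = FakeStore.new
view = CachedView.new(store)
view.messages  # first frame: copies once
view.messages  # unchanged version: reuses the cached array
store.add({type: :message, role: "user", content: "hi"})
view.messages  # version bumped: copies again
```

Reading the version takes the mutex briefly but allocates nothing, so the per-frame cost stays constant while the store is idle.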