Class: TUI::MessageStore

Inherits:
Object
Defined in:
lib/tui/message_store.rb

Overview

Thread-safe in-memory store for chat entries displayed in the TUI. Replaces Events::Subscribers::MessageCollector in the WebSocket-based TUI, with no dependency on Rails or the Events module.

Accepts Action Cable message payloads and stores typed entries:

  • `{type: :rendered, data:, message_type:, id:}` for messages with structured decorator output

  • `{type: :message, role:, content:, id:}` for user/agent messages (fallback)

  • `{type: :tool_counter, calls:, responses:}` for tool activity

Structured data takes priority when available. Messages with nil rendered content fall back to existing behavior: tool messages aggregate into counters, conversation messages store role and content.

Entries with message IDs are maintained in ID order (ascending) regardless of arrival order, preventing misordering from race conditions between live broadcasts and viewport replays. Duplicate IDs are deduplicated by updating the existing entry.
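
The ordering and deduplication rule can be illustrated with a minimal sketch. It is not the class's actual implementation: `OrderedEntries` and `upsert` are hypothetical names, and the sketch assumes every entry carries an `:id`, whereas the real store also holds entries without IDs (tool counters).

```ruby
# Hypothetical sketch: keeps entries sorted by :id and deduplicates by
# updating the existing entry. Assumes every entry has an :id.
class OrderedEntries
  def initialize
    @entries = []        # display order, ascending by :id
    @entries_by_id = {}  # id => entry hash (same object as in @entries)
  end

  def upsert(entry)
    id = entry[:id]
    existing = @entries_by_id[id]
    if existing
      # Duplicate ID: overwrite the contents, keep the position.
      existing.replace(entry)
      return existing
    end

    # Binary search for the first entry with a larger ID; insert before it.
    index = @entries.bsearch_index { |e| e[:id] > id } || @entries.size
    @entries.insert(index, entry)
    @entries_by_id[id] = entry
  end

  def ids
    @entries.map { |e| e[:id] }
  end

  def to_a
    @entries.dup
  end
end

store = OrderedEntries.new
store.upsert({id: 3, content: "c"})           # live broadcast arrives first
store.upsert({id: 1, content: "a"})           # viewport replay arrives late
store.upsert({id: 2, content: "b"})
store.upsert({id: 1, content: "a (edited)"})  # duplicate ID updates in place
store.ids # => [1, 2, 3]
```

Arrival order here is 3, 1, 2, 1, yet the stored order is ascending and the duplicate does not create a second entry.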

Tool counters aggregate per agent turn: a new counter starts when a tool_call arrives after a conversation entry. Consecutive tool messages increment the same counter until the next conversation message breaks the chain.
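
A rough sketch of the per-turn aggregation described above follows. `ToolCounterSketch` is a hypothetical stand-in, not the store's private code, and it assumes that a tool_response arriving with no open counter is ignored, which the documentation does not specify.

```ruby
# Hypothetical sketch of per-turn tool counter aggregation.
class ToolCounterSketch
  attr_reader :entries

  def initialize
    @entries = []
  end

  # A conversation message ends the current run of tool messages.
  def record_message(role, content)
    @entries << {type: :message, role: role, content: content}
  end

  # Starts a new counter if the last entry is not a counter.
  def record_tool_call
    current_counter(create: true)[:calls] += 1
  end

  # Assumption: a response with no open counter is dropped.
  def record_tool_response
    counter = current_counter(create: false)
    counter[:responses] += 1 if counter
  end

  private

  def current_counter(create:)
    last = @entries.last
    return last if last && last[:type] == :tool_counter
    return nil unless create

    counter = {type: :tool_counter, calls: 0, responses: 0}
    @entries << counter
    counter
  end
end

log = ToolCounterSketch.new
log.record_message("user", "List the files")
log.record_tool_call
log.record_tool_response
log.record_tool_call
log.record_tool_response
log.record_message("assistant", "Done")
log.record_tool_call # starts a second counter for the next turn
log.entries.map { |e| e[:type] }
# => [:message, :tool_counter, :message, :tool_counter]
```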

When a message arrives with `"action" => "update"` and a known `"id"`, the existing entry is replaced in-place, preserving display order.

Constant Summary

MESSAGE_TYPES =
%w[user_message agent_message].freeze
ROLE_MAP =
{
  "user_message" => "user",
  "agent_message" => "assistant"
}.freeze

Constructor Details

#initialize ⇒ MessageStore

Returns a new instance of MessageStore.



# File 'lib/tui/message_store.rb', line 36

def initialize
  @entries = []
  @entries_by_id = {}
  @mutex = Mutex.new
  @version = 0
end

Instance Method Details

#clear ⇒ void

This method returns an undefined value.

Removes all entries. Called on view mode change and session switch to prepare for re-decorated viewport messages from the server.



# File 'lib/tui/message_store.rb', line 95

def clear
  @mutex.synchronize do
    @entries = []
    @entries_by_id = {}
    @version += 1
  end
end

#last_pending_user_message ⇒ Hash?

Returns the last pending user message for recall editing. Walks entries backwards to the most recent user_message and returns it only if it is still pending; older user messages are not checked.

Returns:

  • (Hash, nil)

    `{id: Integer, content: String}` or nil if none pending



# File 'lib/tui/message_store.rb', line 107

def last_pending_user_message
  @mutex.synchronize do
    @entries.reverse_each do |entry|
      next unless entry[:message_type] == "user_message"

      if entry[:type] == :rendered && entry.dig(:data, "status") == "pending"
        return {id: entry[:id], content: entry.dig(:data, "content")}
      end

      # Only check the most recent user message
      break
    end
    nil
  end
end
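
To make the "most recent user message only" behavior concrete, here is a standalone sketch of the same walk over a plain array, without the mutex. `last_pending` is a hypothetical free function, and the `"sent"` status value is an assumption used only to mark a message as not pending.

```ruby
# Hypothetical standalone version of the backward walk (no locking).
def last_pending(entries)
  entries.reverse_each do |entry|
    next unless entry[:message_type] == "user_message"

    if entry[:type] == :rendered && entry.dig(:data, "status") == "pending"
      return {id: entry[:id], content: entry.dig(:data, "content")}
    end

    break # the newest user message is not pending; stop looking
  end
  nil
end

# "sent" is an assumed status value for illustration.
older_pending = {type: :rendered, message_type: "user_message", id: 1,
                 data: {"status" => "pending", "content" => "first"}}
newer_sent    = {type: :rendered, message_type: "user_message", id: 2,
                 data: {"status" => "sent", "content" => "second"}}
newer_pending = {type: :rendered, message_type: "user_message", id: 3,
                 data: {"status" => "pending", "content" => "third"}}

hidden = last_pending([older_pending, newer_sent])
found  = last_pending([older_pending, newer_sent, newer_pending])
```

In the first call the older pending message is not returned, because the newest user message is not pending. In the second call the result is the entry with id 3.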

#messages ⇒ Array<Hash>

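Returns a copy of the stored entries, taken under the mutex. Only the array is copied; the entry hashes are shared with the store.

Returns:

  • (Array<Hash>)

    copy of the stored entries, taken under the mutex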



# File 'lib/tui/message_store.rb', line 52

def messages
  @mutex.synchronize { @entries.dup }
end

#process_event(event_data) ⇒ Boolean

Processes a raw event payload from the WebSocket channel. Uses structured decorator data when available; falls back to role/content extraction for messages and tool counter aggregation.

Events with `"action" => "update"` and a matching `"id"` replace the existing entry's data in-place rather than appending.

Parameters:

  • event_data (Hash)

    Action Cable event payload with "type", "content", and optionally "rendered" (hash of mode => lines), "id", "action"

Returns:

  • (Boolean)

    true if the event type was recognized and handled



# File 'lib/tui/message_store.rb', line 71

def process_event(event_data)
  message_id = event_data["id"]

  if event_data["action"] == "update" && message_id
    return update_existing(event_data, message_id)
  end

  rendered = extract_rendered(event_data)

  if rendered
    record_rendered(rendered, message_type: event_data["type"], id: message_id)
  else
    case event_data["type"]
    when "tool_call" then record_tool_call
    when "tool_response" then record_tool_response
    when *MESSAGE_TYPES then record_message(event_data)
    else false
    end
  end
end
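
The branch order above can be summarized with a small classifier and a few example payloads. `classify_event` is a hypothetical helper, not part of the class. It assumes that any truthy `"rendered"` value counts as structured data, whereas the real `extract_rendered` is private and may be stricter.

```ruby
MESSAGE_TYPES = %w[user_message agent_message].freeze

# Hypothetical helper mirroring the branch order of process_event.
def classify_event(event_data)
  return :update if event_data["action"] == "update" && event_data["id"]
  # Assumption: stands in for the private extract_rendered.
  return :rendered if event_data["rendered"]

  case event_data["type"]
  when "tool_call" then :tool_call
  when "tool_response" then :tool_response
  when *MESSAGE_TYPES then :message
  else :unrecognized
  end
end

classify_event({"type" => "user_message", "content" => "hi", "id" => 7})
# => :message
classify_event({"type" => "user_message", "id" => 7, "action" => "update"})
# => :update
classify_event({"type" => "tool_call"})
# => :tool_call
classify_event({"type" => "heartbeat"})
# => :unrecognized
```

The update check runs first, so an update payload takes that path even when it also carries rendered data.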

#remove_by_id(message_id) ⇒ Boolean

Removes an entry by its message ID. Used when a pending message is recalled for editing or deleted by another client.

Parameters:

  • message_id (Integer)

    database ID of the message to remove

Returns:

  • (Boolean)

    true if the entry was found and removed



# File 'lib/tui/message_store.rb', line 128

def remove_by_id(message_id)
  @mutex.synchronize do
    entry = @entries_by_id.delete(message_id)
    return false unless entry

    @entries.delete(entry)
    @version += 1
    true
  end
end

#remove_by_ids(message_ids) ⇒ Integer

Removes entries by their message IDs. Used when the brain reports that messages have left the LLM’s viewport (context window eviction). Acquires the mutex once for the entire batch.

Parameters:

  • message_ids (Array<Integer>)

    database IDs of messages to remove

Returns:

  • (Integer)

    count of entries actually removed



# File 'lib/tui/message_store.rb', line 145

def remove_by_ids(message_ids)
  @mutex.synchronize do
    removed = 0
    message_ids.each do |message_id|
      entry = @entries_by_id.delete(message_id)
      next unless entry

      @entries.delete(entry)
      removed += 1
    end
    @version += 1 if removed > 0
    removed
  end
end

#size ⇒ Integer

Returns number of stored entries (no array copy).

Returns:

  • (Integer)

    number of stored entries (no array copy)



# File 'lib/tui/message_store.rb', line 57

def size
  @mutex.synchronize { @entries.size }
end

#version ⇒ Integer

Monotonically increasing counter that bumps on every mutation. Consumers compare this to a cached value to detect changes without copying the full entries array on every frame.

Returns:

  • (Integer)


# File 'lib/tui/message_store.rb', line 47

def version
  @mutex.synchronize { @version }
end
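
One way a consumer might use the version counter is sketched below. `FakeStore` is a hypothetical stand-in that exposes only `#version`, `#messages`, and an `add` helper for the demo. `CachedView` is likewise illustrative and not part of the TUI.

```ruby
# Hypothetical stand-in with the same #version / #messages shape.
class FakeStore
  def initialize
    @entries = []
    @version = 0
    @mutex = Mutex.new
  end

  def version
    @mutex.synchronize { @version }
  end

  def messages
    @mutex.synchronize { @entries.dup }
  end

  # Demo-only mutation; bumps the version like the real mutators do.
  def add(entry)
    @mutex.synchronize do
      @entries << entry
      @version += 1
    end
  end
end

# Hypothetical consumer: copies the entries only when the version changed.
class CachedView
  attr_reader :copies

  def initialize(store)
    @store = store
    @cached_version = nil
    @cached_entries = []
    @copies = 0
  end

  def entries
    current = @store.version
    if current != @cached_version
      @cached_entries = @store.messages
      @cached_version = current
      @copies += 1
    end
    @cached_entries
  end
end

store = FakeStore.new
view = CachedView.new(store)
view.entries                 # first frame: copies once
view.entries                 # unchanged version: no copy
store.add({type: :message, role: "user", content: "hi"})
view.entries                 # version bumped: copies again
view.copies # => 2
```

If a mutation lands between the `version` read and the `messages` read, the cached version is stale by one, and the only cost is one extra copy on the next frame.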