Class: Events::Subscribers::Persister
- Inherits:
-
Object
- Object
- Events::Subscribers::Persister
- Includes:
- Events::Subscriber
- Defined in:
- lib/events/subscribers/persister.rb
Overview
Persists all events to SQLite as they flow through the event bus. Each event is written as a Message record belonging to the active session.
When initialized with a specific session, all events are saved to that session. When initialized without one (global mode), the session is looked up from the event’s session_id payload field.
User messages are NOT persisted here — they are created directly by their callers (SessionChannel#speak, AgentLoop#run) so the message ID is available for bounce-back cleanup. Pending user messages live in the PendingMessage table, outside the event bus.
Instance Attribute Summary collapse
-
#session ⇒ Object
Returns the value of attribute session.
Instance Method Summary collapse
-
#emit(event) ⇒ Object
Receives a Rails.event notification hash and persists it.
-
#initialize(session = nil) ⇒ Persister
constructor
A new instance of Persister.
Constructor Details
#initialize(session = nil) ⇒ Persister
Returns a new instance of Persister.
29 30 31 32 |
# File 'lib/events/subscribers/persister.rb', line 29 def initialize(session = nil) @session = session @mutex = Mutex.new end |
Instance Attribute Details
#session ⇒ Object
Returns the value of attribute session.
27 28 29 |
# File 'lib/events/subscribers/persister.rb', line 27 def session @session end |
Instance Method Details
#emit(event) ⇒ Object
Receives a Rails.event notification hash and persists it.
Skips user messages — those are persisted by their callers (SessionChannel#speak, AgentLoop#run). Also skips event types not in Message::TYPES (transient events like BounceBack).
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/events/subscribers/persister.rb', line 42 def emit(event) payload = event[:payload] return unless payload.is_a?(Hash) event_type = payload[:type] return if event_type.nil? return unless Message::TYPES.include?(event_type) return if event_type == "user_message" target_session = @session || Session.find_by(id: payload[:session_id]) return unless target_session @mutex.synchronize do target_session..create!( message_type: event_type, payload: payload, tool_use_id: payload[:tool_use_id], timestamp: payload[:timestamp] || Time.current.to_ns ) end end |