Class: Events::Subscribers::Persister
- Inherits:
-
Object
- Object
- Events::Subscribers::Persister
- Includes:
- Events::Subscriber
- Defined in:
- lib/events/subscribers/persister.rb
Overview
Persists all events to SQLite as they flow through the event bus. Each event is written as an Event record belonging to the active session.
When initialized with a specific session, all events are saved to that session. When initialized without one (global mode), the session is looked up from the event’s session_id payload field.
Instance Attribute Summary collapse
-
#session ⇒ Object
Returns the value of attribute session.
Instance Method Summary collapse
-
#emit(event) ⇒ Object
Receives a Rails.event notification hash and persists it.
-
#initialize(session = nil) ⇒ Persister
constructor
A new instance of Persister.
Constructor Details
#initialize(session = nil) ⇒ Persister
Returns a new instance of Persister.
24 25 26 27 |
# File 'lib/events/subscribers/persister.rb', line 24 def initialize(session = nil) @session = session @mutex = Mutex.new end |
Instance Attribute Details
#session ⇒ Object
Returns the value of attribute session.
22 23 24 |
# File 'lib/events/subscribers/persister.rb', line 22 def session @session end |
Instance Method Details
#emit(event) ⇒ Object
Receives a Rails.event notification hash and persists it.
Skips non-pending user messages — those are persisted by AgentRequestJob inside a transaction with LLM delivery (Bounce Back, #236). Also skips event types not in Event::TYPES (transient events like BounceBack).
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/events/subscribers/persister.rb', line 37 def emit(event) payload = event[:payload] return unless payload.is_a?(Hash) event_type = payload[:type] return if event_type.nil? return unless Event::TYPES.include?(event_type) return if persisted_by_job?(event_type, payload) target_session = @session || Session.find_by(id: payload[:session_id]) return unless target_session @mutex.synchronize do target_session.events.create!( event_type: event_type, payload: payload, status: payload[:status], tool_use_id: payload[:tool_use_id], timestamp: payload[:timestamp] || Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) ) end end |