Class: AgentRequestJob

Inherits:
ApplicationJob
Defined in:
app/jobs/agent_request_job.rb

Overview

Executes an LLM agent loop as a background job with retry logic for transient failures (network errors, rate limits, server errors).
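The retry behaviour described above can be illustrated with a minimal plain-Ruby sketch. It does not reproduce the job's actual ActiveJob configuration, which is not shown on this page. The error classes and the `with_retries` helper are hypothetical names introduced only for illustration.

```ruby
# Hypothetical error classes standing in for transient failures (network
# errors, rate limits, server errors) and permanent ones.
class TransientError < StandardError; end
class PermanentError < StandardError; end

# Retries the block on TransientError, up to max_attempts attempts in total,
# sleeping with exponential backoff between attempts. Any other error
# propagates immediately without a retry.
def with_retries(max_attempts: 3, base_delay: 0.0)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue TransientError
    raise if attempts >= max_attempts
    sleep(base_delay * (2**(attempts - 1)))
    retry
  end
end

calls = 0
result = with_retries(max_attempts: 3) do |attempt|
  calls += 1
  raise TransientError, "rate limited" if attempt < 3
  "ok after #{attempt} attempts"
end
```

In a Rails application the same intent is usually declared on the job class rather than hand-written in a loop like this.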

Supports two modes:

**Immediate Persist (message_id provided):** The user message was already persisted and broadcast by the caller (e.g. SessionChannel#speak). The job verifies LLM delivery: if the first API call fails, the message is deleted and an Events::BounceBack is emitted so clients can restore the text to the input field.

**Standard (no message_id):** Processes already-persisted messages (e.g. after pending message promotion). Uses ActiveJob retry/discard for error handling.
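As a rough illustration of the delivery verification in Immediate Persist mode, the sketch below models "delete the message and emit a bounce-back when the first call fails" with in-memory stand-ins. `FakeStore`, `BounceBack`, and `deliver_with_bounce_back` are hypothetical names. The real job's `deliver_persisted_message` and `Events::BounceBack` are not shown on this page and may behave differently.

```ruby
# Hypothetical stand-in for the bounce-back event; carries the text a client
# would restore to its input field.
BounceBack = Struct.new(:session_id, :content)

# Hypothetical in-memory store for persisted messages and emitted events.
class FakeStore
  attr_reader :messages, :events

  def initialize
    @messages = {}
    @events = []
  end

  # Removes the message and returns its content (nil if absent).
  def delete(id)
    @messages.delete(id)
  end
end

# Runs the first LLM call (the block). On failure, removes the persisted
# message and records a bounce-back event instead of raising.
def deliver_with_bounce_back(store, session_id, message_id)
  yield
  :delivered
rescue StandardError
  content = store.delete(message_id)
  store.events << BounceBack.new(session_id, content)
  :bounced
end

store = FakeStore.new
store.messages[42] = "hello"

ok = deliver_with_bounce_back(store, 1, 42) { :llm_response }
failed = deliver_with_bounce_back(store, 1, 42) { raise IOError, "network down" }
```

After the failing call, the store no longer holds message 42 and contains one bounce-back event with the original text.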

Examples:

Immediate Persist — message already saved by SessionChannel

AgentRequestJob.perform_later(session.id, message_id: 42)

Standard — pending message processing

AgentRequestJob.perform_later(session.id)

Constant Summary

AUTH_REQUIRED_ACTION = "authentication_required"

ActionCable action signaling clients to prompt for an API token.

Instance Method Details

#perform(session_id, message_id: nil) ⇒ Object

Parameters:

  • session_id (Integer)

    ID of the session to process

  • message_id (Integer, nil) (defaults to: nil)

    ID of a pre-persisted user message (triggers delivery verification)



# File 'app/jobs/agent_request_job.rb', line 53

def perform(session_id, message_id: nil)
  session = Session.find(session_id)

  # Atomic: only one job processes a session at a time.
  return unless claim_processing(session_id)

  run_analytical_brain_blocking(session)

  agent_loop = AgentLoop.new(session: session)

  if message_id
    deliver_persisted_message(session, message_id, agent_loop)
  else
    agent_loop.run
  end

  # Process any pending messages queued while we were busy.
  loop do
    promoted = session.promote_pending_messages!
    break if promoted == 0
    agent_loop.run
  end

  session.schedule_analytical_brain!
ensure
  release_processing(session_id)
  clear_interrupt(session_id)
  agent_loop&.finalize
end
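The claim-then-drain pattern in `perform` can be sketched in isolation as follows. This is only an illustration under stated assumptions: `ProcessingClaims` and `FakeSession` are hypothetical in-memory stand-ins, and the real `claim_processing`, `release_processing`, and `promote_pending_messages!` are not shown on this page and presumably rely on shared storage rather than a process-local set.

```ruby
require "set"

# Hypothetical claim registry: at most one holder per session ID.
class ProcessingClaims
  def initialize
    @lock = Mutex.new
    @claimed = Set.new
  end

  # Returns true only for the first caller until release is called.
  def claim(session_id)
    @lock.synchronize { !@claimed.add?(session_id).nil? }
  end

  def release(session_id)
    @lock.synchronize { @claimed.delete(session_id) }
  end
end

# Hypothetical session holding messages queued while a job was busy.
class FakeSession
  attr_reader :processed

  def initialize(pending)
    @pending = pending.dup
    @processed = []
  end

  # Moves all pending messages to the processed list; returns how many moved.
  def promote_pending_messages!
    promoted = @pending.length
    @processed.concat(@pending)
    @pending.clear
    promoted
  end
end

claims = ProcessingClaims.new
session = FakeSession.new(["a", "b"])
runs = 0

first_claim = claims.claim(7)
second_claim = claims.claim(7) # a concurrent job would be turned away here

if first_claim
  begin
    # Keep running until no new messages were promoted.
    loop do
      promoted = session.promote_pending_messages!
      break if promoted == 0
      runs += 1
    end
  ensure
    claims.release(7)
  end
end

reclaimed = claims.claim(7)
```

The sketch releases only a claim it actually acquired, which keeps a rejected caller from clearing another holder's claim.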