Class: LLM::Stream::Queue

Inherits:
Object
Defined in:
lib/llm/stream/queue.rb

Overview

A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait(strategy) resolves queued work and returns an array of Function::Return values.
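The lifecycle described above can be sketched without the library installed. This is a minimal illustration, not the library's code: `Return` and `SketchQueue` are hypothetical stand-ins, and plain `Thread` objects play the role of the handles that Function#spawn would return.

```ruby
# Hypothetical stand-in for LLM::Function::Return (assumption, not the real class).
Return = Struct.new(:id, :value)

# Hypothetical stand-in that mirrors the shape of the queue documented here.
class SketchQueue
  def initialize
    @items = []
  end

  # Enqueue an immediate Return or a running Thread.
  def <<(item)
    @items << item
    self
  end

  def empty?
    @items.empty?
  end

  # Drain the queue, keep immediate returns as they are, and join
  # each thread to obtain the Return it produced.
  def wait
    returns, threads = @items.shift(@items.length).partition { Return === _1 }
    returns.concat threads.map(&:value)
  end
end

queue = SketchQueue.new
queue << Return.new("a", 1)                 # already resolved
queue << Thread.new { Return.new("b", 2) }  # resolved when joined
queue << Return.new("c", 3)                 # already resolved

results = queue.wait
ids = results.map(&:id)
```

In this sketch the immediate returns come first in the result and the joined work follows, because the items are partitioned before the concurrent handles are resolved.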

Constructor Details

#initialize ⇒ LLM::Stream::Queue



# File 'lib/llm/stream/queue.rb', line 12

def initialize
  @items = []
end

Instance Method Details

#<<(item) ⇒ LLM::Stream::Queue

Enqueue a function return or spawned task.

Parameters:

  • item

    A Function::Return or a spawned task.

Returns:

  • (LLM::Stream::Queue)

    The queue itself, so calls can be chained.

# File 'lib/llm/stream/queue.rb', line 20

def <<(item)
  @items << item
  self
end
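Because the method returns self, enqueue calls can be chained. The sketch below uses a hypothetical `ChainableQueue` stand-in (with an `items` reader added only for inspection) and symbols in place of real function returns.

```ruby
# Hypothetical stand-in mirroring the #<< listing above; not the library class.
class ChainableQueue
  attr_reader :items # added for inspection in this sketch only

  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self # returning self is what makes chaining possible
  end
end

queue = ChainableQueue.new
result = queue << :first << :second
```

Here `result` is the same object as `queue`, and both items are stored in insertion order.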

#empty? ⇒ Boolean

Returns true when the queue is empty.

Returns:

  • (Boolean)


# File 'lib/llm/stream/queue.rb', line 28

def empty?
  @items.empty?
end

#wait(strategy) ⇒ Array<LLM::Function::Return> Also known as: value

Waits for queued work to finish and returns function results.

Parameters:

  • strategy (Symbol)

    Controls concurrency strategy:

    • `:thread`: Use threads

    • `:task`: Use async tasks (requires async gem)

    • `:fiber`: Use raw fibers

Returns:

  • (Array<LLM::Function::Return>)

# File 'lib/llm/stream/queue.rb', line 40

def wait(strategy)
  returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 }
  returns.concat case strategy
  when :thread then LLM::Function::ThreadGroup.new(tasks).wait
  when :task then LLM::Function::TaskGroup.new(tasks).wait
  when :fiber then LLM::Function::FiberGroup.new(tasks).wait
  else raise ArgumentError, "Unknown strategy: #{strategy.inspect}. Expected :thread, :task, or :fiber"
  end
end
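Two behaviors follow from the listing above: the queue is drained by each call, and the items are shifted off before the strategy is checked. The sketch below illustrates both under stated assumptions. `StubReturn`, `StubThreadGroup`, and `DispatchQueue` are hypothetical stand-ins, only the `:thread` branch is modelled, and plain threads stand in for spawned tasks.

```ruby
# Hypothetical stand-in for LLM::Function::Return.
StubReturn = Struct.new(:id, :value)

# Hypothetical stand-in for a group class: joins threads and collects values.
class StubThreadGroup
  def initialize(tasks)
    @tasks = tasks
  end

  def wait
    @tasks.map(&:value)
  end
end

# Hypothetical queue that copies the control flow of the #wait listing.
class DispatchQueue
  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self
  end

  def empty?
    @items.empty?
  end

  def wait(strategy)
    # shift(length) removes every item, so the queue is empty from here on.
    returns, tasks = @items.shift(@items.length).partition { StubReturn === _1 }
    returns.concat case strategy
    when :thread then StubThreadGroup.new(tasks).wait
    else raise ArgumentError, "Unknown strategy: #{strategy.inspect}"
    end
  end
end

queue = DispatchQueue.new
queue << StubReturn.new("a", 1) << Thread.new { StubReturn.new("b", 2) }
first = queue.wait(:thread)
second = queue.wait(:thread) # nothing left to resolve

queue << StubReturn.new("c", 3)
error = begin
  queue.wait(:bogus)
rescue ArgumentError => e
  e
end
drained_after_error = queue.empty?
```

In this sketch the second call returns an empty array, and the item enqueued before the failing call is no longer in the queue afterwards. Callers who want to keep queued work may prefer to validate the strategy before calling #wait.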