Class: LLM::Stream::Queue
- Inherits: Object
- Inheritance chain: Object → LLM::Stream::Queue
- Defined in: lib/llm/stream/queue.rb
Overview
A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait(strategy) resolves queued work and returns an array of Function::Return values.
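As a rough illustration of the flow described above, the following sketch models the queue with stand-in classes so that it runs without the library. `Return`, `SpawnedTask`, and `SketchQueue` are hypothetical names invented for this example, and the real `LLM::Function::Return` and the handles returned by `Function#spawn` may look different. Only a thread-like strategy is modeled.

```ruby
# Hypothetical stand-in for LLM::Function::Return (the real fields may differ).
Return = Struct.new(:id, :name, :value)

# Hypothetical stand-in for a handle returned by Function#spawn.
# It runs the block in a thread and yields a Return when waited on.
class SpawnedTask
  def initialize(&work)
    @thread = Thread.new(&work)
  end

  def wait
    @thread.value
  end
end

# Simplified model of the queue: immediate returns are kept as they are,
# spawned tasks are waited on, and the queue is emptied in the process.
class SketchQueue
  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self
  end

  def empty?
    @items.empty?
  end

  def wait
    returns, tasks = @items.shift(@items.length).partition { |item| Return === item }
    returns.concat(tasks.map(&:wait))
  end
end

queue = SketchQueue.new
queue << Return.new("call_1", "lookup", { ok: true })
queue << SpawnedTask.new { Return.new("call_2", "fetch", { ok: true }) }

results = queue.wait
puts results.map(&:id).inspect # prints ["call_1", "call_2"]
puts queue.empty?              # prints true
```

In this model the immediate return comes first in the result, followed by the results of the spawned work.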
Instance Method Summary
- #<<(item) ⇒ LLM::Stream::Queue
  Enqueue a function return or spawned task.
- #empty? ⇒ Boolean
  Returns true when the queue is empty.
- #initialize ⇒ LLM::Stream::Queue (constructor)
- #wait(strategy) ⇒ Array<LLM::Function::Return> (also: #value)
  Waits for queued work to finish and returns function results.
Constructor Details
#initialize ⇒ LLM::Stream::Queue
# File 'lib/llm/stream/queue.rb', line 12
def initialize
  @items = []
end
Instance Method Details
#<<(item) ⇒ LLM::Stream::Queue
Enqueue a function return or spawned task.
# File 'lib/llm/stream/queue.rb', line 20
def <<(item)
  @items << item
  self
end
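Because `#<<` returns the queue itself, calls can be chained. The sketch below uses a hypothetical `ChainQueue` class that copies only the `#<<` body from the listing. The `size` helper is an assumption added for the demonstration and is not part of the documented class.

```ruby
# Hypothetical minimal queue that reproduces only #<< from the listing.
class ChainQueue
  def initialize
    @items = []
  end

  # Appends the item and returns the queue, which is what enables chaining.
  def <<(item)
    @items << item
    self
  end

  # Illustration-only helper, not part of LLM::Stream::Queue.
  def size
    @items.size
  end
end

queue = ChainQueue.new
queue << :first << :second
puts queue.size # prints 2
```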
#empty? ⇒ Boolean
Returns true when the queue is empty.
# File 'lib/llm/stream/queue.rb', line 28
def empty?
  @items.empty?
end
#wait(strategy) ⇒ Array<LLM::Function::Return> Also known as: value
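Drains the queue, waits for any spawned tasks to finish using the given strategy (:thread, :task, or :fiber), and returns the function results, with immediate returns first. Raises ArgumentError for an unknown strategy.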
# File 'lib/llm/stream/queue.rb', line 40
def wait(strategy)
  returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 }
  returns.concat case strategy
  when :thread then LLM::Function::ThreadGroup.new(tasks).wait
  when :task then LLM::Function::TaskGroup.new(tasks).wait
  when :fiber then LLM::Function::FiberGroup.new(tasks).wait
  else raise ArgumentError, "Unknown strategy: #{strategy.inspect}. Expected :thread, :task, or :fiber"
  end
end
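One detail that follows from the order of operations in the listing: the items are shifted out of the queue before the strategy is checked, so an unknown strategy appears to leave the queue empty even though an error is raised. The sketch below mirrors that order with hypothetical stand-ins (`Return`, `DrainingQueue`) and supports only `:thread`, with tasks assumed to respond to `wait`. It is a model of the listing, not the library's own code.

```ruby
# Hypothetical stand-in for LLM::Function::Return.
Return = Struct.new(:id, :value)

# Simplified model that keeps the listing's order: drain first, then dispatch.
class DrainingQueue
  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self
  end

  def empty?
    @items.empty?
  end

  def wait(strategy)
    # The queue is emptied here, before the strategy is validated.
    returns, tasks = @items.shift(@items.length).partition { |item| Return === item }
    resolved =
      case strategy
      when :thread then tasks.map(&:wait)
      else raise ArgumentError, "Unknown strategy: #{strategy.inspect}. Expected :thread"
      end
    returns.concat(resolved)
  end
end

queue = DrainingQueue.new
queue << Return.new("call_1", 1)

begin
  queue.wait(:process)
rescue ArgumentError => e
  puts e.message # prints Unknown strategy: :process. Expected :thread
end

puts queue.empty? # prints true
```

If the real class behaves the same way, callers may want to validate the strategy before calling `#wait`, since work queued at that point would not be recoverable from the queue.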