Concurrency
Do supports concurrent execution through strands - lightweight asynchronous
tasks. See the strand module for full API
details.
Strand Concepts
Key features of strands:
- Lightweight: Strands are not full OS threads
- Asynchronous: Strands suspend when performing operations that would
otherwise block, such as file I/O, allowing other strands to run. This is
handled transparently, without explicit async or await syntax. Strands are
not preemptible in general, so tight CPU-bound loops should be avoided.
- Cancellable: You can terminate a strand early, in which case it will
raise a CanceledError when it next attempts to suspend.
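To illustrate why non-preemptible tasks only make progress when others suspend, here is a minimal sketch in Python's asyncio. It is an analogy only, not Do code: Python requires an explicit await at each suspension point, whereas Do suspends implicitly. The names worker and main are hypothetical.

```python
import asyncio

async def worker(name, steps, log):
    for i in range(steps):
        log.append((name, i))  # one unit of CPU work
        # Explicit suspension point. Without it, this loop would run to
        # completion before any other task got a turn, which is the
        # starvation problem with tight CPU-bound loops.
        await asyncio.sleep(0)

async def main():
    log = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log

log = asyncio.run(main())
print(log)  # the two workers interleave, one step at a time
```

Removing the await line makes worker "a" finish all of its steps before "b" starts, since nothing ever hands control back to the scheduler.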
Scoped vs Background Strands
Do distinguishes between two kinds of strands:
Scoped strands (used by
strand.fork and
strand.pipeline)
are always joined before the function that creates them returns. You don't need
to manage them manually, and cancellation propagates automatically from parent
to child strands.
Background strands (created by
strand.spawn and
strand.stream) are not tied to a scope.
A background strand runs independently and may outlive the spawning context.
You must manage it manually through the returned Strand
handle, using join to wait for completion
and, if needed, cancel to terminate it.
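The difference between the two kinds can be sketched with Python's asyncio, again purely as an analogy rather than Do's API. Here asyncio.gather stands in for a scoped construct and asyncio.create_task for a background one; job, scoped, and main are hypothetical names.

```python
import asyncio

async def job(n):
    await asyncio.sleep(0)
    return n * 2

async def scoped():
    # Scoped style: both children are joined before this function returns,
    # so the caller never sees a handle.
    return await asyncio.gather(job(1), job(2))

async def main():
    scoped_results = await scoped()

    # Background style: the caller holds a handle and must join it...
    handle = asyncio.create_task(job(21))
    joined = await handle

    # ...or cancel it explicitly when the result is no longer needed.
    abandoned = asyncio.create_task(asyncio.sleep(3600))
    abandoned.cancel()
    was_canceled = False
    try:
        await abandoned
    except asyncio.CancelledError:
        was_canceled = True
    return scoped_results, joined, was_canceled

scoped_results, joined, was_canceled = asyncio.run(main())
```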
Spawning Strands
spawn
The strand.spawn function creates a new
background strand:
import strand
let worker = strand.spawn do
  echo "Running in background"
  42
echo "Main thread continues"
let result = worker.join()
echo "Got result: $result"
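The returned Strand handle allows you to:
- wait for the strand to complete and retrieve its result with join
- terminate the strand early with cancel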
Concurrent Execution
fork
The strand.fork function executes
multiple blocks concurrently and returns their results as an array:
import strand
let results = strand.fork
  - do 42
  - do "hello"
  - do (1 + 2)
assert_eq $results [42, "hello", 3]
All blocks become runnable simultaneously and the function waits for all to complete. Results are returned in the same order as the input blocks.
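The ordering guarantee can be sketched with Python's asyncio.gather, which likewise returns results in input order regardless of which task finishes first. This is an analogy, not Do's implementation; block and fork are hypothetical helpers, and the delays are arbitrary.

```python
import asyncio

async def block(value, delay):
    await asyncio.sleep(delay)  # simulate work of varying duration
    return value

async def fork(*coros):
    # All coroutines are scheduled together; results keep input order.
    return await asyncio.gather(*coros)

# The first block finishes last, yet its result still comes first.
results = asyncio.run(
    fork(block(42, 0.02), block("hello", 0.0), block(1 + 2, 0.01))
)
print(results)  # [42, 'hello', 3]
```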
Pipelines
pipeline
The
strand.pipeline
function connects multiple stages into a data processing pipeline:
import strand
let result = strand.pipeline
  do strand.from [1, 2, 3, 4, 5]
  do strand.where do |x| (x > 2)
  do strand.each do |x| (x * 2)
  do strand.collect()
assert_eq $result [6, 8, 10]
Each stage runs in its own strand, with implicit channels connecting output to input.
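A rough model of "one task per stage, joined by channels" can be written in Python with asyncio queues. This is a sketch under assumptions, not how Do implements pipelines: the DONE sentinel, the stage function names, and the queue capacity of 1 are all chosen here for illustration.

```python
import asyncio

DONE = object()  # hypothetical end-of-stream marker

async def source(items, out):
    for item in items:
        await out.put(item)
    await out.put(DONE)

async def where(pred, inp, out):
    while (item := await inp.get()) is not DONE:
        if pred(item):
            await out.put(item)
    await out.put(DONE)

async def each(fn, inp, out):
    while (item := await inp.get()) is not DONE:
        await out.put(fn(item))
    await out.put(DONE)

async def collect(inp):
    result = []
    while (item := await inp.get()) is not DONE:
        result.append(item)
    return result

async def pipeline():
    # One bounded queue between each pair of adjacent stages.
    a, b, c = (asyncio.Queue(maxsize=1) for _ in range(3))
    *_, result = await asyncio.gather(
        source([1, 2, 3, 4, 5], a),
        where(lambda x: x > 2, a, b),
        each(lambda x: x * 2, b, c),
        collect(c),
    )
    return result

result = asyncio.run(pipeline())
print(result)  # [6, 8, 10]
```

Because every stage runs concurrently, a downstream stage can start consuming values before the upstream stage has produced all of them.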
Channel Communication
channel
The strand.channel function creates a
sender/receiver pair for communicating between strands:
import strand
let send recv = strand.channel()
let worker = strand.spawn do
  send.put 1
  send.put 2
  send.put 3
  send.close()
for value = recv
  echo $value
worker.join()
Channels have a fixed capacity (default 1).
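The practical effect of a capacity of 1 is backpressure: a sender can get at most one value ahead of the receiver before it suspends. The sketch below models this with Python's asyncio.Queue(maxsize=1). It is an analogy rather than Do code, and it assumes a full channel suspends the sender, which the text above does not state explicitly. Using None as the close marker is also a choice made here.

```python
import asyncio

async def sender(queue, log):
    for value in (1, 2, 3):
        await queue.put(value)  # suspends while the queue is full
        log.append(f"put {value}")
    await queue.put(None)  # hypothetical close marker

async def main():
    queue = asyncio.Queue(maxsize=1)
    log = []
    task = asyncio.create_task(sender(queue, log))

    # Give the sender time to run until it blocks on a full queue.
    await asyncio.sleep(0.01)
    progress_before_receive = list(log)

    received = []
    while (value := await queue.get()) is not None:
        received.append(value)
    await task
    return progress_before_receive, received

progress_before_receive, received = asyncio.run(main())
print(progress_before_receive)  # ['put 1']
print(received)                 # [1, 2, 3]
```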
Streams
stream
The strand.stream function creates a
background strand with channels pre-wired to its input and output, returning a
Stream handle. A Stream implements Iterable for
its output side and Sinkable for its input side, making it easy to bridge
background processing with the rest of your program without manually creating
and threading channels.
import strand
let s = strand.stream do strand.each do |x| (x * 2)
let input = s.sink()
let output = s.iter()
input.put 21
assert_eq (output.next()) 42
s.join()
Inside the callable, the strand reads from input with strand.next() and writes
to output with strand.put — the same as any pipeline stage. Pipeline stage
functions like each and where work unchanged.
Built-in Pipeline Stages
Several functions are designed to work as pipeline stages:
- strand.from - emits values from an iterable
- strand.where - filters values by a predicate
- strand.each - transforms values
- strand.collect - gathers values into a collection
Error Handling
When a strand exits with an error:
- If you call join, the error is re-raised.
- In fork and pipeline strands, all sibling strands are canceled. After all
strands complete, an arbitrary error among all failed strands is re-raised.
Errors that were not caused by sibling cancellation are prioritized over those
that were (e.g. CanceledError, or IterStop and SinkStop errors in pipelines).
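The cancel-siblings-then-prioritize policy described above can be approximated in Python's asyncio as follows. This is a sketch of the policy only, not Do's implementation; fork, failing, and slow are hypothetical names, and only cancellation errors are modeled as "caused by sibling cancellation".

```python
import asyncio

async def fork(*coros):
    tasks = [asyncio.ensure_future(c) for c in coros]
    # Wait until some task fails (or all finish), then cancel the rest.
    await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in tasks:
        task.cancel()  # no-op for tasks that are already done
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    errors = [o for o in outcomes if isinstance(o, BaseException)]
    # Prefer errors that are not merely the result of sibling cancellation.
    root_causes = [e for e in errors
                   if not isinstance(e, asyncio.CancelledError)]
    if root_causes:
        raise root_causes[0]
    if errors:
        raise errors[0]
    return outcomes

async def failing():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def slow():
    await asyncio.sleep(10)  # canceled long before it finishes

async def main():
    try:
        await fork(failing(), slow())
    except ValueError as exc:
        return f"re-raised: {exc}"

outcome = asyncio.run(main())
print(outcome)  # re-raised: boom
```

The caller sees the ValueError that triggered the shutdown rather than the cancellation error raised in the sibling.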
Cancellation
When a strand is canceled (either by propagation from a parent strand or
explicitly), current and subsequent suspending operations fail with a
CanceledError. This effect is masked during finally blocks in
try/catch/finally statements to permit
possibly-suspending calls (e.g. to clean up temporary files).
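The reason cleanup code needs to be able to suspend can be shown with a Python asyncio sketch. Note the mechanism differs: in Python a cancellation is delivered once, so later awaits in finally simply proceed, whereas the text above describes Do as masking an ongoing cancellation during finally. The names worker and main are hypothetical.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(3600)  # cancellation lands at this suspension
    finally:
        # Cleanup that itself suspends, e.g. removing a temporary file.
        await asyncio.sleep(0)
        log.append("cleanup finished")

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("worker canceled")
    return log

log = asyncio.run(main())
print(log)  # ['cleanup finished', 'worker canceled']
```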