RepoAmazon (Nova)Amazon (Nova)published Apr 1, 2025seen 5d

amazon-science/concurry

Python

Open original ↗

Captured source

source ↗
published Apr 1, 2025seen 5dcaptured 8hhttp 200method plain

amazon-science/concurry

Description: Scaling made stupid: Accelerate your AI research and production workloads

Language: Python

License: Apache-2.0

Stars: 18

Forks: 1

Open issues: 1

Created: 2025-04-01T05:31:25Z

Pushed: 2026-06-10T10:09:34Z

Default branch: mainline

Fork: no

Archived: no

README:

Concurry

Parallelism made simple, both for humans and AI agents.

Concurry is a unified, delightful concurrency library for Python. It replaces the fragmented landscape of threading, multiprocessing, asyncio, and Ray with a single, elegant API. Write your code once, and run it on a single thread, multiple cores, or a distributed cluster—without changing a line of business logic.

---

🚀 Quickstart: 50x Speedup in 3 Lines of Code

Calling LLMs sequentially is painfully slow. With Concurry, you can parallelize your existing code instantly.

Prerequisites: pip install concurry litellm

from pydantic import BaseModel
import litellm
# Line 1. Import concurry
from concurry import worker, gather

# Line 2. Add the @worker decorator to an existing class
@worker(mode="thread", max_workers=50)
class LLM(BaseModel):
model: str

def call(self, prompt: str) -> str:
# This runs in a separate thread!
return litellm.completion(
model=self.model,
messages=[{"role": "user", "content": prompt}]
).choices[0].message.content

# Initialize your worker (looks just like a normal class)
llm = LLM(model="gpt-3.5-turbo")

prompts = [f"What is {i} + {i}?" for i in range(100)]
results = [llm.call(prompt) for prompt in prompts] # Returns futures instantly, runs in parallel
# Line 3. gather futures
results = gather(results, progress=True) # Waits for all results

print(f"Processed {len(results)} prompts!")
llm.stop()

The Result:

  • Sequential: ~780 seconds
  • Concurry: ~16 seconds (50x faster)

No refactoring. No concurrent.futures. No async def virus. No ray.remote. Just your code, parallelized. We think that's delicious 🤤

---

📦 Installation

pip install concurry

For distributed computing support:

pip install "concurry[ray]"

For all features:

pip install "concurry[all]"

---

💡 Why Concurry?

The Problem: Fragmentation

Python's concurrency tools are scattered.

  • Threading: Good for I/O, bad API (concurrent.futures).
  • Multiprocessing: Good for CPU, hard to debug, pickling errors.
  • Asyncio: High throughput, but requires rewriting everything (async/await).
  • Ray: Powerful for clusters, but heavyweight for scripts.

The Solution: Unified API

Concurry abstracts all of these into a single interface.

Without Concurry (The Old Way)

You have to learn 4 different APIs to do the same thing.

# ❌ Threading API
with ThreadPoolExecutor() as executor:
future = executor.submit(task, arg)

# ❌ Multiprocessing API (Different behavior!)
with ProcessPoolExecutor() as executor:
future = executor.submit(task, arg)

# ❌ Asyncio API (Rewrite everything!)
async def main():
await asyncio.create_task(async_task(arg))

# ❌ Ray API (Another new API!)
ray.get(ray_task.remote(arg))

With Concurry (The Delightful Way)

One API, any backend.

from concurry import worker, gather

@worker
class MyWorker:
def do_work(self, x: int) -> int:
return x * 2

# Run on threads?
w = MyWorker.options(mode="thread", max_workers=10).init()

# Run on processes? Uncomment below.
# w = MyWorker.options(mode="process", max_workers=10).init()

# Run on a ray cluster? Uncomment below.
# w = MyWorker.options(mode="ray", max_workers=10).init()

# Run on asyncio? Uncomment below.
# w = MyWorker.options(mode="asyncio").init()

# The submission code NEVER changes:
futures = [w.do_work(i) for i in range(1000)]
# The collection code NEVER changes:
results = gather(futures, progress=True)
w.stop()

---

✨ Key Features

🎭 Actor-Based Workers

Stateful workers that persist across calls. Perfect for database connections, model weights, or session management.

from concurry import worker

@worker(mode="thread")
class Counter:
def __init__(self):
self.count = 0

def increment(self) -> int:
self.count += 1
return self.count

# State is preserved!
counter = Counter()
print(counter.increment().result()) # 1
print(counter.increment().result()) # 2
counter.stop()

🚦 Rate Limiting

Built-in rate limiting for APIs. Token buckets, sliding windows, and more, enforced globally across all workers.

from concurry import worker, gather, CallLimit

@worker(
mode="thread",
max_workers=20,
# Limit to 100 calls per minute across ALL 20 threads
limits=[CallLimit(window=60, capacity=100)]
)
class APIWorker:
def fetch(self, url: str):
# Rate limit is automatically checked here
return f"Fetched {url}"

pool = APIWorker()
futures = [pool.fetch(f"url_{i}") for i in range(200)]
results = gather(futures, progress=True) # Smoothly throttled!
pool.stop()

🔁 Intelligent Retries

Don't let flaky networks break your batch jobs. Configure retries declaratively.

from concurry import worker, RetryConfig

@worker(
mode="thread",
retry_config=RetryConfig(
max_retries=5,
retry_on=(ConnectionError, TimeoutError),
backoff_factor=2.0 # Exponential backoff: 1s, 2s, 4s, ...
)
)
class FlakyWorker:
def fetch(self):
# Automatically retried on failure!
pass

✅ Pydantic Integration

Full support for Pydantic models. Arguments are validated and coerced before they even reach the worker.

from concurry import worker
from pydantic import BaseModel, Field

@worker(mode="process")
class DataWorker(BaseModel):
db_url: str = Field(..., pattern=r"^postgres://")

def process(self, data: dict):
return data

# Validated at initialization!
try:
w = DataWorker(db_url="invalid-url")
except Exception as e:
print(f"Validation failed!: {e}") # Caught before worker starts

🎬 The @task Decorator

Just want to run a function in parallel? You don't need a class.

from concurry import task, gather
import time

@task(mode="process", max_workers=4)
def heavy_computation(x: int) -> int:
time.sleep(1) ## Example heavy computation
return x

# Run 100 heavy computations in parallel
futures = [heavy_computation(i) for i in range(100)]
results = gather(futures, progress=True)
heavy_computation.stop()

---

📚 Documentation

  • [User Guide](https://amazon-science.github.io/concurry/user-guide/getting-started/): Tutorials and best practices.
  • **[API…

Excerpt shown — open the source for the full document.

Notability

notability 3.0/10

Low-stars Amazon research repo

Amazon (Nova) has a repo signal matching infrastructure, product and customer.