Micro by design. Fast by default. Import only what you need.
A toolkit of production-ready Python primitives for services that coordinate work across processes, workers, or clusters.
Project status: Active development. grelmicro is pre-1.0. The public API is not yet stable. Breaking changes are allowed on MINOR bumps (0.14.0 → 0.15.0) and never on PATCH. Pin the minor: `grelmicro>=0.14.0,<0.15.0`. After 1.0.0, standard semver applies. See the versioning policy.
Documentation: https://grelinfo.github.io/grelmicro/
Source Code: https://github.com/grelinfo/grelmicro
grelmicro gives you the building blocks every Python service needs: locks, rate limits, circuit breakers, cache, logging, health checks, and task scheduling. Each is a small, focused module with a pluggable backend.
It is built for any Python application, from a standalone script to full microservice patterns and self-contained systems, with a strong focus on solving distributed system problems. It fits naturally into cloud-native applications, containerized apps, and Kubernetes deployments.
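To make the "building blocks" idea concrete, here is a minimal sketch of the circuit-breaker pattern in plain Python. This is purely illustrative of the concept (names like `SimpleCircuitBreaker` are hypothetical and do not reflect grelmicro's actual API, which is shown in the quickstart below):

```python
# Illustrative only: a minimal circuit breaker state machine,
# NOT grelmicro's implementation or API.
import time


class SimpleCircuitBreaker:
    """Opens after `max_failures` consecutive errors; retries after `reset_after` seconds."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                # Circuit is open: fail fast instead of hitting the broken service.
                raise RuntimeError("circuit open: call rejected")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # a success closes the circuit again
        return result
```

grelmicro packages primitives like this behind async context managers with pluggable backends, so the pattern works across workers rather than only in one process.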
- Micro: one focused primitive per module. Import only what your code touches. Nothing else is loaded.
- Fast: small footprint by design. We keep the layers thin so your code stays quick.
- Async-first: every I/O call is async/await. Drops into FastAPI, FastStream, and any AnyIO-based stack.
- Backend-agnostic: each primitive is a protocol. Swap Redis for PostgreSQL or SQLite without touching application code.
- Production-ready: 100% test coverage and full type hints. The pre-1.0 API may shift on minor bumps; 1.x will commit to standard semver.
| Module | Summary |
|---|---|
| Cache | @cached decorator with per-key stampede protection. In-memory TTLCache or RedisCacheBackend. |
| Synchronization | Distributed Lock, TaskLock, LeaderElection. Redis, PostgreSQL, SQLite, Kubernetes, in-memory. |
| Task Scheduler | Periodic task execution with optional distributed locking. Lightweight, not a Celery replacement. |
| Resilience | Circuit Breaker and Rate Limiter with pluggable algorithms (TokenBucketConfig, GCRAConfig). |
| Logging | 12-factor logging with JSON, LOGFMT, TEXT, or PRETTY output, structured error rendering, and OpenTelemetry trace context. |
| Tracing | Unified instrumentation. @instrument creates OpenTelemetry spans and enriches log records with structured context. |
| Health | Health check registry with concurrent runners and FastAPI liveness / readiness integration. |
| JSON | Fast JSON via orjson when available, with automatic fallback to stdlib json. |
```
pip install grelmicro
```

See the Installation guide for uv and poetry commands, plus optional extras for Redis, PostgreSQL, SQLite, Kubernetes, OpenTelemetry, and structlog.
Create a file `main.py` with:

```python
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Request

import grelmicro
from grelmicro import cache, resilience, sync
from grelmicro.cache import JsonSerializer, TTLCache, cached
from grelmicro.cache.redis import RedisCacheBackend
from grelmicro.logging import configure_logging
from grelmicro.resilience import (
    CircuitBreaker,
    RateLimitExceededError,
    RateLimiter,
)
from grelmicro.resilience.redis import RedisRateLimiterBackend
from grelmicro.sync import LeaderElection, Lock
from grelmicro.sync.redis import RedisSyncBackend
from grelmicro.task import TaskManager

logger = logging.getLogger(__name__)

# === grelmicro ===
task = TaskManager()
sync.register(RedisSyncBackend("redis://localhost:6379/0"))
cache.register(RedisCacheBackend("redis://localhost:6379/0", prefix="myapp:"))
resilience.register(RedisRateLimiterBackend("redis://localhost:6379/0"))

leader_election = LeaderElection("leader-election")
task.add_task(leader_election)

ttl_cache = TTLCache(ttl=300, serializer=JsonSerializer())


# === FastAPI ===
@asynccontextmanager
async def lifespan(app):
    configure_logging()
    async with grelmicro.lifespan(task):
        yield


app = FastAPI(lifespan=lifespan)


# --- Cache: avoid redundant database queries ---
@cached(ttl_cache)
async def get_user(user_id: int) -> dict:
    return {"id": user_id, "name": "Alice"}


@app.get("/users/{user_id}")
async def read_user(user_id: int):
    return await get_user(user_id)


# --- Circuit Breaker: protect calls to an unreliable service ---
cb = CircuitBreaker("my-service")


@app.get("/")
async def read_root():
    async with cb:
        return {"Hello": "World"}


# --- Rate Limiter: protect endpoints from overload ---
api_limiter = RateLimiter.gcra("api", limit=100, window=60)


@app.get("/api")
async def api_endpoint(request: Request):
    try:
        await api_limiter.acquire_or_raise(key=request.client.host)
    except RateLimitExceededError as exc:
        raise HTTPException(
            status_code=429,
            detail="Too many requests",
            headers={"Retry-After": str(int(exc.retry_after))},
        )
    return {"status": "ok"}


# --- Distributed Lock: synchronize access to a shared resource ---
lock = Lock("shared-resource")


@app.get("/protected")
async def protected():
    async with lock:
        return {"status": "ok"}


# --- Interval Task: run locally on every worker ---
@task.interval(seconds=5)
def heartbeat():
    logger.info("heartbeat")


# --- Distributed Task: run once per interval across all workers ---
@task.interval(seconds=60, max_lock_seconds=300)
def cleanup():
    logger.info("cleanup")


# --- Leader-gated Task: only the leader executes ---
@task.interval(seconds=10, leader=leader_election)
def leader_only_task():
    logger.info("leader task")
```

This project is licensed under the terms of the MIT license.