grelmicro

Micro by design. Fast by default. Import only what you need.

A toolkit of production-oriented Python primitives for services that coordinate work across processes, workers, or clusters.

Project status: Active development. grelmicro is pre-1.0. The public API is not yet stable. Breaking changes are allowed on MINOR bumps (0.14.0 → 0.15.0) and never on PATCH. Pin the minor: grelmicro>=0.14.0,<0.15.0. After 1.0.0, standard semver applies. See the versioning policy.


Documentation: https://grelinfo.github.io/grelmicro/

Source Code: https://github.com/grelinfo/grelmicro


Why grelmicro

grelmicro gives you the building blocks every Python service needs: locks, rate limits, circuit breakers, cache, logging, health checks, and task scheduling. Each is a small, focused module with a pluggable backend.

It is built for any Python application, from a standalone script to full microservice patterns and self-contained systems, with a strong focus on solving distributed system problems. It fits naturally into cloud-native applications, containerized apps, and Kubernetes deployments.

  • Micro: one focused primitive per module. Import only what your code touches. Nothing else is loaded.
  • Fast: small footprint by design. We keep the layers thin so your code stays quick.
  • Async-first: every I/O call is async / await. Drops into FastAPI, FastStream, and any AnyIO-based stack.
  • Backend-agnostic: each primitive is a protocol. Swap Redis for PostgreSQL or SQLite without touching application code.
  • Production-ready: 100% test coverage and full type hints. Pre-1.0 API may shift on minor bumps. 1.x will commit to standard semver.
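The "each primitive is a protocol" idea can be illustrated with a minimal sketch. Everything here is hypothetical and for illustration only: `LockBackend`, `MemoryLockBackend`, and `run_exclusive` are invented names, not grelmicro's actual API. The point is that application code typed against a protocol keeps working when the concrete backend is swapped.

```python
import asyncio
from typing import Protocol


class LockBackend(Protocol):
    """Hypothetical backend contract (an assumption, not grelmicro's protocol)."""

    async def acquire(self, name: str, token: str) -> bool: ...
    async def release(self, name: str, token: str) -> bool: ...


class MemoryLockBackend:
    """In-process backend: maps each lock name to its owner's token."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}

    async def acquire(self, name: str, token: str) -> bool:
        # Succeeds if the lock is free or already held by the same token.
        return self._owners.setdefault(name, token) == token

    async def release(self, name: str, token: str) -> bool:
        # Only the current owner may release the lock.
        if self._owners.get(name) != token:
            return False
        del self._owners[name]
        return True


async def run_exclusive(backend: LockBackend, name: str, token: str) -> str:
    """Application code depends on the protocol, never on a concrete backend."""
    if not await backend.acquire(name, token):
        return "busy"
    try:
        return "done"
    finally:
        await backend.release(name, token)


async def demo() -> list[str]:
    backend = MemoryLockBackend()
    first = await run_exclusive(backend, "report", "worker-1")
    await backend.acquire("report", "worker-1")  # worker-1 now holds the lock
    second = await run_exclusive(backend, "report", "worker-2")
    return [first, second]
```

A Redis- or PostgreSQL-backed class with the same two methods could be passed to `run_exclusive` unchanged, which is the property the bullet describes.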

Modules

| Module | Summary |
| --- | --- |
| Cache | @cached decorator with per-key stampede protection. In-memory TTLCache or RedisCacheBackend. |
| Synchronization | Distributed Lock, TaskLock, LeaderElection. Redis, PostgreSQL, SQLite, Kubernetes, in-memory. |
| Task Scheduler | Periodic task execution with optional distributed locking. Lightweight, not a Celery replacement. |
| Resilience | Circuit Breaker and Rate Limiter with pluggable algorithms (TokenBucketConfig, GCRAConfig). |
| Logging | 12-factor logging with JSON, LOGFMT, TEXT, or PRETTY output, structured error rendering, and OpenTelemetry trace context. |
| Tracing | Unified instrumentation. @instrument creates OpenTelemetry spans and enriches log records with structured context. |
| Health | Health check registry with concurrent runners and FastAPI liveness / readiness integration. |
| JSON | Fast JSON via orjson when available, with automatic fallback to stdlib json. |
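To make "per-key stampede protection" from the Cache row concrete, here is a rough sketch of the general technique using only the standard library. `StampedeGuard` and `get_or_compute` are hypothetical names, and this is not how grelmicro implements it; the sketch also omits TTL expiry to stay short. It shows one `asyncio.Lock` per key, so concurrent misses on the same key trigger a single computation.

```python
import asyncio
from collections import defaultdict
from collections.abc import Awaitable, Callable


class StampedeGuard:
    """Hypothetical helper: at most one in-flight computation per key."""

    def __init__(self) -> None:
        self._values: dict[str, str] = {}
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def get_or_compute(
        self, key: str, compute: Callable[[], Awaitable[str]]
    ) -> str:
        # Fast path: value already cached, no lock needed.
        if key in self._values:
            return self._values[key]
        async with self._locks[key]:
            # Re-check: another waiter may have filled the cache meanwhile.
            if key in self._values:
                return self._values[key]
            value = await compute()
            self._values[key] = value
            return value


async def demo() -> tuple[list[str], int]:
    guard = StampedeGuard()
    calls = 0

    async def load_user() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for a slow database query
        return "Alice"

    # Five concurrent requests for the same key.
    results = await asyncio.gather(
        *(guard.get_or_compute("user:1", load_user) for _ in range(5))
    )
    return list(results), calls
```

Because the lock is per key, requests for different keys do not block each other; only duplicate work on the same key is serialized.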

Installation

pip install grelmicro

See the Installation guide for uv and poetry commands, plus optional extras for Redis, PostgreSQL, SQLite, Kubernetes, OpenTelemetry, and structlog.

Example

FastAPI integration

Create a file main.py with:

import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Request

import grelmicro
from grelmicro import cache, resilience, sync
from grelmicro.cache import JsonSerializer, TTLCache, cached
from grelmicro.cache.redis import RedisCacheBackend
from grelmicro.logging import configure_logging
from grelmicro.resilience import (
    CircuitBreaker,
    RateLimitExceededError,
    RateLimiter,
)
from grelmicro.resilience.redis import RedisRateLimiterBackend
from grelmicro.sync import LeaderElection, Lock
from grelmicro.sync.redis import RedisSyncBackend
from grelmicro.task import TaskManager

logger = logging.getLogger(__name__)

# === grelmicro ===
task = TaskManager()
sync.register(RedisSyncBackend("redis://localhost:6379/0"))
cache.register(RedisCacheBackend("redis://localhost:6379/0", prefix="myapp:"))
resilience.register(RedisRateLimiterBackend("redis://localhost:6379/0"))

leader_election = LeaderElection("leader-election")
task.add_task(leader_election)

ttl_cache = TTLCache(ttl=300, serializer=JsonSerializer())


# === FastAPI ===
@asynccontextmanager
async def lifespan(app):
    configure_logging()
    async with grelmicro.lifespan(task):
        yield


app = FastAPI(lifespan=lifespan)


# --- Cache: avoid redundant database queries ---
@cached(ttl_cache)
async def get_user(user_id: int) -> dict:
    return {"id": user_id, "name": "Alice"}


@app.get("/users/{user_id}")
async def read_user(user_id: int):
    return await get_user(user_id)


# --- Circuit Breaker: protect calls to an unreliable service ---
cb = CircuitBreaker("my-service")


@app.get("/")
async def read_root():
    async with cb:
        return {"Hello": "World"}


# --- Rate Limiter: protect endpoints from overload ---
api_limiter = RateLimiter.gcra("api", limit=100, window=60)


@app.get("/api")
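async def api_endpoint(request: Request):
    # request.client can be None (e.g. behind some proxies or test clients),
    # so fall back to a shared key instead of raising AttributeError.
    client_key = request.client.host if request.client else "unknown"
    try:
        await api_limiter.acquire_or_raise(key=client_key)
    except RateLimitExceededError as exc:
        # Chain the original error so tracebacks show the rate-limit cause.
        raise HTTPException(
            status_code=429,
            detail="Too many requests",
            headers={"Retry-After": str(int(exc.retry_after))},
        ) from exc
    return {"status": "ok"}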


# --- Distributed Lock: synchronize access to a shared resource ---
lock = Lock("shared-resource")


@app.get("/protected")
async def protected():
    async with lock:
        return {"status": "ok"}


# --- Interval Task: run locally on every worker ---
@task.interval(seconds=5)
def heartbeat():
    logger.info("heartbeat")


# --- Distributed Task: run once per interval across all workers ---
@task.interval(seconds=60, max_lock_seconds=300)
def cleanup():
    logger.info("cleanup")


# --- Leader-gated Task: only the leader executes ---
@task.interval(seconds=10, leader=leader_election)
def leader_only_task():
    logger.info("leader task")
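
The rate limiter in the example above is created with RateLimiter.gcra. As a rough illustration of what GCRA (generic cell rate algorithm) does, the sketch below tracks a theoretical arrival time per key. It is a simplified, in-memory illustration under stated assumptions: `GcraLimiter` and `check` are hypothetical names, `now` is passed in explicitly for determinism, and none of this reflects grelmicro's actual implementation.

```python
from dataclasses import dataclass, field


@dataclass
class GcraLimiter:
    """Hypothetical in-memory GCRA limiter (illustration only)."""

    limit: int      # requests allowed per window
    window: float   # window length in seconds
    _tat: dict[str, float] = field(default_factory=dict)

    def check(self, key: str, now: float) -> float:
        """Return 0.0 if the request is allowed, else seconds to wait."""
        interval = self.window / self.limit  # ideal spacing between requests
        # Theoretical arrival time: never earlier than the current time.
        tat = max(self._tat.get(key, now), now)
        new_tat = tat + interval
        # The request conforms if it does not run ahead by more than a window.
        allow_at = new_tat - self.window
        if now < allow_at:
            return allow_at - now
        self._tat[key] = new_tat
        return 0.0


limiter = GcraLimiter(limit=3, window=3.0)
burst = [limiter.check("client-a", now=0.0) for _ in range(3)]  # all allowed
wait = limiter.check("client-a", now=0.0)   # fourth request must wait
later = limiter.check("client-a", now=1.0)  # allowed again after one interval
```

A real limiter would read the clock itself (for example `time.monotonic()`) and keep the per-key state in a shared backend so that all workers see the same counters.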

License

This project is licensed under the terms of the MIT license.
