This guide shows how to define a schema, run migrations, and perform CRUD with mongreldb_kit.
pip install mongreldb-kitimport os
import tempfile
from mongreldb_kit import (
Database,
DuplicateError,
ForeignKeyError,
RestrictError,
bool_,
fk,
int,
table,
text,
unique,
)
def schema():
return {
"tables": [
table(
name="users",
id=1,
columns=[
int("id", 1, primary_key=True),
text("email", 2),
text("name", 3, nullable=True),
],
primary_key="id",
unique_constraints=[unique("uq_user_email", "email")],
),
table(
name="posts",
id=2,
columns=[
int("id", 1, primary_key=True),
int("user_id", 2),
text("title", 3),
text("body", 4, nullable=True),
bool_("published", 5, default={"static": False}),
],
primary_key="id",
foreign_keys=[
fk(
"fk_posts_user",
"user_id",
references_table="users",
references_columns="id",
on_delete="cascade",
)
],
),
]
}
def tmp_db():
return os.path.join(tempfile.mkdtemp(), "app.kitdb")
def main():
path = tmp_db()
# Create or open the database.
db = Database.create(path, schema())
# Run migrations.
db.migrate(
[
{
"version": 1,
"name": "initial",
"ops": [
{"create_table": {"name": "users"}},
{"create_table": {"name": "posts"}},
],
}
]
)
# Insert users.
with db.begin() as txn:
alice = txn.insert("users", {"id": 1, "email": "alice@example.com", "name": "Alice"})
bob = txn.insert("users", {"id": 2, "email": "bob@example.com"})
txn.commit()
# Insert a post.
with db.begin() as txn:
post = txn.insert(
"posts",
{"id": 1, "user_id": alice["id"], "title": "Hello Kit", "body": "First post."},
)
txn.commit()
# Query posts by user, ordered by id descending.
with db.begin() as txn:
rows = txn.select(
"posts",
filter={"user_id": {"eq": 1}},
order="-id",
limit=10,
)
for row in rows:
print(row)
# Update the post.
with db.begin() as txn:
txn.update("posts", 1, {"published": True})
txn.commit()
# Deleting Alice cascades to her posts because of on_delete='cascade'.
with db.begin() as txn:
txn.delete("users", 1)
txn.commit()
if __name__ == "__main__":
main()| Function | Purpose |
|---|---|
table(...) |
Build a table dictionary |
int(name, id, **kwargs) / integer(...) |
64-bit integer column |
text(name, id, **kwargs) |
UTF-8 text column |
bool_(name, id, **kwargs) / boolean(...) |
Boolean column |
float_(name, id, **kwargs) / float64(...) |
64-bit float column |
json_col(name, id, **kwargs) |
JSON column |
bytes_col(name, id, **kwargs) |
Bytes column |
timestamp(name, id, **kwargs) |
Timestamp column |
date(name, id, **kwargs) |
Date column |
datetime(name, id, **kwargs) |
DateTime column |
index(name, columns, unique=False) |
Index definition |
unique(name, columns) |
Unique constraint |
fk(name, columns, references_table, references_columns, on_delete='restrict') |
Foreign key |
check(name, expr) |
Table check constraint |
| Kwarg | Effect |
|---|---|
nullable=True |
Allow None values |
primary_key=True |
Mark as part of the primary key |
default=... |
Default value (see shapes below) |
generated=True |
Auto-generate on insert/update |
enum_values=[...] |
Restrict string values |
min=..., max=... |
Numeric range |
min_length=..., max_length=... |
String/bytes length |
regex=... |
Pattern match |
check_expr="..." |
Column check as a serialized expression string (e.g. "price_cents >= 0") |
Default shapes mirror the cross-language DefaultKind JSON: {"static": <value>},
{"sequence": "<name>"}, {"custom_name": "<name>"}, and the bare strings "now" and "uuid".
A column whose default is {"sequence": ...} is auto-assigned a 1-based id when the inserted row
omits it (the first row is 1, never 0). check_expr and the table-level check(name, expr) use
the serialized string-expression grammar — the cross-language form, not a Python callable.
Use the context manager for automatic commit/rollback:
with db.begin() as txn:
txn.insert("users", {"id": 1, "email": "alice@example.com"})
# committed automatically on exitExplicit control is also available:
txn = db.begin()
try:
txn.insert("users", {...})
txn.commit()
except Exception:
txn.rollback()txn.insert_many(table, rows) stages an iterable of rows in the open transaction and returns the
stored rows as a list[dict] in order. It runs the same per-row defaults, validation, sequence
ids, and guards as insert, but stages the whole batch so one commit writes it — far faster than
a row-at-a-time loop, and all-or-nothing on failure. A single-column primary key preloads existing
keys once so the per-row duplicate check stays O(1).
with db.begin() as txn:
rows = txn.insert_many("products", [
{"sku": "A-1", "name": "Anvil"},
{"sku": "B-1", "name": "Bucket"},
])
# rows[0]["id"] == 1, rows[1]["id"] == 2 — sequence ids assigned in ordertxn.select accepts a friendly object filter and an order string:
rows = txn.select(
"posts",
filter={"published": {"eq": True}, "user_id": {"gt": 0}},
order="-placed_at",
limit=10,
offset=0,
columns=["id", "title"], # optional projection; omit for all columns
distinct=False,
)Per-column operators: eq, ne, gt, gte, lt, lte, like, contains, in, not_in,
is_null, is_not_null, in_subquery. A bare value ({"user_id": 1}) is shorthand for eq.
Top-level logical keys combine column predicates: and / or (a list of filters), not (a
filter), and exists / not_exists (a subselect). Multiple keys at one level are AND-ed.
Order syntax:
"+id"or"id"— ascending"-id"— descending"-placed_at,+id"— multiple columns
txn.aggregate runs group-by/having; build specs with the agg helper (count, sum, min,
max, avg):
from mongreldb_kit import agg
rows = txn.aggregate(
"orders",
aggregates=[agg("count", "n"), agg("sum", "spent", "amount")],
group_by=["customer_id"],
having={"n": {"gt": 1}},
)txn.join runs nested-loop joins; describe each join with kind (inner/left/cross) and an
on predicate built with on_eq:
from mongreldb_kit import on_eq
rows = txn.join(
"orders",
alias="o",
joins=[{"kind": "inner", "table": "customers", "alias": "c",
"on": on_eq("o.customer_id", "c.id")}],
)txn.select also takes ctes=[{"name", "table", ...}] to materialize common table expressions
before the body runs. Joins, aggregates, group/having, and CTEs are computed in memory.
db.migrate([
{"version": 1, "name": "init", "ops": [{"create_table": {"name": "users"}}]},
{"version": 2, "name": "add_posts", "ops": [{"create_table": {"name": "posts"}}]},
])Beyond create / open / begin / migrate, the Database handle exposes:
db.allocate_sequence("orders_id_seq") # next 1-based value (count=1 by default)
db.allocate_sequence("orders_id_seq", 10) # reserve 10, returns the first
db.table_names() # application tables (excludes __kit_* internals)
db.set_schema(schema()) # refresh the in-memory schema without migrating
db.transaction(lambda txn: txn.insert("users", {...})) # commit on success, retry on conflictdb.transaction(fn, max_retries=5) runs fn(txn), commits on success, rolls back on any error, and
retries the whole callback when a ConflictError (a retryable write-write conflict) is raised.
The byte-identical key encoders used internally are exposed for tooling and tests. Components are
typed values — {"int": n}, {"text": s}, or {"null": True} — so the integer 1 and the text
"1" never collide:
from mongreldb_kit import encode_pk, encode_unique_key, encode_row_guard_key
encode_pk([{"int": 1}]) # primary-key bytes
encode_unique_key(1, "uq_user_email", [{"text": "a@example.com"}])
encode_row_guard_key("users", [{"int": 1}])Exceptions carry a stable code attribute:
from mongreldb_kit import DuplicateError, ForeignKeyError, RestrictError
try:
txn.insert("users", {"id": 2, "email": "alice@example.com"})
except DuplicateError as exc:
print(exc.code) # DUPLICATEAvailable exceptions: ValidationError, DuplicateError, ForeignKeyError, RestrictError, MigrationError, ConflictError, StorageError, IntegrityError.
Save the file as kit_demo.py and run:
python kit_demo.py- Query builder — the full query model these helpers serialize.
- Constraints and Errors — the rules and the typed failures.
- Migrations — migration ops and the runner.
- TypeScript · Rust — the sibling language surfaces.