Audio call-centre analysis POC: ingest recordings from S3, store audio as Lance blob v2, transcribe with SenseVoice, analyse with DeepSeek, append acoustic embeddings, and query by scalar filter or nearest-neighbour.
The toolkit uses the workflow/ pipeline: analysis runs first, then audio blobs
and analysis metadata are co-ingested into the Lance asset table.
Manifest (parquet / jsonl / csv)
doc_id, s3_url
│
▼ Stage 1 — workflow/analyze.py
│ Daft: read manifest → download audio bytes → duration filter
│ → SenseVoice ASR (transcript + acoustic_emotion)
│ → PII redaction (ID card, phone numbers)
│ → DeepSeek LLM (downgrade_related, bad_tone, emotion_score …)
│ → [optional --embed] audio_embedding (128-dim)
│
├── (no --embed) → JSONL on S3
└── (--embed) → Lance staging table on S3
│
▼ Stage 2 — workflow/ingest.py
│ Daft: read JSONL or Lance staging table
│ → download audio blobs from s3_url
│ → stamp ingest_time
│ → write_lance (blob v2, append)
│ → validate blob v2
│
▼ Lance asset table (blob v2, local or S3)
│ columns: doc_id, s3_url, audio_blob,
│ transcript, acoustic_emotion,
│ downgrade_related, primary_reason,
│ secondary_reason, summary, confidence,
│ text_emotion, bad_tone, emotion_score,
│ [audio_embedding], ingest_time
│
▼ Stage 3 — workflow/index.py
│ lance_ray : IVF_PQ index on audio_embedding
│ pylance : ZONEMAP index on ingest_time
│
├──▶ Stage 4 — workflow/query.py
│ Daft SQL (daft.sql()) : --sql (scalar / aggregation)
│ Daft scanner pushdown : --where (scalar filter)
│ Daft scanner nearest : --vector-from (ANN, IVF index)
│
└──▶ Stage 5 — workflow/manage.py
pylance ds.delete() : --before / --after
lance_ray.compact_files : automatic after delete
| Engine | Used for | Reason |
|---|---|---|
| Daft | manifest read, S3 download, ASR/LLM pipeline, Lance write (Stage 1 & 2), scalar and ANN query | Primary engine; stable APIs |
| lance_ray | IVF_PQ vector index creation, compact_files |
Preferred for Lance table management; distributed Ray workers |
| pylance | ZONEMAP scalar index, row delete, cleanup_old_versions |
ZONEMAP: lance_ray requires unreleased code; delete: only API |
| daft_lance | fallback for compact_files if lance_ray unavailable |
Not used for index; Daft-first applies to data processing only |
uv sync --upgradeCreate a .env file (or export variables directly):
# S3 / MinIO
MINIO_ENDPOINT=http://127.0.0.1:9000
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=minioadmin
MINIO_REGION=us-east-1
# LLM — leave blank to skip DeepSeek analysis (fields will be null)
DEEPSEEK_API_KEY=sk-...
DEEPSEEK_BASE_URL=https://api.deepseek.com
DEEPSEEK_MODEL=deepseek-chat
# ASR device
ASR_DEVICE=cpu # or cuda
# Duration filter applied in Stage 1
MIN_DURATION_S=0
MAX_DURATION_S=1800
# Embedding backend used in Stage 1 --embed
EMBED_BACKEND=signal # signal (128-dim RMS+ZCR) or wav2vec2
# Daft runner
USE_RAY=0 # set to 1 to use Ray for Daft-backed steps
RAY_ADDRESS= # leave empty to start/join local RayThe manifest must be parquet, jsonl, or csv with at minimum doc_id and s3_url columns.
--lance-uri accepts both local paths and s3:// URIs.
Downloads audio from S3, runs ASR and LLM analysis, writes output to S3.
# Output: JSONL (no embeddings)
python -m multimodal_toolkit.workflow.analyze \
--manifest s3://bucket/audio/manifest.parquet \
--out s3://bucket/audio/analysis.jsonl
# Output: Lance staging table (includes audio_embedding; required for ANN search later)
python -m multimodal_toolkit.workflow.analyze \
--manifest s3://bucket/audio/manifest.parquet \
--out s3://bucket/audio/staging.lance \
--embedReads Stage 1 output, downloads audio blobs, and appends them together with the analysis metadata into the Lance asset table.
python -m multimodal_toolkit.workflow.ingest \
--analysis s3://bucket/audio/analysis.jsonl \
--lance-uri s3://bucket/audio/calls.lancePass a .lance URI to --analysis if Stage 1 was run with --embed.
Builds indexes for fast query. Run after the table has enough rows (IVF_PQ needs at least num_partitions × 256 rows; use --num-partitions 1 for small tables).
# Build both indexes (default)
python -m multimodal_toolkit.workflow.index \
--lance-uri s3://bucket/audio/calls.lance
# Vector index only
python -m multimodal_toolkit.workflow.index \
--lance-uri s3://bucket/audio/calls.lance \
--no-time
# Tune partitions for small tables
python -m multimodal_toolkit.workflow.index \
--lance-uri s3://bucket/audio/calls.lance \
--num-partitions 1 --num-sub-vectors 8# Scalar filter (Daft pushdown via read_lance default_scan_options)
python -m multimodal_toolkit.workflow.query \
--lance-uri s3://bucket/audio/calls.lance \
--where "bad_tone = true OR downgrade_related = true" \
--top-k 20
# Full Daft SQL SELECT (table name in scope: calls)
python -m multimodal_toolkit.workflow.query \
--lance-uri s3://bucket/audio/calls.lance \
--sql "SELECT primary_reason, COUNT(*) AS cnt FROM calls GROUP BY primary_reason ORDER BY cnt DESC"
# Scalar filter with projection via Daft SQL
python -m multimodal_toolkit.workflow.query \
--lance-uri s3://bucket/audio/calls.lance \
--sql "SELECT doc_id, emotion_score, primary_reason FROM calls WHERE bad_tone = true AND emotion_score > 0.5 ORDER BY emotion_score DESC" \
--top-k 20
# ANN vector search via Daft Lance scanner (uses IVF index)
python -m multimodal_toolkit.workflow.query \
--lance-uri s3://bucket/audio/calls.lance \
--vector-from call_001.mp3 \
--top-k 10
# Combined: ANN with scalar pre-filter
python -m multimodal_toolkit.workflow.query \
--lance-uri s3://bucket/audio/calls.lance \
--vector-from call_001.mp3 \
--where "downgrade_related = true" \
--distance-min 0.0 \
--distance-max 1.0 \
--top-k 10Delete rows by ingest date and compact the table:
# Delete rows ingested before a date
python -m multimodal_toolkit.workflow.manage \
--lance-uri s3://bucket/audio/calls.lance \
--before 2025-01-01
# Delete rows outside a date window
python -m multimodal_toolkit.workflow.manage \
--lance-uri s3://bucket/audio/calls.lance \
--after 2024-06-01 --before 2024-12-31Compaction and version cleanup run automatically after delete.
| Component | Version | Notes |
|---|---|---|
| Daft | 0.7.15 | Main execution engine |
| daft-lance | 0.4.0 | read_lance, write_lance, take_blobs, create_scalar_index, compact_files |
| pylance | 7.0.0 | Lance dataset, blob v2, ANN scanner, delete, cleanup |
| lance-ray | 0.4.2 | Vector index creation; write-back path deferred |
| Ray | 2.55.1 | Pulled in by lance-ray; Daft uses native runner unless USE_RAY=1 |
Default Daft runner: native (local multi-threaded). Set USE_RAY=1 to switch to Ray for Daft-backed steps. Stage 3 (lance_ray index) and Stage 4 ANN (pylance scanner) always run locally regardless of USE_RAY.
Audio downloaded twice in the workflow pipeline.
Stage 1 downloads audio bytes for ASR and embedding; Stage 2 downloads the same files again to store as Lance blobs. This is intentional — analysis output (JSONL) does not carry raw bytes across stages. Plan accordingly for bandwidth costs or cache files locally between stages.
--embed in Stage 1 is required for ANN search.
If Stage 1 was run without --embed, the Lance asset table has no audio_embedding column. Stage 3 will error and Stage 4 --vector-from will have nothing to search. Re-run Stage 1 with --embed and re-ingest to add embeddings.
IVF_PQ minimum row count.
The default --num-partitions 16 requires at least 4096 rows. For tables with fewer rows, pass --num-partitions 1 (or skip --embedding and rely on scalar queries only).
DeepSeek key absent → LLM columns are null.
If DEEPSEEK_API_KEY is not set, downgrade_related, bad_tone, primary_reason, summary, confidence, text_emotion, and emotion_score are all null. ASR and acoustic embeddings still run normally.
Blob v2 is validated after every ingest.
validate_blob_v2 raises immediately if Lance silently downgraded audio_blob to large_binary. Never skip this check when testing new library versions.
Local Lance URIs are verified end-to-end.
S3 Lance table write/read is exercised by the underlying libraries but should be treated as a separate validation item for this POC.