diff --git a/.env.sample b/.env.sample index 1062fee..50b371c 100644 --- a/.env.sample +++ b/.env.sample @@ -11,10 +11,21 @@ AWS_SECRET=secret ELASTIC_PASSWORD=123 ES_PORT=9200 -ES_STACK_VERSION=8.9.0 +ES_STACK_VERSION=8.16.0 INDEXING_PASSWORD=index123 +# Ollama must be running on the host with mxbai-embed-large pulled. +# On Linux set OLLAMA_URL explicitly — see README. +# OLLAMA_URL=http://host.docker.internal:11434 +SEMANTIC_ENABLED=true +# Index-time embedding via your own HF Dedicated Inference Endpoint (TEI-backed). +# Query-time embedding always stays on local Ollama. Unset either var → ES +# embeds via Ollama at index time too (slower; fine for local dev). +HUGGING_FACE_KEY=key +# HF Inference Endpoint base URL (no trailing slash, no /v1/embeddings — we append). +HF_DEDICATED_URL=https://some-endpoint + # Grafana Cloud — logs (Loki) # Get from: grafana.com → your stack → Loki card → "Send Logs" # Token: My Account → Access Policies → token with logs:write scope diff --git a/.gitignore b/.gitignore index 883972b..f6f0aa4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -db/01-hadithTable.sql +db/01-hadithdb.sql .env __pycache__ data/ diff --git a/README.md b/README.md index 4617579..667e757 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,229 @@ -# Search -To run: -docker-compose up --build +# sunnah.com Search API -Then visit: +Flask + Elasticsearch search service for sunnah.com. Supports lexical (BM25) and semantic search. + +--- + +## Architecture + +``` +Browser / PHP website + │ + ▼ + Flask API (this repo) ──► Elasticsearch + │ + ┌───────────┴───────────┐ + │ english-lexical │ BM25, no embeddings + │ english-mxbai │ mxbai-embed-large vectors + └───────────────────────┘ + + Ollama (host, port 11434) — embeds search queries + HF Dedicated Endpoint (optional) — embeds documents at index time +``` + +Each index name in ES is an **alias** (e.g. `english-mxbai`) pointing to a timestamped backing index. 
Reindexing builds a new backing index and atomically swaps the alias — the live index keeps serving traffic during the rebuild. + +--- + +## Local development setup + +### Prerequisites + +- Docker + Docker Compose +- [Ollama](https://ollama.com) installed and running on your machine + +### 1. Configure environment + +```bash +cp .env.sample .env +``` + +Semantic search is on by default (`SEMANTIC_ENABLED=true`). Set it to `false` if you want lexical-only and don't want to run Ollama. `OLLAMA_URL` defaults to `http://host.docker.internal:11434`, which works on Docker Desktop (Mac/Windows) — leave it unset locally. + +To offload index-time embedding to a HuggingFace Dedicated Inference Endpoint (recommended for prod — orders of magnitude faster on a small GPU than Ollama on a CPU instance), also set `HUGGING_FACE_KEY` and `HF_DEDICATED_URL` in `.env`. The endpoint must run [TEI](https://github.com/huggingface/text-embeddings-inference) with `mixedbread-ai/mxbai-embed-large-v1`. Leaving either var unset falls back to embedding via Ollama at index time too. + +### 2. Pull the model + +```bash +ollama pull mxbai-embed-large +``` + +### 3. Start the stack + +```bash +docker compose up --build +``` + +Flask is exposed on **port 5000**. + +### 4. Build the indexes + +``` http://localhost:5000/index?password=index123 +``` + +This reads all hadiths from MySQL and builds **both** the lexical and semantic indexes by default — that's almost always what you want. Embedding ~48k English hadiths takes ~9 min via the HF Dedicated Endpoint (or considerably longer through Ollama if no remote endpoint is configured). 
+ +To build a subset, pass `targets=` (comma-separated): +``` +http://localhost:5000/index?password=index123&targets=lexical # lexical only +http://localhost:5000/index?password=index123&targets=mxbai # one semantic model +http://localhost:5000/index?password=index123&targets=lexical,mxbai # both (same as default) +``` + +To force a full rebuild instead of incremental: +``` +http://localhost:5000/index?password=index123&rebuild=true +``` + +Check index status (doc counts): +``` +http://localhost:5000/index/status +``` + +--- + +## Production deployment + +Production uses `docker-compose.prod.yml` directly. Key differences from local: +- **No MySQL service** — connects to the existing external DB via env vars +- **uwsgi** instead of Flask dev server, exposed on **port 7650** +- **Persistent ES data** in a named Docker volume (`es-data`) +- **Explicit ES JVM memory limits** (`-Xms600m -Xmx1g`) + +### 1. Configure environment + +```bash +cp .env.sample .env +``` + +Fill in production values — at minimum: + +```env +MYSQL_HOST= +MYSQL_USER= +MYSQL_PASSWORD= +MYSQL_DATABASE=hadithdb + +ELASTIC_PASSWORD= +INDEXING_PASSWORD= + +SEMANTIC_ENABLED=true +``` + +### 2. Ollama on Linux + +Install [Ollama](https://ollama.com) on the host and pull the model before starting the stack: + +```bash +ollama pull mxbai-embed-large +``` + +`host.docker.internal` only works on Docker Desktop (Mac/Windows), not on Linux. The prod compose file adds `host-gateway` so this hostname resolves correctly on Linux too — the default `OLLAMA_URL` works without any extra `.env` changes. + +### 3. Start the stack + +```bash +docker compose -f docker-compose.prod.yml up -d --build +``` + +### 4. Build the indexes + +The prod stack is exposed on **port 7650**. Builds both lexical and semantic by default: + +``` +http://your-host:7650/index?password=your-indexing-password +``` + +Add `&targets=lexical` or `&targets=mxbai` to build a subset. 
+ +Check index status: +``` +http://your-host:7650/index/status +``` + +--- + +## Embedding model + +| Key | Model | Query-time | Index-time | Dimensions | +|---|---|---|---|---| +| `mxbai` | mxbai-embed-large | Ollama (host) | HF Dedicated Endpoint (optional) → else Ollama | 1024 | + +Queries are always embedded via **Ollama on the host machine** (not inside Docker) — the container reaches it at `http://host.docker.internal:11434` via ES 8.16's OpenAI-compatible inference endpoint. Index-time embedding is offloaded to a remote TEI endpoint when `HUGGING_FACE_KEY` + `HF_DEDICATED_URL` are set: the indexer fetches vectors over HTTP and ships them inline with the bulk payload (ES's `semantic_text` accepts pre-populated chunks and skips its own inference call). Vectors from TEI and Ollama for the same model are numerically near-identical (cosine ≈ 0.9999), so queries can match docs embedded by either side. + +Per-run tuning via env vars: `HF_DEDICATED_CONCURRENCY` (default 4), `HF_DEDICATED_BATCH_SIZE` (default 16, must keep `batch × max_input_length ≤ TEI's max_batch_tokens`), `HF_DEDICATED_RPM` (default -1, disabled). + +### Adding a model + +1. Add an entry to `EMBEDDING_MODELS` in `main.py` — copy the mxbai entry as a template (~10 lines). +2. Pull the model on the Ollama host: `ollama pull your-model-name`. +3. Hit `/index?password=...&targets=newkey` to build its index. (`/index` with no `targets=` will pick it up too, alongside lexical and the other semantic models.) +4. Add the alias name to `SEMANTIC_INDEXES` in `tests/batch_search.py`. +5. If it should be the default for `/search?mode=semantic` without a `&model=` param, point `DEFAULT_SEMANTIC_MODEL` at the new key. + +`SEMANTIC_ENABLED` is a single global toggle — you don't add a per-model env var. + +--- + +## Search modes + +| Mode | What it does | +|---|---| +| `lexical` | BM25 full-text search with collection boosts. Fast, exact keyword matching. Default. 
| +| `semantic` | Embedding similarity via HNSW approximate nearest-neighbor. Finds conceptually related hadiths even without keyword overlap. | + +Mode is passed as a query parameter: +``` +/english/search?q=prayer&mode=semantic +/english/search?q=prayer&mode=lexical +``` + +`mode=semantic` uses the model named in `DEFAULT_SEMANTIC_MODEL` (currently `mxbai`) when no `&model=` is supplied. Pass `&model=your-model-key` to pick a different enabled model. + +--- + +## API endpoints + +| Endpoint | Description | +|---|---| +| `GET /{language}/search?q=...` | Main search endpoint (consumed by PHP website) | +| `GET /index?password=...` | Build/rebuild ES indexes from MySQL | +| `GET /index/status` | Doc counts for all indexes | + +--- + +## Docker Compose files + +| File | When to use | +|---|---| +| `docker-compose.yml` | Local development. `docker compose up --build`. | +| `docker-compose.prod.yml` | Production. Run with `-f docker-compose.prod.yml`. Uses uwsgi, persistent ES data volume, explicit JVM memory limits, no MySQL service. | + +**Why Elasticsearch has a fixed IP** (`172.31.250.10`): at high request rates, Docker's embedded DNS resolver becomes a bottleneck and throws `EAI_AGAIN` errors. Hardcoding the IP in `/etc/hosts` via `extra_hosts` makes every lookup instant. + +**Observability services** (`es-exporter`, `alloy`) ship ES metrics and logs to Grafana Cloud. They require Grafana Cloud credentials in `.env` — if you don't have them, these services will fail to connect but won't break the rest of the stack. + +--- + +## Batch evaluation + +`tests/batch_search.py` runs a fixed set of queries across lexical and semantic and produces a CSV and markdown report for side-by-side comparison. + +```bash +docker exec search-web-1 python3 /code/tests/batch_search.py +``` + +Outputs (`batch_results.csv`, `batch_report.md`) land in the repo root — the dev compose mounts `./:/code`, so files the script writes to `/code/` inside the container show up on the host immediately. 
No `docker cp` needed. + +The script runs inside the container because ES is not exposed to the host — it's only reachable at `http://elasticsearch:9200` from within the Docker network. + +Edit `QUERIES` in `tests/batch_search.py` to change which queries are tested. + +**Note:** always use commas between query strings in the list. Python silently concatenates adjacent string literals without a comma, producing wrong queries with no error. + +--- + +## Formatting -To run: -docker-compose up --build -docker-compose -f docker-compose.prod.yml -d up --build +Format Python code with `uv format` before committing. diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index c13501e..bf4225a 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -18,6 +18,11 @@ services: # consulted before DNS, so every lookup is instant. extra_hosts: - "elasticsearch:172.31.250.10" + # host.docker.internal resolves automatically on Docker Desktop (Mac/Windows) + # but not on Linux. host-gateway is Docker's built-in alias for the host's + # IP on the bridge network, making host.docker.internal work on Linux too. + # This is what lets the container reach Ollama running on the host. 
+ - "host.docker.internal:host-gateway" elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:${ES_STACK_VERSION} container_name: elasticsearch diff --git a/docker-compose.yml b/docker-compose.yml index bb16de9..bf854e7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,7 +72,6 @@ services: - GC_PROM_USER=${GC_PROM_USER} - GC_PROM_PASSWORD=${GC_PROM_PASSWORD} - DEPLOY_ENV=${DEPLOY_ENV:-local} - networks: default: driver: bridge diff --git a/main.py b/main.py index d3c1823..878dba3 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,24 @@ +import hashlib import logging +import socket import sys import time +import urllib.request +import urllib.error import uuid +from concurrent.futures import ThreadPoolExecutor, as_completed +from enum import Enum from flask import Flask, request, jsonify, g from werkzeug.exceptions import HTTPException import pymysql import os from dotenv import load_dotenv -import math import json from elasticsearch import Elasticsearch, helpers, BadRequestError, NotFoundError from pythonjsonlogger import jsonlogger +from utils.rate_limiter import RateLimiter from utils.shortcode_pattern import SHORTCODE_PATTERN load_dotenv(".env.local") @@ -55,6 +61,8 @@ def _emit_access_log(response): ) response.headers["X-Request-Id"] = g.request_id return response + + es_auth = ("elastic", os.environ.get("ELASTIC_PASSWORD")) es_base_url = f"http://elasticsearch:{os.environ.get('ES_PORT')}" es_client = Elasticsearch( @@ -65,13 +73,97 @@ def _emit_access_log(response): request_timeout=10, ) -INDEX_NAME = "english" -# Tiebreaker boosts added on top of the text-similarity score so canonical -# collections rise when relevance is otherwise comparable. Sized to swing -# rankings when BM25 scores are within a few points (e.g. the same hadith -# narrated across collections), but still let a clearly stronger text match -# from a less canonical collection win. 
+def _is_truthy(value): + return (value or "").lower() in ("1", "true", "yes") + + +# Pure lexical index — no embeddings, fast to rebuild. +LEXICAL_INDEX = "english-lexical" + +# Each model gets its own ES index so you can index and switch independently. +# The semantic field is always called "semantic_text" inside each model's index. +SEMANTIC_FIELD = "semantic_text" + +_OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434") +_HUGGING_FACE_KEY = os.environ.get("HUGGING_FACE_KEY") +_HF_DEDICATED_URL = os.environ.get( + "HF_DEDICATED_URL" +) # e.g. https://.endpoints.huggingface.cloud + +# Embedding vector dimensions for mxbai-embed-large(-v1). Used for inline chunks. +_MXBAI_DIMS = 1024 + + +def _build_remote_mxbai_inference(): + """Index-time embedding via a HuggingFace Inference Endpoint running TEI. + + The endpoint exposes an OpenAI-compatible /v1/embeddings route that returns + L2-normalized vectors directly. Returns None (→ fall back to ES inference + via Ollama at index time) when either env var is missing. + """ + if not (_HUGGING_FACE_KEY and _HF_DEDICATED_URL): + return None + return { + "url": f"{_HF_DEDICATED_URL.rstrip('/')}/v1/embeddings", + "api_key": _HUGGING_FACE_KEY, + "model_id": "mxbai", # TEI ignores model field, but OpenAI shape requires it + "dims": _MXBAI_DIMS, + } + + +SEMANTIC_ENABLED = _is_truthy(os.environ.get("SEMANTIC_ENABLED")) + +# Catalog of semantic models. Pure data — no env coupling. Add an entry here +# to register another model; the on/off switch lives on SEMANTIC_ENABLED above. +EMBEDDING_MODELS = { + "mxbai": { + "label": "mxbai-embed-large", + "index": "english-mxbai", + "inference_id": "mxbai-embed-large", + "multilingual": False, + # ES inference endpoint — always bound to local Ollama (query-time embedding). + # Ollama exposes an OpenAI-compatible API; ES 8.16 has no native ollama service. 
+ "service": "openai", + "service_settings": { + "api_key": "ollama", # Ollama doesn't require auth; ES requires a non-empty value + "url": f"{_OLLAMA_URL}/v1/embeddings", + "model_id": "mxbai-embed-large", + "similarity": "cosine", + }, + # Optional remote inference for index time only. When set, the indexer + # pre-computes vectors via the HF Dedicated Endpoint and ships them + # inline in the bulk payload (semantic_text accepts pre-populated chunks + # and skips its own inference call). Query time always goes through the + # ES inference endpoint above (local Ollama). + "remote_inference": _build_remote_mxbai_inference(), + }, +} + +_ENABLED_MODELS = EMBEDDING_MODELS if SEMANTIC_ENABLED else {} + +# Which model `/search?mode=semantic` picks when no `model=` param is given. +# Set explicitly rather than reading the first dict key, so adding another +# semantic model doesn't accidentally change which one is the default. +DEFAULT_SEMANTIC_MODEL = "mxbai" + +# Bulk-indexing timeouts. Semantic bulk can be slow because ES embeds each +# doc against the inference endpoint (Ollama) unless we shipped inline chunks; +# lexical bulk is just text ingest and stays fast. +LEXICAL_BULK_TIMEOUT_S = 60 +SEMANTIC_BULK_TIMEOUT_S = 300 + + +class SearchMode(str, Enum): + """Search mode for /search?mode=…. str mixin so equality with raw query + strings and JSON serialization both produce the underlying value + ('lexical' / 'semantic') without extra plumbing. + """ + + LEXICAL = "lexical" + SEMANTIC = "semantic" + + COLLECTION_BOOSTS = [ ("bukhari", 5.0), ("muslim", 4.8), @@ -87,6 +179,7 @@ def _emit_access_log(response): ("riyadussalihin", 2.5), ] + @app.errorhandler(Exception) def _handle_unexpected(exc): if isinstance(exc, HTTPException): @@ -106,13 +199,254 @@ def home(): return "

Welcome to sunnah.com search api.

" -def create_and_update_index(index_name, documents, fields_to_not_index): - settings = { +# ── Index management ────────────────────────────────────────────────────────── + + +def _ensure_inference_endpoint(model): + try: + es_client.inference.get( + task_type="text_embedding", inference_id=model["inference_id"] + ) + return + except NotFoundError: + pass + es_client.options(request_timeout=60).inference.put( + task_type="text_embedding", + inference_id=model["inference_id"], + inference_config={ + "service": model["service"], + "service_settings": model["service_settings"], + }, + ) + + +def _content_hash(doc): + payload = { + k: v for k, v in doc.items() if k not in ("_id", "contentHash", SEMANTIC_FIELD) + } + encoded = json.dumps(payload, sort_keys=True, default=str, ensure_ascii=False) + return hashlib.sha256(encoded.encode("utf-8")).hexdigest() + + +def _prepare_documents(documents): + for doc in documents: + doc["_id"] = f"{doc['lang']}:{doc['urn']}" + doc["contentHash"] = _content_hash(doc) + + +# HF's per-endpoint pool 429s well below TEI's max_concurrent_requests=512. +# batch_size × max_input_length must stay under TEI's max_batch_tokens (16384). +_REMOTE_EMBED_CONCURRENCY = int(os.environ.get("HF_DEDICATED_CONCURRENCY", "4")) +_REMOTE_EMBED_BATCH_SIZE = int(os.environ.get("HF_DEDICATED_BATCH_SIZE", "16")) +# -1 disables throttling; HF Dedicated bills by compute-time, not RPM. +_REMOTE_EMBED_RPM = int(os.environ.get("HF_DEDICATED_RPM", "-1")) +_REMOTE_EMBED_MAX_RETRIES = 6 +_REMOTE_EMBED_BACKOFF_FLOOR_S = 5 +# Cap server-supplied Retry-After so a misbehaving 503 can't park a worker. +_REMOTE_EMBED_BACKOFF_CEILING_S = 60 + + +def _embed_via_remote(model, texts): + """Batch-embed `texts` via the configured HF Dedicated Endpoint. + + Returns a list of float vectors aligned with input order. Retries on 429 + and transient 5xx with exponential backoff (Retry-After respected when + ≥ floor). Captures the response body on non-retryable failures (e.g. 
400 + "inputs cannot be empty") to make debugging easier. + """ + cfg = model["remote_inference"] + headers = { + "Authorization": f"Bearer {cfg['api_key']}", + "Content-Type": "application/json", + } + limiter = RateLimiter(_REMOTE_EMBED_RPM, log=access_log) + + def _embed_batch(batch_texts): + # OpenAI-shape body; TEI accepts the `truncate` field on /v1/embeddings + # to silently handle inputs over max_input_length. + payload = json.dumps( + {"model": cfg["model_id"], "input": batch_texts, "truncate": True} + ).encode("utf-8") + for attempt in range(_REMOTE_EMBED_MAX_RETRIES): + limiter.acquire() + req = urllib.request.Request( + cfg["url"], data=payload, headers=headers, method="POST" + ) + + status = None + retry_after = None + try: + with urllib.request.urlopen(req, timeout=120) as resp: + body = json.loads(resp.read()) + # TEI's /v1/embeddings returns OpenAI shape with L2-normalized vectors. + return [item["embedding"] for item in body["data"]] + except urllib.error.HTTPError as e: + status = e.code + retryable = e.code == 429 or 500 <= e.code < 600 + retry_after = e.headers.get("Retry-After") + if not retryable or attempt == _REMOTE_EMBED_MAX_RETRIES - 1: + # Capture body for non-retryable failures so we can debug + # 400-class errors (oversize inputs, bad model id, etc.). + body_snippet = e.read()[:400].decode("utf-8", errors="replace") + access_log.error( + "remote_embed_failed", + extra={ + "status": e.code, + "body": body_snippet, + "batch_size": len(batch_texts), + }, + ) + raise + except (urllib.error.URLError, socket.timeout, ConnectionError) as e: + # DNS failure, connect refused, read timeout, RST mid-stream — + # treat as transient and retry rather than killing the run. + status = "network_error" + if attempt == _REMOTE_EMBED_MAX_RETRIES - 1: + access_log.error( + "remote_embed_failed", + extra={ + "status": status, + "reason": str(e), + "batch_size": len(batch_texts), + }, + ) + raise + + # Shared backoff path for any retryable failure above. 
+ parsed = ( + float(retry_after) + if retry_after and retry_after.replace(".", "", 1).isdigit() + else 0 + ) + # TEI sometimes returns Retry-After: 0 — enforce a floor so we don't + # immediately re-fire. Cap Retry-After so a single misbehaving 503 + # can't park a worker for many minutes. + wait = max( + min(parsed, _REMOTE_EMBED_BACKOFF_CEILING_S), + _REMOTE_EMBED_BACKOFF_FLOOR_S, + min(2**attempt, 30), + ) + access_log.warning( + "remote_embed_retry", + extra={"status": status, "attempt": attempt + 1, "wait_s": wait}, + ) + time.sleep(wait) + + batches = [ + texts[i : i + _REMOTE_EMBED_BATCH_SIZE] + for i in range(0, len(texts), _REMOTE_EMBED_BATCH_SIZE) + ] + out = [None] * len(batches) + with ThreadPoolExecutor(max_workers=_REMOTE_EMBED_CONCURRENCY) as ex: + future_to_idx = {ex.submit(_embed_batch, b): i for i, b in enumerate(batches)} + # as_completed yields futures in completion order, so a single slow batch + # doesn't idle workers that finished after it but were submitted earlier. + for f in as_completed(future_to_idx): + out[future_to_idx[f]] = f.result() + return [v for batch in out for v in batch] + + +def _attach_semantic_field(paired): + """Attach SEMANTIC_FIELD as plain text on each doc. + + ES then auto-embeds via the bound inference endpoint (Ollama) at bulk time, + unless _rewrite_inline_chunks is called first to pre-populate the field + with vectors from a remote provider. + + Empty/whitespace-only text is filtered at the SQL source, so by the time we + get here every paired text is a non-empty string. 
+ """ + return [{**doc, SEMANTIC_FIELD: text} for doc, text in paired] + + +def _inline_chunk_doc(doc, text, vec, inference_id, model_settings): + """Build the doc shape ES's semantic_text accepts when bypassing inference.""" + return { + **doc, + SEMANTIC_FIELD: { + "text": text, + "inference": { + "inference_id": inference_id, + "model_settings": model_settings, + "chunks": [{"text": text, "embeddings": vec}], + }, + }, + } + + +def _rewrite_inline_chunks(docs, model): + """Replace each doc's plain-text SEMANTIC_FIELD with the full inline-chunks + structure, with vectors fetched from the model's remote inference API. + + Called only on docs about to be bulk-sent (after incremental diffing) so we + don't burn API quota embedding unchanged docs. + """ + remote = model["remote_inference"] + texts = [doc[SEMANTIC_FIELD] for doc in docs] + + access_log.info( + "remote_embed_start", + extra={ + "model": model["label"], + "doc_count": len(texts), + "batch_size": _REMOTE_EMBED_BATCH_SIZE, + "concurrency": _REMOTE_EMBED_CONCURRENCY, + "rpm": _REMOTE_EMBED_RPM, + }, + ) + t0 = time.time() + vectors = _embed_via_remote(model, texts) + access_log.info( + "remote_embed_done", + extra={ + "model": model["label"], + "doc_count": len(texts), + "duration_s": round(time.time() - t0, 1), + }, + ) + + model_settings = { + "task_type": "text_embedding", + "dimensions": remote["dims"], + "similarity": "cosine", + "element_type": "float", + } + return [ + _inline_chunk_doc(doc, text, vec, model["inference_id"], model_settings) + for doc, text, vec in zip(docs, texts, vectors) + ] + + +def _bulk_index(actions, index, timeout): + return helpers.bulk( + es_client, + actions, + index=index, + request_timeout=timeout, + raise_on_error=False, + raise_on_exception=False, + ) + + +def _index_is_incremental(index_name): + """True if the index has a contentHash field (built by this indexer).""" + try: + mapping = es_client.indices.get_mapping(index=index_name) + except NotFoundError: + return 
False + return all( + "contentHash" in idx.get("mappings", {}).get("properties", {}) + for idx in mapping.values() + ) + + +def _make_settings(): + return { "index": { "number_of_shards": 1, - "search.slowlog.threshold.query.warn": "1s", - "search.slowlog.threshold.query.info": "500ms", - "search.slowlog.threshold.fetch.warn": "500ms", + "search.slowlog.threshold.query.warn": "1s", + "search.slowlog.threshold.query.info": "500ms", + "search.slowlog.threshold.fetch.warn": "500ms", "analysis": { "analyzer": { "trigram": { @@ -125,24 +459,19 @@ def create_and_update_index(index_name, documents, fields_to_not_index): "type": "custom", "tokenizer": "standard", "char_filter": ["html_strip", "shortcode_strip"], - "filter": [ - "lowercase", - "stop", - "synonyms_filter", - "stemmer", - ], + "filter": ["lowercase", "stop", "synonyms_filter", "stemmer"], }, "custom_arabic": { - "tokenizer": "standard", + "tokenizer": "standard", "char_filter": ["html_strip", "shortcode_strip"], "filter": [ "lowercase", "decimal_digit", "arabic_normalization", "arabic_stemmer", - "shingle" - ] - } + "shingle", + ], + }, }, "char_filter": { "shortcode_strip": { @@ -152,79 +481,166 @@ def create_and_update_index(index_name, documents, fields_to_not_index): } }, "filter": { - # 2-3 word shingles for better suggestions "shingle": { "type": "shingle", "min_shingle_size": 2, "max_shingle_size": 3, - "output_unigrams": True + "output_unigrams": True, }, "synonyms_filter": { "type": "synonym", "lenient": True, "synonyms_path": "synonyms.txt", }, - "arabic_stemmer": { - "type": "stemmer", - "language": "arabic" - }, - "arabic_stop": { - "type": "stop", - "stopwords": "_arabic_" - }, + "arabic_stemmer": {"type": "stemmer", "language": "arabic"}, + "arabic_stop": {"type": "stop", "stopwords": "_arabic_"}, }, }, } } - mappings = { - "properties": { - field: {"type": "text", "index": False} for field in fields_to_not_index + + +def _make_mappings(non_indexed_fields, model=None): + props = {field: {"type": 
"text", "index": False} for field in non_indexed_fields} + props["hadithText"] = { + "type": "text", + "analyzer": "synonym", + "fields": {"trigram": {"type": "text", "analyzer": "trigram"}}, + } + props["arabicText"] = {"type": "text", "analyzer": "custom_arabic"} + props["contentHash"] = {"type": "keyword", "index": False} + if model: + props[SEMANTIC_FIELD] = { + "type": "semantic_text", + "inference_id": model["inference_id"], } - | - # Configurating field for suggestions - { - "hadithText": { - "type": "text", - "analyzer": "synonym", - "fields": { - "trigram": {"type": "text", "analyzer": "trigram"}, - }, - } + return {"properties": props} + + +def _rebuild_index(index_name, documents, non_indexed_fields, model=None): + # time_ns avoids collisions when two rebuilds land in the same second. + new_index = f"{index_name}-{time.time_ns()}" + timeout = SEMANTIC_BULK_TIMEOUT_S if model else LEXICAL_BULK_TIMEOUT_S + es_client.indices.create( + index=new_index, + mappings=_make_mappings(non_indexed_fields, model), + settings=_make_settings(), + ) + try: + if model and model.get("remote_inference"): + documents = _rewrite_inline_chunks(documents, model) + success, errors = _bulk_index(documents, new_index, timeout=timeout) + if success == 0: + es_client.indices.delete(index=new_index, ignore_unavailable=True) + return {"mode": "rebuild", "success_count": 0, "errors": errors} + + old_indices = [] + if es_client.indices.exists_alias(name=index_name): + old_indices = list(es_client.indices.get_alias(name=index_name).keys()) + elif es_client.indices.exists(index=index_name): + es_client.indices.delete(index=index_name) + + actions = [{"add": {"index": new_index, "alias": index_name}}] + for old in old_indices: + actions.append({"remove": {"index": old, "alias": index_name}}) + es_client.indices.update_aliases(actions=actions) + for old in old_indices: + es_client.indices.delete(index=old, ignore_unavailable=True) + except Exception: + 
es_client.indices.delete(index=new_index, ignore_unavailable=True) + raise + + return {"mode": "rebuild", "success_count": success, "errors": errors} + + +def _incremental_index(index_name, documents, model=None): + incoming = {doc["_id"]: doc for doc in documents} + if not incoming: + # Refuse to wipe the live index when the source returns nothing + # (transient DB failure, wrong DATABASE env, etc.). + return { + "mode": "incremental", + "indexed": 0, + "deleted": 0, + "unchanged": 0, + "success_count": 0, + "errors": ["source returned 0 documents — refusing to delete live index"], } - | {"arabicText": {"type": "text", "analyzer": "custom_arabic"}} - } - if es_client.indices.exists(index=index_name): - es_client.indices.delete(index=index_name) - es_client.indices.create(index=index_name, mappings=mappings, settings=settings) - successCount, errors = helpers.bulk(es_client, documents, index=index_name) - return successCount, errors + existing_hashes = {} + for hit in helpers.scan( + es_client, index=index_name, query={"_source": ["contentHash"]}, size=2000 + ): + existing_hashes[hit["_id"]] = hit["_source"].get("contentHash") + + to_index = [ + doc + for doc_id, doc in incoming.items() + if existing_hashes.get(doc_id) != doc["contentHash"] + ] + to_delete = [doc_id for doc_id in existing_hashes if doc_id not in incoming] + + if to_index and model and model.get("remote_inference"): + to_index = _rewrite_inline_chunks(to_index, model) + + actions = to_index + [{"_op_type": "delete", "_id": did} for did in to_delete] + + timeout = SEMANTIC_BULK_TIMEOUT_S if model else LEXICAL_BULK_TIMEOUT_S + success, errors = 0, [] + if actions: + success, errors = _bulk_index(actions, index_name, timeout=timeout) -def get_suggest_query(suggest_field): return { - "field": suggest_field, - "size": 3, - "gram_size": 3, - "direct_generator": [ - {"field": suggest_field, "suggest_mode": "missing"} - ], - "highlight": {"pre_tag": "", "post_tag": ""}, - "collate": { - "query": { - 
"source": { - "match": {suggest_field: "{{suggestion}}"} - } - }, - # Only return suggestions with a query match - "prune": False, - }, + "mode": "incremental", + "indexed": len(to_index), + "deleted": len(to_delete), + "unchanged": len(incoming) - len(to_index), + "success_count": success, + "errors": errors, } + +def _index_one( + index_name, documents, non_indexed_fields, model=None, force_rebuild=False +): + """Rebuild or incrementally update a single index.""" + if force_rebuild or not _index_is_incremental(index_name): + return _rebuild_index(index_name, documents, non_indexed_fields, model) + return _incremental_index(index_name, documents, model) + + +# ── Routes ──────────────────────────────────────────────────────────────────── + + @app.route("/index", methods=["GET"]) def index(): start = time.time() if request.args.get("password") != os.environ.get("INDEXING_PASSWORD"): return "Must provide valid password to index", 401 + force_rebuild = _is_truthy(request.args.get("rebuild")) + + # ?targets=… is a comma-separated subset of {lexical, }. + # Missing → build everything. Empty or unknown → 400 (don't silently + # misinterpret a typo as "do nothing" or "do everything"). 
+ valid_targets = {"lexical", *_ENABLED_MODELS} + raw_targets = request.args.get("targets") + if raw_targets is None: + targets = valid_targets + else: + targets = {t.strip() for t in raw_targets.split(",") if t.strip()} + if not targets: + return jsonify( + {"error": "targets= must be a non-empty comma-separated list"} + ), 400 + unknown = targets - valid_targets + if unknown: + return jsonify( + { + "error": f"unknown targets {sorted(unknown)}; " + f"valid: {sorted(valid_targets)}" + } + ), 400 + connection = pymysql.connect( host=os.environ.get("MYSQL_HOST"), user=os.environ.get("MYSQL_USER"), @@ -232,152 +648,247 @@ def index(): database=os.environ.get("MYSQL_DATABASE"), ) cursor = connection.cursor(pymysql.cursors.DictCursor) - # Arabic Hadiths + # Filter out empty rows at the source so the rest of the pipeline doesn't + # have to handle them — TEI rejects empty inputs, ES wastes an _id storing + # them, and a hadith with no text can't match any query anyway. cursor.execute( - """SELECT arabicURN as urn, collection, hadithNumber, hadithText as arabicText, - matchingEnglishURN, "ar" as lang, grade1 as grade FROM ArabicHadithTable""" + """SELECT arabicURN as urn, collection, hadithNumber, hadithText as arabicText, + matchingEnglishURN, "ar" as lang, grade1 as grade FROM ArabicHadithTable + WHERE hadithText IS NOT NULL AND TRIM(hadithText) != ''""" ) arabicHadiths = cursor.fetchall() - arabicOnlyHadiths = [] - matchingArabicHadiths = {} - for arabicHadith in arabicHadiths: - if arabicHadith["matchingEnglishURN"] == 0: - arabicOnlyHadiths.append(arabicHadith) + arabicOnlyHadiths, matchingArabicHadiths = [], {} + for h in arabicHadiths: + if h["matchingEnglishURN"] == 0: + arabicOnlyHadiths.append(h) else: - matchingArabicHadiths[arabicHadith["matchingEnglishURN"]] = arabicHadith - + matchingArabicHadiths[h["matchingEnglishURN"]] = h - # English Hadiths cursor.execute( - """SELECT englishURN as urn, collection, hadithText, - matchingArabicURN, "en" as lang, grade1 
as grade FROM EnglishHadithTable""" + """SELECT englishURN as urn, collection, hadithText, + matchingArabicURN, "en" as lang, grade1 as grade FROM EnglishHadithTable + WHERE hadithText IS NOT NULL AND TRIM(hadithText) != ''""" ) englishHadiths = cursor.fetchall() - - # Add arabic text and hadithNumber to english hadith - for englishHadith in englishHadiths: - if englishHadith["urn"] not in matchingArabicHadiths: - continue - matchingArabic = matchingArabicHadiths[englishHadith["urn"]] - englishHadith["arabicText"] = matchingArabic["arabicText"] - englishHadith["arabicGrade"] = matchingArabic["grade"] - englishHadith["hadithNumber"] = matchingArabic["hadithNumber"] - - indexingSuccessCount, indexingErrors = create_and_update_index( - INDEX_NAME, englishHadiths + arabicOnlyHadiths, ["urn", "matchingArabicURN", "lang"] - ) + for h in englishHadiths: + if h["urn"] in matchingArabicHadiths: + ar = matchingArabicHadiths[h["urn"]] + h["arabicText"] = ar["arabicText"] + h["arabicGrade"] = ar["grade"] + h["hadithNumber"] = ar["hadithNumber"] connection.close() + + non_indexed = ["urn", "matchingArabicURN", "lang"] + + # Prepare IDs and content hashes. arabicHadiths is a superset of arabicOnlyHadiths + # (same dict objects), so preparing arabicHadiths covers both. + _prepare_documents(arabicHadiths) + _prepare_documents(englishHadiths) + + # Lexical index: English + Arabic-only (avoids duplicate hits for paired hadiths). + lexical_docs = englishHadiths + arabicOnlyHadiths + + # Semantic index: full multilingual corpus — every Arabic doc gets its Arabic text + # embedded, every English doc gets its English text embedded. This lets a multilingual + # model like text-embedding-3-small retrieve across both languages from one index. 
+ results = {} + if "lexical" in targets: + results["lexical"] = _index_one( + LEXICAL_INDEX, + lexical_docs, + non_indexed, + model=None, + force_rebuild=force_rebuild, + ) + models_to_index = {k: v for k, v in _ENABLED_MODELS.items() if k in targets} + for model_key, model in models_to_index.items(): + _ensure_inference_endpoint(model) + if model.get("multilingual"): + # Full corpus: every Arabic doc embeds Arabic text, every English doc embeds English. + en_docs = [(doc, doc["hadithText"]) for doc in englishHadiths] + ar_docs = [(doc, doc["arabicText"]) for doc in arabicHadiths] + paired = en_docs + ar_docs + else: + # English-only — replicates colleague's original PR approach. + paired = [(doc, doc["hadithText"]) for doc in englishHadiths] + + model_docs = _attach_semantic_field(paired) + results[model_key] = _index_one( + model["index"], + model_docs, + non_indexed, + model=model, + force_rebuild=force_rebuild, + ) + results[model_key]["failed"] = json.dumps(results[model_key].pop("errors")) + + if "lexical" in results: + results["lexical"]["failed"] = json.dumps(results["lexical"].pop("errors")) + + results["arabic_only_count"] = len(arabicOnlyHadiths) + results["timeInSeconds"] = round(time.time() - start, 1) + return jsonify(results) + + +@app.route("/index/status", methods=["GET"]) +def index_status(): + out = {} + for index_name in [LEXICAL_INDEX] + [m["index"] for m in EMBEDDING_MODELS.values()]: + try: + r = es_client.search(index=index_name, size=0, track_total_hits=True) + out[index_name] = {"indexed": True, "count": r["hits"]["total"]["value"]} + except NotFoundError: + out[index_name] = {"indexed": False} + return jsonify(out) + + +# ── Search helpers ──────────────────────────────────────────────────────────── + + +def get_suggest_query(field): return { - "all_hadith_index_results": { - "success_count": indexingSuccessCount, - "failed": json.dumps(indexingErrors), - }, - "arabic_only": { - "count": len(arabicOnlyHadiths), + "field": field, + 
"size": 3, + "gram_size": 3, + "direct_generator": [{"field": field, "suggest_mode": "missing"}], + "highlight": {"pre_tag": "", "post_tag": ""}, + "collate": { + "query": {"source": {"match": {field: "{{suggestion}}"}}}, + "prune": False, }, - "timeInSeconds": time.time() - start } -@app.route("/index/status", methods=["GET"]) -def index_status(): - try: - result = es_client.search( - index=INDEX_NAME, - size=0, - track_total_hits=True, - aggs={"english": {"filter": {"exists": {"field": "hadithText"}}}}, - ) - except NotFoundError: - return {"indexed": False} +def get_suggest_block(query): + return { + "text": query, + "english": {"phrase": get_suggest_query("hadithText.trigram")}, + "arabic": {"phrase": get_suggest_query("arabicText")}, + } + - total = result["hits"]["total"]["value"] - english = result["aggregations"]["english"]["doc_count"] +def build_semantic_query(query, filter_clauses): return { - "indexed": True, - "total_count": total, - "english_count": english, - "arabic_only_count": total - english, + "bool": { + "filter": filter_clauses, + "must": [{"semantic": {"field": SEMANTIC_FIELD, "query": query}}], + } } def get_filter_from_args(args): filters = [] - collection = args.getlist("collection") - if collection: + if collection := args.getlist("collection"): filters.append({"terms": {"collection": collection}}) - - grade = args.getlist("grade") - if grade: + if grade := args.getlist("grade"): filters.append({"terms": {"grade": grade}}) return filters + +def _resolve_mode(args): + """Normalize ?mode=... to a SearchMode. Falls back to LEXICAL for unknown + values and whenever SEMANTIC_ENABLED is off. + """ + try: + mode = SearchMode((args.get("mode") or "").lower()) + except ValueError: + return SearchMode.LEXICAL + if mode == SearchMode.SEMANTIC and not SEMANTIC_ENABLED: + return SearchMode.LEXICAL + return mode + + +def _resolve_model_key(args): + """Returns (key, error_message). 
error_message is non-None for explicit invalid input.""" + key = args.get("model") or DEFAULT_SEMANTIC_MODEL + if key in _ENABLED_MODELS: + return key, None + return None, f"unknown model '{key}'; enabled: {sorted(_ENABLED_MODELS)}" + + +def malformed_query_response(exc): + access_log.warning( + "malformed_query", + extra={"request_id": getattr(g, "request_id", None), "detail": str(exc)}, + ) + return jsonify({"error": "malformed query"}), 400 + + @app.route("//search", methods=["GET"]) def search(language): query = request.args.get("q") - filter = get_filter_from_args(request.args) + filters = get_filter_from_args(request.args) + mode = _resolve_mode(request.args) + + if mode == SearchMode.SEMANTIC: + model_key, err = _resolve_model_key(request.args) + if err: + return jsonify({"error": err}), 400 + model = _ENABLED_MODELS[model_key] + access_log.info( + "semantic_search", + extra={ + "request_id": getattr(g, "request_id", None), + "mode": mode, + "model": model_key, + "query": query, + }, + ) + return _semantic_search(model["index"], query, filters) + # Lexical path fields = ["hadithNumber^2", "hadithText", "arabicText", "collection^2"] - # TODO: Query string has a strict syntax and can cause failures when character like ":" appear in a search query. - # It's not recomended for search. 
But it's what allows us to do "AND collection:bukhari" or "AND hadithNumber:123" in the search bar - # Could be better to expose all those fields as filters instead and move away from query_string - def build_query(query_type): + def build_lexical(query_type): inner = {"query": query, "fields": fields} if query_type == "query_string": inner["type"] = "cross_fields" return { "function_score": { - "query": { - "bool": { - "filter": filter, - "must": [{query_type: inner}], - } - }, + "query": {"bool": {"filter": filters, "must": [{query_type: inner}]}}, "functions": [ - {"filter": {"term": {"collection": name}}, "weight": weight} - for name, weight in COLLECTION_BOOSTS + {"filter": {"term": {"collection": name}}, "weight": w} + for name, w in COLLECTION_BOOSTS ], "score_mode": "sum", "boost_mode": "sum", } } - search_kwargs = { - "index": language, + kwargs = { + "index": LEXICAL_INDEX, "from_": request.args.get("from", 0), "size": request.args.get("size", 10), + "_source": {"excludes": [SEMANTIC_FIELD]}, "highlight": {"number_of_fragments": 0, "fields": {"*": {}}}, - "suggest": { - "text": query, - "english": { - "phrase": get_suggest_query("hadithText.trigram"), - }, - "arabic": { - "phrase": get_suggest_query("arabicText"), - }, - }, + "suggest": get_suggest_block(query), } - try: try: - result = es_client.search(query=build_query("query_string"), **search_kwargs) + result = es_client.search(query=build_lexical("query_string"), **kwargs) except BadRequestError: - # query_string syntax is strict; retry once with simple_query_string, which tolerates malformed input - result = es_client.search(query=build_query("simple_query_string"), **search_kwargs) + result = es_client.search( + query=build_lexical("simple_query_string"), **kwargs + ) except BadRequestError as e: - # Don't leak ES internals (field paths, index names) to client. 
- access_log.warning( - "malformed_query", - extra={ - "request_id": getattr(g, "request_id", None), - "detail": str(e), - }, - ) - return jsonify({"error": "malformed query"}), 400 + return malformed_query_response(e) + return jsonify(result.body) + +def _semantic_search(search_index, query, filters): + try: + result = es_client.options(request_timeout=130).search( + index=search_index, + from_=int(request.args.get("from", 0)), + size=int(request.args.get("size", 10)), + query=build_semantic_query(query, filters), + _source={"excludes": [SEMANTIC_FIELD]}, + suggest=get_suggest_block(query), + ) + except BadRequestError as e: + return malformed_query_response(e) return jsonify(result.body) diff --git a/requirements.txt b/requirements.txt index bf410ce..69d88a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ Jinja2==2.11.3 python-dotenv==0.13.0 virtualenv==20.0.25 Werkzeug==1.0.1 -elasticsearch==8.9.0 +elasticsearch==8.16.0 MarkupSafe==1.1.1 itsdangerous==1.1.0 python-json-logger==2.0.7 diff --git a/tests/analyze_queries.py b/tests/analyze_queries.py new file mode 100644 index 0000000..8996255 --- /dev/null +++ b/tests/analyze_queries.py @@ -0,0 +1,90 @@ +""" +Stream-parse search_queries.12may26.sql and report: + - Top 100 multi-word queries (by total search frequency) + - Top 100 multi-word zero-result queries (numResults = 0) + +"Multi-word" = query contains at least one whitespace character (works for +English, Arabic, and any other script). Single-word queries are excluded. 
+ +Run from the repo root: + python3 search/analyze_queries.py +""" + +import re +import sys +from collections import Counter + +SQL_PATH = "search_queries.12may26.sql" +TOP_N = 100 + +# Each data row looks like: +# (12345,'the query text','2024-01-01 00:00:00','1.2.3.4',7), +ROW_RE = re.compile( + r"^\(\d+,'(.*?)','(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})','[^']+',(\d+)\),?$" +) + + +def normalize(q): + return q.replace("''", "'").strip().lower() + + +def is_multiword(q): + return len(q.split()) >= 2 + + +all_queries = Counter() +zero_result = Counter() +total_rows = 0 +skipped_parse = 0 + +print(f"Streaming {SQL_PATH} …", flush=True) + +with open(SQL_PATH, encoding="utf-8", errors="replace") as fh: + for line in fh: + if not line.startswith("("): + continue + m = ROW_RE.match(line.rstrip()) + if not m: + skipped_parse += 1 + continue + + query = normalize(m.group(1)) + num_results = int(m.group(3)) + total_rows += 1 + + if total_rows % 5_000_000 == 0: + print(f" {total_rows:,} rows processed …", flush=True) + + if not is_multiword(query): + continue + + all_queries[query] += 1 + if num_results == 0: + zero_result[query] += 1 + +print(f"\nDone. 
{total_rows:,} total rows | {skipped_parse:,} unparseable lines\n") +print(f"Unique multi-word queries: {len(all_queries):,}") +print(f"Unique multi-word zero-result queries: {len(zero_result):,}") + + +def write_report(path, title, counter): + with open(path, "w", encoding="utf-8") as f: + f.write(f"# {title}\n\n") + f.write(f"| Rank | Count | Query |\n") + f.write(f"|------|-------|-------|\n") + for rank, (query, count) in enumerate(counter.most_common(TOP_N), 1): + escaped = query.replace("|", "\\|") + f.write(f"| {rank} | {count:,} | {escaped} |\n") + print(f"Written: {path}") + + +write_report( + "search/test results & reports/top100_multiword_queries.md", + "Top 100 Multi-Word Search Queries", + all_queries, +) +write_report( + "search/test results & reports/top100_multiword_zero_results.md", + "Top 100 Multi-Word Queries with Zero Results", + zero_result, +) diff --git a/tests/batch_search.py b/tests/batch_search.py new file mode 100644 index 0000000..2eb5bc9 --- /dev/null +++ b/tests/batch_search.py @@ -0,0 +1,248 @@ +""" +Batch search across all semantic models + lexical — production approach (size=100, HNSW). +Outputs: batch_results.csv and batch_report.md in /code/ inside the container. + +WHY run inside the container: + ES is not exposed to the host. It only resolves at http://elasticsearch:9200 + from within the Docker network (search-web-1 is on that network). 
+ +STEP 1 — copy the script into the container: + docker cp search/batch_search.py search-web-1:/code/batch_search.py + +STEP 2 — run it: + docker exec search-web-1 python3 /code/batch_search.py + +STEP 3 — copy results back to your local machine: + docker cp search-web-1:/code/batch_results.csv search/batch_results.csv + docker cp search-web-1:/code/batch_report.md search/batch_report.md + +One-liner (copy in, run, copy results out): + docker cp search/batch_search.py search-web-1:/code/batch_search.py && \ + docker exec search-web-1 python3 /code/batch_search.py && \ + docker cp search-web-1:/code/batch_results.csv search/batch_results.csv && \ + docker cp search-web-1:/code/batch_report.md search/batch_report.md + +To add/remove queries: edit QUERIES below. +To change how many results are fetched per model per query: edit SIZE. +To change how many are shown in the markdown report: edit REPORT_TOP_N. + +NOTE: always include commas between query strings — Python silently concatenates +adjacent string literals without a comma, producing wrong queries with no error. 
+""" + +import csv +import os +import re +import textwrap +from datetime import date +from elasticsearch import Elasticsearch, BadRequestError + +ES_HOST = os.environ.get("ES_HOST", "http://elasticsearch:9200") +ES_PASS = os.environ.get("ELASTIC_PASSWORD", "123") +ES = Elasticsearch(ES_HOST, basic_auth=("elastic", ES_PASS)) + +LEXICAL_INDEX = "english-lexical" + +LEXICAL_FIELDS = ["hadithNumber^2", "hadithText", "arabicText", "collection^2"] + +COLLECTION_BOOSTS = [ + ("bukhari", 5.0), + ("muslim", 4.8), + ("nasai", 3.5), + ("abudawud", 3.0), + ("tirmidhi", 2.5), + ("ibnmajah", 2.0), + ("malik", 2.5), + ("ahmad", 2.5), + ("darimi", 2.0), + ("mishkat", 2.5), + ("nawawi40", 3.3), + ("riyadussalihin", 2.5), +] + +SEMANTIC_INDEXES = { + "openai-small-en": "english-openai-small-1779045411", + # "openai-small-multi": "multilingual-openai-small-1779017104", + "nomic": "english-nomic-1779026769", + "mxbai": "english-mxbai-1779026713", +} + +QUERIES = [ + "comparing yourself to others", + "aisha six years", + "music", + "actions are by intentions", + "ramadan", + "jesus", + "sex", + "marriage", + "masturbation", + "racism", + "polygamy", + "pork", + "dance", +] + +SIZE = 100 # fetch this many; report shows top 10 per model per query +REPORT_TOP_N = 10 + +OUT_DIR = "/code" +CSV_PATH = os.path.join(OUT_DIR, "batch_results.csv") +MD_PATH = os.path.join(OUT_DIR, "batch_report.md") + + +def lexical_search(query, size=SIZE): + def _build(query_type): + inner = {"query": query, "fields": LEXICAL_FIELDS} + if query_type == "query_string": + inner["type"] = "cross_fields" + return { + "function_score": { + "query": {"bool": {"must": [{query_type: inner}]}}, + "functions": [ + {"filter": {"term": {"collection": name}}, "weight": w} + for name, w in COLLECTION_BOOSTS + ], + "score_mode": "sum", + "boost_mode": "sum", + } + } + + kwargs = dict( + index=LEXICAL_INDEX, + size=size, + track_total_hits=True, + _source=["hadithText", "collection", "hadithNumber", "urn"], + ) + try: + res = 
ES.search(query=_build("query_string"), **kwargs) + except BadRequestError: + res = ES.search(query=_build("simple_query_string"), **kwargs) + return res["hits"]["hits"], res["hits"]["total"]["value"] + + +def semantic_search(index, query, size=SIZE): + res = ES.search( + index=index, + query={"semantic": {"field": "semantic_text", "query": query}}, + size=size, + track_total_hits=True, + _source=["hadithText", "collection", "hadithNumber", "urn"], + ) + return res["hits"]["hits"], res["hits"]["total"]["value"] + + +def query_anchor(query): + s = f'query: "{query}"' + s = s.lower() + s = re.sub(r"[^\w\s-]", "", s) + s = re.sub(r"\s+", "-", s.strip()) + return s + + +def snippet(text, width=160): + if not text: + return "" + return textwrap.shorten( + text.replace("\n", " ").strip(), width=width, placeholder="…" + ) + + +def run(): + csv_rows = [] + md_sections = [] + + all_models = {"lexical": None, **SEMANTIC_INDEXES} + + md_sections.append(f"# Batch Search Report") + md_sections.append(f"**Date:** {date.today()} ") + md_sections.append(f"**Models:** {', '.join(all_models)} ") + md_sections.append( + f"**Method:** lexical = BM25 + collection boosts; semantic = HNSW `size={SIZE}` " + ) + md_sections.append( + f"**Queries:** {len(QUERIES)}, report shows top {REPORT_TOP_N} per model" + ) + md_sections.append("") + + # Table of contents + md_sections.append("## Queries") + for query in QUERIES: + md_sections.append(f"- [{query}](#{query_anchor(query)})") + md_sections.append("") + + for query in QUERIES: + print(f"\nQuery: {query!r}") + md_sections.append(f'---\n\n## Query: "{query}"\n') + + for model_name, index in all_models.items(): + print(f" [{model_name}] ...", end=" ", flush=True) + try: + if model_name == "lexical": + hits, total = lexical_search(query, size=SIZE) + else: + hits, total = semantic_search(index, query, size=SIZE) + print(f"{len(hits)} hits (total: {total})") + except Exception as e: + print(f"ERROR: {e}") + md_sections.append(f"### 
{model_name}\n\n_Error: {e}_\n") + continue + + md_sections.append(f"### {model_name} — {total} total hits\n") + + for rank, h in enumerate(hits[:REPORT_TOP_N], 1): + src = h["_source"] + score = round(h["_score"], 4) + collection = src.get("collection", "") + hadith_num = src.get("hadithNumber", "") + text = src.get("hadithText", "") + urn = src.get("urn", "") + + csv_rows.append( + { + "query": query, + "model": model_name, + "rank": rank, + "collection": collection, + "hadithNumber": hadith_num, + "urn": urn, + "score": score, + "text_snippet": snippet(text, width=500), + } + ) + + md_sections.append( + f"**#{rank}** — {collection} {hadith_num} · score: {score}" + ) + md_sections.append(f"> {snippet(text, width=600)}\n") + + md_sections.append("") + + # Write CSV + fieldnames = [ + "query", + "model", + "rank", + "collection", + "hadithNumber", + "urn", + "score", + "text_snippet", + ] + with open(CSV_PATH, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(csv_rows) + print(f"\nCSV → {CSV_PATH}") + + # Write markdown + with open(MD_PATH, "w", encoding="utf-8") as f: + f.write("\n".join(md_sections) + "\n") + print(f"MD → {MD_PATH}") + print( + f"Rows: {len(csv_rows)} ({len(QUERIES)} queries × {len(all_models)} models × up to {REPORT_TOP_N})" + ) + + +if __name__ == "__main__": + run() diff --git a/tests/fetch_exact_knn.py b/tests/fetch_exact_knn.py new file mode 100644 index 0000000..a1ae348 --- /dev/null +++ b/tests/fetch_exact_knn.py @@ -0,0 +1,111 @@ +""" +Fetch exact kNN results for all 4 models by: +1. Getting query embedding via ES inference API +2. 
Running knn query on inner dense_vector field (semantic_text.inference.chunks.embeddings) + with num_candidates >= index doc count (forces exhaustive search) +""" + +import urllib.request, urllib.parse, json, sys +from base64 import b64decode + +ES = "http://elasticsearch:9200" +ES_AUTH = ("elastic", "123") +QUERY = "comparing yourself to others" +K = 10 # results to return + +MODELS = { + "openai-small-en": { + "inference_id": "openai-text-embedding-3-small", + "index": "english-openai-small-1779045411", + "num_docs": 99956, + "dims": 1536, + }, + "openai-small-multi": { + "inference_id": "openai-text-embedding-3-small", + "index": "multilingual-openai-small-1779017104", + "num_docs": 285236, + "dims": 1536, + }, + "nomic": { + "inference_id": "nomic-embed-text", + "index": "english-nomic-1779026769", + "num_docs": 99956, + "dims": 768, + }, + "mxbai": { + "inference_id": "mxbai-embed-large", + "index": "english-mxbai-1779026713", + "num_docs": 99956, + "dims": 1024, + }, +} + +import base64, urllib.error + + +def es_request(path, method="GET", body=None): + url = ES + path + data = json.dumps(body).encode() if body else None + req = urllib.request.Request(url, data=data, method=method) + creds = base64.b64encode(b"elastic:123").decode() + req.add_header("Authorization", f"Basic {creds}") + if data: + req.add_header("Content-Type", "application/json") + resp = urllib.request.urlopen(req, timeout=60) + return json.loads(resp.read()) + + +def get_embedding(inference_id, text): + r = es_request( + f"/_inference/text_embedding/{inference_id}", "POST", {"input": text} + ) + return r["text_embedding"][0]["embedding"] + + +results = {} +for model_key, cfg in MODELS.items(): + print(f"\nModel: {model_key}", file=sys.stderr) + print(f" Getting embedding via {cfg['inference_id']} ...", file=sys.stderr) + emb = get_embedding(cfg["inference_id"], QUERY) + print(f" Got {len(emb)}-dim embedding", file=sys.stderr) + + # Exact kNN: num_candidates >= num_docs forces exhaustive 
search + num_candidates = cfg["num_docs"] + 1000 + print( + f" Running knn on {cfg['index']} (num_candidates={num_candidates}) ...", + file=sys.stderr, + ) + + body = { + "size": K, + "knn": { + "field": "semantic_text.inference.chunks.embeddings", + "query_vector": emb, + "k": K, + "num_candidates": num_candidates, + "inner_hits": {"size": 1, "_source": False, "fields": []}, + }, + "_source": ["urn", "hadithText", "collection"], + } + + try: + r = es_request(f"/{cfg['index']}/_search", "POST", body) + hits = r.get("hits", {}).get("hits", []) + print(f" Got {len(hits)} hits", file=sys.stderr) + results[model_key] = [ + { + "rank": i + 1, + "urn": h.get("_source", {}).get("urn", h.get("_id", "")), + "score": round(h.get("_score", 0), 4), + "text": h.get("_source", {}).get("hadithText", ""), + } + for i, h in enumerate(hits) + ] + except urllib.error.HTTPError as e: + err_body = e.read().decode() + print(f" ERROR: {e.code} {err_body[:500]}", file=sys.stderr) + results[model_key] = {"error": err_body[:300]} + +with open("/tmp/exact_knn.json", "w") as f: + json.dump(results, f, indent=2) +print("\nDone — saved to /tmp/exact_knn.json", file=sys.stderr) diff --git a/tests/test_shortcode_pattern.py b/tests/test_shortcode_pattern.py index 4d35b95..1e370bd 100644 --- a/tests/test_shortcode_pattern.py +++ b/tests/test_shortcode_pattern.py @@ -41,7 +41,7 @@ def test_strips_self_closing_shortcode(self): def test_strips_unknown_but_shortcode_shaped_tags(self): # Future-proofing: any tag-shaped token is stripped. - self.assertEqual(strip("[somefuture id=\"5\"]z[/somefuture]"), " z ") + self.assertEqual(strip('[somefuture id="5"]z[/somefuture]'), " z ") def test_preserves_multi_word_parenthetical_asides(self): s = "Keep [bleeding (from the womb) in between a woman periods] intact" @@ -53,7 +53,9 @@ def test_preserves_qur_an_aside(self): def test_preserves_text_outside_shortcodes(self): s = "The Prophet (saws) said, [matn]be kind[/matn] to your neighbor." 
- self.assertEqual(strip(s), "The Prophet (saws) said, be kind to your neighbor.") + self.assertEqual( + strip(s), "The Prophet (saws) said, be kind to your neighbor." + ) def test_does_not_match_brackets_with_leading_space(self): # Tag name must be the first thing after `[`. diff --git a/utils/rate_limiter.py b/utils/rate_limiter.py new file mode 100644 index 0000000..4f0f741 --- /dev/null +++ b/utils/rate_limiter.py @@ -0,0 +1,43 @@ +"""Cross-thread interval pacer for outbound HTTP calls. + +Lives in utils/ so it can be reused if more remote inference providers are +added later, and so unit tests can exercise it without importing Flask. +""" + +import threading +import time + + +class RateLimiter: + """Enforce a minimum 60/rpm gap between successive `acquire()` calls. + + Chose interval pacing over a sliding window because some providers + (notably Mixedbread's free tier) reject bursts on the first second even + when the per-minute total is under cap. The window form would allow the + initial `rpm` calls instantly; this form spaces them evenly. + + Pass rpm = -1 to disable throttling — for providers that don't publish + an RPM cap (e.g. HF Dedicated Endpoints, which bill by compute time). 
+ """ + + def __init__(self, rpm, log=None): + self.enabled = rpm > 0 + if rpm == 0 and log is not None: + log.warning( + "rate_limiter_rpm_zero", extra={"hint": "use -1 to disable throttling"} + ) + self.interval = 60.0 / rpm if self.enabled else 0.0 + self.lock = threading.Lock() + self.next_allowed = 0.0 + + def acquire(self): + if not self.enabled: + return + while True: + with self.lock: + now = time.monotonic() + if now >= self.next_allowed: + self.next_allowed = now + self.interval + return + wait = self.next_allowed - now + time.sleep(wait) diff --git a/uwsgi.ini b/uwsgi.ini index ee44f7a..b648664 100644 --- a/uwsgi.ini +++ b/uwsgi.ini @@ -8,4 +8,6 @@ enable-threads = true harakiri = 35 harakiri-verbose = true buffer-size=32768 +# /index rebuilds take minutes; bump harakiri for that route only. +route = ^/index harakiri:1800