diff --git a/.env.sample b/.env.sample index e3388fd..bb73f27 100644 --- a/.env.sample +++ b/.env.sample @@ -1 +1,7 @@ -VIDEO_DB_API_KEY= \ No newline at end of file +VIDEO_DB_API_KEY= + +# Optional: use TwelveLabs (Marengo retrieval + Pegasus answers) instead of VideoDB. +# Leave VIDEO_SEARCH_PROVIDER unset to keep the default VideoDB backend. +# VIDEO_SEARCH_PROVIDER=twelvelabs +# TWELVELABS_API_KEY= +# TWELVELABS_INDEX_ID= diff --git a/README.md b/README.md index d48d010..ffb9ee5 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,43 @@ It enables developers to: - **Upload your collection to VideoDB:** Add your links in `upload.py`. - **Run locally:** Start the flask server with `python app.py`. +## Optional: use TwelveLabs instead of VideoDB 🧠 + +StreamRAG can also be powered by [TwelveLabs](https://twelvelabs.io) as an +**opt-in, drop-in backend** — no change to the GPT prompt or `openapi.yaml`: + +* 🔍 **Marengo** handles retrieval — multimodal (visual + audio) search over your index. +* 📝 **Pegasus** handles answers — video understanding generates per-video summaries. + +To switch over, set these in your `env` file (leave `VIDEO_SEARCH_PROVIDER` +unset to keep the default VideoDB behavior): + +```bash +VIDEO_SEARCH_PROVIDER=twelvelabs +TWELVELABS_API_KEY=tlk_... +TWELVELABS_INDEX_ID= +``` + +Create an index and upload videos once with the TwelveLabs SDK: + +```python +from twelvelabs import TwelveLabs + +client = TwelveLabs(api_key="tlk_...") +index = client.indexes.create( + index_name="streamrag", + models=[ + {"model_name": "marengo3.0", "model_options": ["visual", "audio"]}, + {"model_name": "pegasus1.2", "model_options": ["visual", "audio"]}, + ], +) +task = client.tasks.create(index_id=index.id, video_url="https://...") +client.tasks.wait_for_done(task_id=task.id) +print("Set TWELVELABS_INDEX_ID =", index.id) +``` + +Grab a free API key at [twelvelabs.io](https://twelvelabs.io) — there's a generous free tier. 🆓 + ## Publishing on ChatGPT Store 🏪 [📺 Watch: Create New GPT](https://console.videodb.io/player?url=https://stream.videodb.io/v3/published/manifests/b4b01b80-f38b-47f7-a238-09e53d844792.m3u8) diff --git a/app.py b/app.py index 92c068b..2db7b5e 100644 --- a/app.py +++ b/app.py @@ -7,6 +7,14 @@ load_dotenv() +# Opt-in alternative backend. When VIDEO_SEARCH_PROVIDER=twelvelabs, the +# /videos, /video/ and /search routes are served by TwelveLabs (Marengo +# retrieval + Pegasus answers). Unset/anything else keeps the default VideoDB +# behavior untouched. See twelvelabs_provider.py. +USE_TWELVELABS = os.getenv("VIDEO_SEARCH_PROVIDER", "").lower() == "twelvelabs" +if USE_TWELVELABS: + import twelvelabs_provider + # Flask config app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY") @@ -29,6 +37,8 @@ def list_videos(): """ Get a list of all videos in the database of your default collection. """ + if USE_TWELVELABS: + return twelvelabs_provider.list_videos() conn = get_connection() all_videos = conn.get_collection().get_videos() all_videos = [ @@ -49,6 +59,8 @@ def get_video(id): """ Get a single video by id from default collection """ + if USE_TWELVELABS: + return twelvelabs_provider.get_video(id) conn = get_connection() all_videos = conn.get_collection().get_videos() @@ -77,6 +89,8 @@ def search_videos(): """ data = request.get_json() query = data.get("query") + if USE_TWELVELABS: + return twelvelabs_provider.search(query) conn = get_connection() try: coll = conn.get_collection() diff --git a/requirements.txt b/requirements.txt index 95ccaa9..7e9d3d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ MarkupSafe==2.1.3 python-dotenv==1.0.0 pytube==15.0.0 requests==2.31.0 +twelvelabs==1.2.8 urllib3==2.1.0 videodb==0.0.2 Werkzeug==3.0.1 diff --git a/test_twelvelabs_provider.py b/test_twelvelabs_provider.py new file mode 100644 index 0000000..d78ff65 --- /dev/null +++ b/test_twelvelabs_provider.py @@ -0,0 +1,92 @@ +"""Tests for the opt-in TwelveLabs backend (twelvelabs_provider). + +The unit tests mock the TwelveLabs SDK and need no network or API key. +The live test exercises real Marengo search + Pegasus analyze and is skipped +unless both TWELVELABS_API_KEY and TWELVELABS_INDEX_ID are set. + +Run with: python -m unittest test_twelvelabs_provider +""" + +import os +import unittest +from types import SimpleNamespace +from unittest import mock + +import twelvelabs_provider as provider + + +class SearchShapeTest(unittest.TestCase): + def test_search_returns_videodb_compatible_shape(self): + items = [ + SimpleNamespace( + transcription="a goal is scored", + thumbnail_url="https://t/1.jpg", + video_id="v1", + start=0.0, + end=8.0, + ), + SimpleNamespace( + transcription=None, + thumbnail_url=None, + video_id="v2", + start=4.0, + end=12.0, + ), + ] + fake = mock.Mock() + fake.search.query.return_value = items + + with mock.patch.dict(os.environ, {"TWELVELABS_INDEX_ID": "idx"}), \ + mock.patch.object(provider, "get_client", return_value=fake): + result = provider.search("goal") + + self.assertEqual(result["compilationVideo"], None) + self.assertEqual(len(result["chunks"]), 2) + # First chunk uses the transcription; second falls back to a locator. + self.assertEqual(result["chunks"][0]["text"], "a goal is scored") + self.assertIn("v2", result["chunks"][1]["text"]) + # Search wiring uses the configured index and multimodal options. + _, kwargs = fake.search.query.call_args + self.assertEqual(kwargs["index_id"], "idx") + self.assertEqual(kwargs["search_options"], provider.SEARCH_OPTIONS) + self.assertEqual(kwargs["query_text"], "goal") + + def test_get_video_uses_pegasus_summary(self): + fake = mock.Mock() + fake.analyze.return_value = SimpleNamespace(data="A soccer highlight.") + with mock.patch.object(provider, "get_client", return_value=fake): + result = provider.get_video("v1") + + self.assertEqual(result["video"]["id"], "v1") + self.assertEqual(result["video"]["transcript"], "A soccer highlight.") + _, kwargs = fake.analyze.call_args + self.assertEqual(kwargs["model_name"], provider.PEGASUS_MODEL) + self.assertEqual(kwargs["video_id"], "v1") + + def test_missing_index_id_raises(self): + with mock.patch.dict(os.environ, {}, clear=True), \ + mock.patch.object(provider, "get_client", return_value=mock.Mock()): + with self.assertRaises(provider.TwelveLabsError): + provider.search("anything") + + +@unittest.skipUnless( + os.getenv("TWELVELABS_API_KEY") and os.getenv("TWELVELABS_INDEX_ID"), + "set TWELVELABS_API_KEY and TWELVELABS_INDEX_ID for the live test", +) +class LiveTest(unittest.TestCase): + def test_marengo_text_embedding(self): + vec = provider.text_embedding("a cat playing piano") + self.assertEqual(len(vec), 512) + + def test_marengo_search_and_pegasus_answer(self): + result = provider.search("goal") + self.assertIn("chunks", result) + self.assertTrue(result["chunks"]) + video_id = result["chunks"][0]["videoId"] + answer = provider.get_video(video_id) + self.assertTrue(answer["video"]["transcript"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/twelvelabs_provider.py b/twelvelabs_provider.py new file mode 100644 index 0000000..af45edb --- /dev/null +++ b/twelvelabs_provider.py @@ -0,0 +1,159 @@ +"""Opt-in TwelveLabs backend for StreamRAG. + +By default StreamRAG talks to VideoDB (see ``app.py``). This module provides an +alternative, opt-in backend powered by TwelveLabs: + +* **Marengo** handles retrieval. ``search()`` runs a multimodal (visual + audio) + search over an index and returns the same ``{compilationVideo, chunks}`` shape + the VideoDB path returns, so the OpenAPI action and GPT prompt are unchanged. +* **Pegasus** handles answers. ``get_video()`` uses video understanding to + generate a description for an individual video instead of a raw transcript. + +To switch StreamRAG over to TwelveLabs set two env vars (see ``.env.sample``):: + + VIDEO_SEARCH_PROVIDER=twelvelabs + TWELVELABS_API_KEY=tlk_... + TWELVELABS_INDEX_ID= + +When ``VIDEO_SEARCH_PROVIDER`` is unset (or anything other than ``twelvelabs``) +this module is never imported and behavior is identical to upstream. + +Create an index and upload videos with the TwelveLabs SDK once, up front:: + + from twelvelabs import TwelveLabs + client = TwelveLabs(api_key="tlk_...") + index = client.indexes.create( + index_name="streamrag", + models=[{"model_name": "marengo3.0", "model_options": ["visual", "audio"]}, + {"model_name": "pegasus1.2", "model_options": ["visual", "audio"]}], + ) + task = client.tasks.create(index_id=index.id, video_url="https://...") + client.tasks.wait_for_done(task_id=task.id) + +Grab a free API key at https://twelvelabs.io (generous free tier). +""" + +import os + +from twelvelabs import TwelveLabs + +# Marengo model used for both search and (text) embeddings. +MARENGO_MODEL = os.getenv("TWELVELABS_MARENGO_MODEL", "marengo3.0") +# Pegasus model used for video understanding / answers. +PEGASUS_MODEL = os.getenv("TWELVELABS_PEGASUS_MODEL", "pegasus1.2") +# Search modalities; Marengo indexes are typically created with visual + audio. +SEARCH_OPTIONS = ["visual", "audio"] + + +class TwelveLabsError(Exception): + """Raised when the TwelveLabs backend is misconfigured.""" + + +def _index_id(): + index_id = os.getenv("TWELVELABS_INDEX_ID") + if not index_id: + raise TwelveLabsError( + "TWELVELABS_INDEX_ID is not set. Create an index and upload videos " + "with the TwelveLabs SDK, then set TWELVELABS_INDEX_ID." + ) + return index_id + + +def get_client(): + """Create a TwelveLabs client from ``TWELVELABS_API_KEY``.""" + api_key = os.getenv("TWELVELABS_API_KEY") + if not api_key: + raise TwelveLabsError("TWELVELABS_API_KEY is not set.") + return TwelveLabs(api_key=api_key) + + +def list_videos(): + """Mirror ``app.list_videos``: return ``{"videos": [...]}``.""" + client = get_client() + index_id = _index_id() + videos = [] + for vid in client.indexes.videos.list(index_id=index_id): + meta = getattr(vid, "system_metadata", None) + videos.append( + { + "id": vid.id, + "title": getattr(meta, "filename", None) or vid.id, + "url": getattr(meta, "video_url", None), + "length": round(float(getattr(meta, "duration", 0) or 0)), + } + ) + return {"videos": videos} + + +def get_video(video_id): + """Mirror ``app.get_video`` but answer with Pegasus video understanding. + + Instead of a raw transcript we ask Pegasus to summarize the video, which is + the "answers" half of the integration. The summary is returned in the same + ``transcript`` field so the existing OpenAPI schema and GPT prompt work + unchanged. + """ + client = get_client() + res = client.analyze( + model_name=PEGASUS_MODEL, + video_id=video_id, + prompt="Summarize this video, including the key topics and notable moments.", + max_tokens=2048, + ) + return { + "video": { + "id": video_id, + "title": video_id, + "url": None, + "length": None, + "transcript": res.data or "", + } + } + + +def search(query): + """Mirror ``app.search_videos``: return ``{"compilationVideo", "chunks"}``. + + Marengo multimodal search returns ranked clips. Each clip becomes a chunk + with its transcription text and thumbnail. TwelveLabs does not stitch a + compilation video, so ``compilationVideo`` is ``None`` and the GPT falls + back to the per-clip references (the prompt already handles this). + """ + client = get_client() + index_id = _index_id() + results = client.search.query( + index_id=index_id, + search_options=SEARCH_OPTIONS, + query_text=query, + group_by="clip", + page_limit=10, + ) + chunks = [] + for item in results: + # Prefer the transcription when the index was built with it; otherwise + # fall back to a human-readable locator so the chunk is never empty. + text = item.transcription or ( + f"Clip from video {item.video_id} " + f"({item.start:.0f}s-{item.end:.0f}s)" + ) + chunks.append( + { + "text": text, + "video": item.thumbnail_url, + "videoId": item.video_id, + "start": item.start, + "end": item.end, + } + ) + return {"compilationVideo": None, "chunks": chunks} + + +def text_embedding(text): + """Return a 512-dim Marengo embedding for ``text``. + + Exposed for callers that want to build their own vector index on top of + Marengo rather than use the hosted search above. + """ + client = get_client() + res = client.embed.create(model_name=MARENGO_MODEL, text=text) + return res.text_embedding.segments[0].float_