Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .env.sample
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
VIDEO_DB_API_KEY=
VIDEO_DB_API_KEY=

# Optional: use TwelveLabs (Marengo retrieval + Pegasus answers) instead of VideoDB.
# Leave VIDEO_SEARCH_PROVIDER unset to keep the default VideoDB backend.
# VIDEO_SEARCH_PROVIDER=twelvelabs
# TWELVELABS_API_KEY=
# TWELVELABS_INDEX_ID=
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,43 @@ It enables developers to:
- **Upload your collection to VideoDB:** Add your links in `upload.py`.
- **Run locally:** Start the flask server with `python app.py`.

## Optional: use TwelveLabs instead of VideoDB 🧠

StreamRAG can also be powered by [TwelveLabs](https://twelvelabs.io) as an
**opt-in, drop-in backend** — no change to the GPT prompt or `openapi.yaml`:

* 🔍 **Marengo** handles retrieval — multimodal (visual + audio) search over your index.
* 📝 **Pegasus** handles answers — video understanding generates per-video summaries.

To switch over, set these in your `env` file (leave `VIDEO_SEARCH_PROVIDER`
unset to keep the default VideoDB behavior):

```bash
VIDEO_SEARCH_PROVIDER=twelvelabs
TWELVELABS_API_KEY=tlk_...
TWELVELABS_INDEX_ID=<your index id>
```

Create an index and upload videos once with the TwelveLabs SDK:

```python
from twelvelabs import TwelveLabs

client = TwelveLabs(api_key="tlk_...")
index = client.indexes.create(
index_name="streamrag",
models=[
{"model_name": "marengo3.0", "model_options": ["visual", "audio"]},
{"model_name": "pegasus1.2", "model_options": ["visual", "audio"]},
],
)
task = client.tasks.create(index_id=index.id, video_url="https://...")
client.tasks.wait_for_done(task_id=task.id)
print("Set TWELVELABS_INDEX_ID =", index.id)
```

Grab a free API key at [twelvelabs.io](https://twelvelabs.io) — there's a generous free tier. 🆓

## Publishing on ChatGPT Store 🏪
[📺 Watch: Create New GPT](https://console.videodb.io/player?url=https://stream.videodb.io/v3/published/manifests/b4b01b80-f38b-47f7-a238-09e53d844792.m3u8)

Expand Down
14 changes: 14 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@

load_dotenv()

# Opt-in alternative backend. When VIDEO_SEARCH_PROVIDER=twelvelabs, the
# /videos, /video/<id> and /search routes are served by TwelveLabs (Marengo
# retrieval + Pegasus answers). Unset/anything else keeps the default VideoDB
# behavior untouched. See twelvelabs_provider.py.
USE_TWELVELABS = os.getenv("VIDEO_SEARCH_PROVIDER", "").lower() == "twelvelabs"
if USE_TWELVELABS:
import twelvelabs_provider

# Flask config
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY")
Expand All @@ -29,6 +37,8 @@ def list_videos():
"""
Get a list of all videos in the database of your default collection.
"""
if USE_TWELVELABS:
return twelvelabs_provider.list_videos()
conn = get_connection()
all_videos = conn.get_collection().get_videos()
all_videos = [
Expand All @@ -49,6 +59,8 @@ def get_video(id):
"""
Get a single video by id from default collection
"""
if USE_TWELVELABS:
return twelvelabs_provider.get_video(id)
conn = get_connection()
all_videos = conn.get_collection().get_videos()

Expand Down Expand Up @@ -77,6 +89,8 @@ def search_videos():
"""
data = request.get_json()
query = data.get("query")
if USE_TWELVELABS:
return twelvelabs_provider.search(query)
conn = get_connection()
try:
coll = conn.get_collection()
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MarkupSafe==2.1.3
python-dotenv==1.0.0
pytube==15.0.0
requests==2.31.0
twelvelabs==1.2.8
urllib3==2.1.0
videodb==0.0.2
Werkzeug==3.0.1
Expand Down
92 changes: 92 additions & 0 deletions test_twelvelabs_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Tests for the opt-in TwelveLabs backend (twelvelabs_provider).

The unit tests mock the TwelveLabs SDK and need no network or API key.
The live test exercises real Marengo search + Pegasus analyze and is skipped
unless both TWELVELABS_API_KEY and TWELVELABS_INDEX_ID are set.

Run with: python -m unittest test_twelvelabs_provider
"""

import os
import unittest
from types import SimpleNamespace
from unittest import mock

import twelvelabs_provider as provider


class SearchShapeTest(unittest.TestCase):
def test_search_returns_videodb_compatible_shape(self):
items = [
SimpleNamespace(
transcription="a goal is scored",
thumbnail_url="https://t/1.jpg",
video_id="v1",
start=0.0,
end=8.0,
),
SimpleNamespace(
transcription=None,
thumbnail_url=None,
video_id="v2",
start=4.0,
end=12.0,
),
]
fake = mock.Mock()
fake.search.query.return_value = items

with mock.patch.dict(os.environ, {"TWELVELABS_INDEX_ID": "idx"}), \
mock.patch.object(provider, "get_client", return_value=fake):
result = provider.search("goal")

self.assertEqual(result["compilationVideo"], None)
self.assertEqual(len(result["chunks"]), 2)
# First chunk uses the transcription; second falls back to a locator.
self.assertEqual(result["chunks"][0]["text"], "a goal is scored")
self.assertIn("v2", result["chunks"][1]["text"])
# Search wiring uses the configured index and multimodal options.
_, kwargs = fake.search.query.call_args
self.assertEqual(kwargs["index_id"], "idx")
self.assertEqual(kwargs["search_options"], provider.SEARCH_OPTIONS)
self.assertEqual(kwargs["query_text"], "goal")

def test_get_video_uses_pegasus_summary(self):
fake = mock.Mock()
fake.analyze.return_value = SimpleNamespace(data="A soccer highlight.")
with mock.patch.object(provider, "get_client", return_value=fake):
result = provider.get_video("v1")

self.assertEqual(result["video"]["id"], "v1")
self.assertEqual(result["video"]["transcript"], "A soccer highlight.")
_, kwargs = fake.analyze.call_args
self.assertEqual(kwargs["model_name"], provider.PEGASUS_MODEL)
self.assertEqual(kwargs["video_id"], "v1")

def test_missing_index_id_raises(self):
with mock.patch.dict(os.environ, {}, clear=True), \
mock.patch.object(provider, "get_client", return_value=mock.Mock()):
with self.assertRaises(provider.TwelveLabsError):
provider.search("anything")


@unittest.skipUnless(
os.getenv("TWELVELABS_API_KEY") and os.getenv("TWELVELABS_INDEX_ID"),
"set TWELVELABS_API_KEY and TWELVELABS_INDEX_ID for the live test",
)
class LiveTest(unittest.TestCase):
def test_marengo_text_embedding(self):
vec = provider.text_embedding("a cat playing piano")
self.assertEqual(len(vec), 512)

def test_marengo_search_and_pegasus_answer(self):
result = provider.search("goal")
self.assertIn("chunks", result)
self.assertTrue(result["chunks"])
video_id = result["chunks"][0]["videoId"]
answer = provider.get_video(video_id)
self.assertTrue(answer["video"]["transcript"])


if __name__ == "__main__":
unittest.main()
159 changes: 159 additions & 0 deletions twelvelabs_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Opt-in TwelveLabs backend for StreamRAG.

By default StreamRAG talks to VideoDB (see ``app.py``). This module provides an
alternative, opt-in backend powered by TwelveLabs:

* **Marengo** handles retrieval. ``search()`` runs a multimodal (visual + audio)
search over an index and returns the same ``{compilationVideo, chunks}`` shape
the VideoDB path returns, so the OpenAPI action and GPT prompt are unchanged.
* **Pegasus** handles answers. ``get_video()`` uses video understanding to
generate a description for an individual video instead of a raw transcript.

To switch StreamRAG over to TwelveLabs set two env vars (see ``.env.sample``)::

VIDEO_SEARCH_PROVIDER=twelvelabs
TWELVELABS_API_KEY=tlk_...
TWELVELABS_INDEX_ID=<your index id>

When ``VIDEO_SEARCH_PROVIDER`` is unset (or anything other than ``twelvelabs``)
this module is never imported and behavior is identical to upstream.

Create an index and upload videos with the TwelveLabs SDK once, up front::

from twelvelabs import TwelveLabs
client = TwelveLabs(api_key="tlk_...")
index = client.indexes.create(
index_name="streamrag",
models=[{"model_name": "marengo3.0", "model_options": ["visual", "audio"]},
{"model_name": "pegasus1.2", "model_options": ["visual", "audio"]}],
)
task = client.tasks.create(index_id=index.id, video_url="https://...")
client.tasks.wait_for_done(task_id=task.id)

Grab a free API key at https://twelvelabs.io (generous free tier).
"""

import os

from twelvelabs import TwelveLabs

# Marengo model used for both search and (text) embeddings.
MARENGO_MODEL = os.getenv("TWELVELABS_MARENGO_MODEL", "marengo3.0")
# Pegasus model used for video understanding / answers.
PEGASUS_MODEL = os.getenv("TWELVELABS_PEGASUS_MODEL", "pegasus1.2")
# Search modalities; Marengo indexes are typically created with visual + audio.
SEARCH_OPTIONS = ["visual", "audio"]


class TwelveLabsError(Exception):
"""Raised when the TwelveLabs backend is misconfigured."""


def _index_id():
index_id = os.getenv("TWELVELABS_INDEX_ID")
if not index_id:
raise TwelveLabsError(
"TWELVELABS_INDEX_ID is not set. Create an index and upload videos "
"with the TwelveLabs SDK, then set TWELVELABS_INDEX_ID."
)
return index_id


def get_client():
"""Create a TwelveLabs client from ``TWELVELABS_API_KEY``."""
api_key = os.getenv("TWELVELABS_API_KEY")
if not api_key:
raise TwelveLabsError("TWELVELABS_API_KEY is not set.")
return TwelveLabs(api_key=api_key)


def list_videos():
"""Mirror ``app.list_videos``: return ``{"videos": [...]}``."""
client = get_client()
index_id = _index_id()
videos = []
for vid in client.indexes.videos.list(index_id=index_id):
meta = getattr(vid, "system_metadata", None)
videos.append(
{
"id": vid.id,
"title": getattr(meta, "filename", None) or vid.id,
"url": getattr(meta, "video_url", None),
"length": round(float(getattr(meta, "duration", 0) or 0)),
}
)
return {"videos": videos}


def get_video(video_id):
"""Mirror ``app.get_video`` but answer with Pegasus video understanding.

Instead of a raw transcript we ask Pegasus to summarize the video, which is
the "answers" half of the integration. The summary is returned in the same
``transcript`` field so the existing OpenAPI schema and GPT prompt work
unchanged.
"""
client = get_client()
res = client.analyze(
model_name=PEGASUS_MODEL,
video_id=video_id,
prompt="Summarize this video, including the key topics and notable moments.",
max_tokens=2048,
)
return {
"video": {
"id": video_id,
"title": video_id,
"url": None,
"length": None,
"transcript": res.data or "",
}
}


def search(query):
"""Mirror ``app.search_videos``: return ``{"compilationVideo", "chunks"}``.

Marengo multimodal search returns ranked clips. Each clip becomes a chunk
with its transcription text and thumbnail. TwelveLabs does not stitch a
compilation video, so ``compilationVideo`` is ``None`` and the GPT falls
back to the per-clip references (the prompt already handles this).
"""
client = get_client()
index_id = _index_id()
results = client.search.query(
index_id=index_id,
search_options=SEARCH_OPTIONS,
query_text=query,
group_by="clip",
page_limit=10,
)
chunks = []
for item in results:
# Prefer the transcription when the index was built with it; otherwise
# fall back to a human-readable locator so the chunk is never empty.
text = item.transcription or (
f"Clip from video {item.video_id} "
f"({item.start:.0f}s-{item.end:.0f}s)"
)
chunks.append(
{
"text": text,
"video": item.thumbnail_url,
"videoId": item.video_id,
"start": item.start,
"end": item.end,
}
)
return {"compilationVideo": None, "chunks": chunks}


def text_embedding(text):
"""Return a 512-dim Marengo embedding for ``text``.

Exposed for callers that want to build their own vector index on top of
Marengo rather than use the hosted search above.
"""
client = get_client()
res = client.embed.create(model_name=MARENGO_MODEL, text=text)
return res.text_embedding.segments[0].float_