Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,47 @@ An autonomous AI system that manages email correspondence with new RAID (Respons
## Database Schema

```sql
-- 1. Create the users table
DROP TYPE IF EXISTS sender_type;

-- Clean up in dependency order in case you're rerunning the migration
DROP TABLE IF EXISTS emails CASCADE;
DROP TABLE IF EXISTS email_workflow CASCADE;
DROP TABLE IF EXISTS email_users CASCADE;
DROP TYPE IF EXISTS sender_type;

-- 1) Enum for sender
CREATE TYPE sender_type AS ENUM ('user', 'agent');

CREATE TABLE users (
email TEXT PRIMARY KEY,
-- 2) Users table (email directory for Gmail agent)
CREATE TABLE email_users (
users_email_id SERIAL PRIMARY KEY,
email TEXT NOT NULL,
name TEXT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_email_users_email UNIQUE (email)
);

COMMENT ON TABLE users IS 'Stores user information for the Gmail agent.';
COMMENT ON TABLE email_users IS 'Stores user information for the Gmail agent.';

-- 2. Create the messages table
CREATE TABLE messages (
-- 3) Emails (messages) table
CREATE TABLE emails (
thread_id TEXT NOT NULL,
message_id TEXT NOT NULL,
user_email TEXT NOT NULL REFERENCES users(email) ON DELETE CASCADE,
email_id TEXT NOT NULL,
user_email TEXT NOT NULL,
sender sender_type NOT NULL,
body TEXT NOT NULL,
subject TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
PRIMARY KEY (thread_id, message_id)
PRIMARY KEY (email_id),
CONSTRAINT fk_emails_user_email
FOREIGN KEY (user_email)
REFERENCES email_users(email)
ON DELETE CASCADE
);

COMMENT ON TABLE messages IS 'Stores individual email messages for the Gmail agent.';
COMMENT ON TABLE emails IS 'Stores individual email messages for the Gmail agent.';

-- 3. Create table for workflow logging
CREATE TABLE IF NOT EXISTS workflows (
-- 4) Workflow log table
CREATE TABLE IF NOT EXISTS email_workflow (
id SERIAL PRIMARY KEY,
thread_id VARCHAR(255) UNIQUE NOT NULL,
step INTEGER NOT NULL DEFAULT 0,
Expand All @@ -67,20 +78,14 @@ CREATE TABLE IF NOT EXISTS workflows (
updated_at TIMESTAMP DEFAULT NOW()
);

-- 4. Create Indexes for Performance

-- Index for fetching all messages in a specific thread
CREATE INDEX idx_messages_thread_id ON messages(thread_id);

-- Index for finding all messages for a specific user
CREATE INDEX idx_messages_user_email ON messages(user_email);
-- 5) Indexes
CREATE INDEX IF NOT EXISTS idx_emails_thread_id ON emails(thread_id);
CREATE INDEX IF NOT EXISTS idx_emails_user_email ON emails(user_email);
CREATE INDEX IF NOT EXISTS idx_emails_timestamp ON emails(timestamp);
CREATE INDEX IF NOT EXISTS idx_email_workflow_thread_id ON email_workflow(thread_id);

-- Index for sorting messages chronologically within a thread
CREATE INDEX idx_messages_timestamp ON messages(timestamp);
COMMIT;

-- Index for filtering workflows for different threads
CREATE INDEX IF NOT EXISTS idx_workflows_thread_id ON workflows(thread_id);
```

## Setup

Expand Down
156 changes: 114 additions & 42 deletions src/database.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,68 @@
from supabase import Client
from __future__ import annotations
from datetime import datetime, timezone
from typing import Dict, Any, Tuple, Optional
import asyncio
from dotenv import load_dotenv
import os
import json
import logging

from sqlalchemy import String, Text, TIMESTAMP, func, ForeignKey, UniqueConstraint
from sqlalchemy import Column, Integer, select, JSON, true
from sqlalchemy.orm import declarative_base, Mapped, mapped_column, sessionmaker
from sqlalchemy.dialects.postgresql import insert, ENUM
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

Base = declarative_base()

sender_type = ENUM("user", "agent", name="sender_type", create_type=False)

load_dotenv()
DATABASE_URL = os.getenv('DATABASE_URL')

logger = logging.getLogger(__name__)

# Ensure the DATABASE_URL uses an async driver (asyncpg for PostgreSQL)
if DATABASE_URL and DATABASE_URL.startswith('postgresql://'):
# Convert to asyncpg driver if not already
DATABASE_URL = DATABASE_URL.replace('postgresql://', 'postgresql+asyncpg://', 1)
# Optionally, warn if the user is not using asyncpg
elif DATABASE_URL and DATABASE_URL.startswith('postgresql+psycopg2://'):
DATABASE_URL = DATABASE_URL.replace('postgresql+psycopg2://', 'postgresql+asyncpg://', 1)

###### FIX THIS
class EmailUsers(Base):
__tablename__ = "email_users"
users_email_id = Column(Integer, primary_key=True, autoincrement=True)
email = Column(Text, nullable=False, unique=True)
name = Column(Text, nullable=False)
updated_at = Column(
TIMESTAMP(timezone=True),
nullable=False,
server_default=func.now(),
)

class Emails(Base):
__tablename__ = "emails"
thread_id = Column(Text, primary_key=True, nullable=False)
email_id = Column(Text, nullable=False)
user_email = Column(
Text,
ForeignKey("email_users.email", ondelete="CASCADE"),
nullable=False,
)
sender = Column(sender_type, nullable=False)
body = Column(Text, nullable=False)
subject = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP(timezone=True), nullable=False)

class DatabaseManager:
def __init__(self, client: Client) -> None:
self.client = client
def __init__(self) -> None:
self.emails = None # Will be loaded asynchronously
self.engine = create_async_engine(DATABASE_URL, echo=False, future=True)
self.async_session = sessionmaker(self.engine, expire_on_commit=False, class_=AsyncSession)

def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
async def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Stores a user and their associated message in the database.

Expand All @@ -30,48 +86,53 @@ def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any])
try:
# 1. Prepare and upsert the user record
# We use the current UTC time for the updated_at field
current_utc_time = datetime.now(timezone.utc).isoformat()
user_record = {
"email": user_data["email"],
"name": user_data["name"],
"updated_at": current_utc_time
}

current_utc_time = datetime.now(timezone.utc)

# Upsert the user. on_conflict='email' tells Supabase what the unique key is.
user_response = self.client.table("users") \
.upsert(user_record, on_conflict="email") \
.execute()


# Check for errors in the user upsert operation
error = getattr(user_response, 'error', None)
if error:
return False, f"Error upserting user: {getattr(error, 'message', 'Unknown error')}"

# print(f"✓ User upserted successfully: {user_data['email']}")
try:
async with self.async_session() as session:
async with session.begin():
result = await session.execute(
select(EmailUsers).where(EmailUsers.email == user_data["email"])
)
existing_user = result.scalar_one_or_none()

if not existing_user:
# Insert user
new_user = EmailUsers(
email= user_data["email"],
name= user_data["name"],
updated_at = current_utc_time
)
session.add(new_user)
await session.flush() # Get thread_id
await session.commit()
except Exception as e:
logger.error(f"Error processing thread with {user_data["email"]}: {e}")



# 2. Insert the message record
# The 'user_email' field in the message table is populated from the user_data
message_record = {
"thread_id": message_data["thread_id"],
"message_id": message_data["message_id"],
"user_email": user_data["email"], # This links the message to the user
"sender": message_data["sender"], # Changed from 'from' to 'sender'
"body": message_data["body"],
"subject": message_data["subject"],
"timestamp": message_data["timestamp"]
}

message_response = self.client.table("messages") \
.upsert(message_record, on_conflict="thread_id,message_id") \
.execute()

# Check for errors in the message insert operation
error = getattr(message_response, 'error', None)
if error:
return False, f"Error inserting message: {getattr(error, 'message', 'Unknown error')}"

# print(f"✓ Message inserted successfully into thread '{message_data['thread_id']}'")
try:
async with self.async_session() as session:
async with session.begin():
# Insert thread
new_thread = Emails(
thread_id= message_data["thread_id"],
email_id= message_data["message_id"],
user_email= user_data["email"], # This links the message to the user
sender= message_data["sender"], # Changed from 'from' to 'sender'
body= message_data["body"],
subject= message_data["subject"],
timestamp= message_data["timestamp"]
)
session.add(new_thread)
await session.flush() # Get thread_id
await session.commit()
except Exception as e:
logger.error(f"Error processing thread with {user_data["email"]}: {e}")

return True, None

Expand All @@ -86,5 +147,16 @@ def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any])
print(error_msg)
return False, error_msg


if __name__=="__main__":
client=DatabaseManager()
user_data={"email":"test@gmail.com", "name":"Test Name"}
message_data={
"thread_id": "18a9f84f0c5d7bae",
"message_id": "CAHkZjsD0eXBmpf",
"sender": "user",
"body": "This is the new email text content.",
"subject": "Re: Hello",
"timestamp": datetime.now(timezone.utc)
}
asyncio.run(client.store_message(user_data=user_data, message_data=message_data))