From 514cee56b6f6546c828e03bd34f4ad98f1c00d3a Mon Sep 17 00:00:00 2001 From: Aaron-DSCubed Date: Tue, 23 Sep 2025 20:20:53 +1000 Subject: [PATCH 1/2] edits to database.py and readme so it connects to postgres (dscubed db) --- README.md | 57 ++++++++++-------- src/database.py | 156 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 145 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 26f0f5d..2acf24d 100644 --- a/README.md +++ b/README.md @@ -29,36 +29,47 @@ An autonomous AI system that manages email correspondence with new RAID (Respons ## Database Schema ```sql --- 1. Create the users table -DROP TYPE IF EXISTS sender_type; +-- Clean up in dependency order in case you're rerunning the migration +DROP TABLE IF EXISTS emails CASCADE; +DROP TABLE IF EXISTS email_workflow CASCADE; +DROP TABLE IF EXISTS email_users CASCADE; +DROP TYPE IF EXISTS sender_type; +-- 1) Enum for sender CREATE TYPE sender_type AS ENUM ('user', 'agent'); -CREATE TABLE users ( - email TEXT PRIMARY KEY, +-- 2) Users table (email directory for Gmail agent) +CREATE TABLE email_users ( + users_email_id SERIAL PRIMARY KEY, + email TEXT NOT NULL, name TEXT NOT NULL, - updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT uq_email_users_email UNIQUE (email) ); -COMMENT ON TABLE users IS 'Stores user information for the Gmail agent.'; +COMMENT ON TABLE email_users IS 'Stores user information for the Gmail agent.'; --- 2. Create the messages table -CREATE TABLE messages ( +-- 3) Emails (messages) table +CREATE TABLE emails ( thread_id TEXT NOT NULL, - message_id TEXT NOT NULL, - user_email TEXT NOT NULL REFERENCES users(email) ON DELETE CASCADE, + email_id TEXT NOT NULL, + user_email TEXT NOT NULL, sender sender_type NOT NULL, body TEXT NOT NULL, subject TEXT NOT NULL, timestamp TIMESTAMPTZ NOT NULL, - PRIMARY KEY (thread_id, message_id) + PRIMARY KEY (thread_id, email_id), + CONSTRAINT fk_emails_user_email + FOREIGN KEY (user_email) + REFERENCES email_users(email) + ON DELETE CASCADE ); -COMMENT ON TABLE messages IS 'Stores individual email messages for the Gmail agent.'; +COMMENT ON TABLE emails IS 'Stores individual email messages for the Gmail agent.'; --- 3. Create table for workflow logging -CREATE TABLE IF NOT EXISTS workflows ( +-- 4) Workflow log table +CREATE TABLE IF NOT EXISTS email_workflow ( id SERIAL PRIMARY KEY, thread_id VARCHAR(255) UNIQUE NOT NULL, step INTEGER NOT NULL DEFAULT 0, @@ -67,20 +78,14 @@ CREATE TABLE IF NOT EXISTS workflows ( updated_at TIMESTAMP DEFAULT NOW() ); --- 4. Create Indexes for Performance - --- Index for fetching all messages in a specific thread -CREATE INDEX idx_messages_thread_id ON messages(thread_id); - --- Index for finding all messages for a specific user -CREATE INDEX idx_messages_user_email ON messages(user_email); +-- 5) Indexes +CREATE INDEX IF NOT EXISTS idx_emails_thread_id ON emails(thread_id); +CREATE INDEX IF NOT EXISTS idx_emails_user_email ON emails(user_email); +CREATE INDEX IF NOT EXISTS idx_emails_timestamp ON emails(timestamp); +CREATE INDEX IF NOT EXISTS idx_email_workflow_thread_id ON email_workflow(thread_id); --- Index for sorting messages chronologically within a thread -CREATE INDEX idx_messages_timestamp ON messages(timestamp); +COMMIT; --- Index for filtering workflows for different threads -CREATE INDEX IF NOT EXISTS idx_workflows_thread_id ON workflows(thread_id); -``` ## Setup diff --git a/src/database.py b/src/database.py index c3c65b1..ba87b2c 100644 --- a/src/database.py +++ b/src/database.py @@ -1,12 +1,68 @@ -from supabase import Client +from __future__ import annotations from datetime import datetime, timezone from typing import Dict, Any, Tuple, Optional +import asyncio +from dotenv import load_dotenv +import os +import json +import logging + +from sqlalchemy import String, Text, TIMESTAMP, func, ForeignKey, UniqueConstraint +from sqlalchemy import Column, Integer, select, JSON, true +from sqlalchemy.orm import declarative_base, Mapped, mapped_column, sessionmaker +from sqlalchemy.dialects.postgresql import insert, ENUM +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + +Base = declarative_base() + +sender_type = ENUM("user", "agent", name="sender_type", create_type=False) + +load_dotenv() +DATABASE_URL = os.getenv('DATABASE_URL') + +logger = logging.getLogger(__name__) + +# Ensure the DATABASE_URL uses an async driver (asyncpg for PostgreSQL) +if DATABASE_URL and DATABASE_URL.startswith('postgresql://'): + # Convert to asyncpg driver if not already + DATABASE_URL = DATABASE_URL.replace('postgresql://', 'postgresql+asyncpg://', 1) + # Optionally, warn if the user is not using asyncpg +elif DATABASE_URL and DATABASE_URL.startswith('postgresql+psycopg2://'): + DATABASE_URL = DATABASE_URL.replace('postgresql+psycopg2://', 'postgresql+asyncpg://', 1) + +###### FIX THIS +class EmailUsers(Base): + __tablename__ = "email_users" + users_email_id = Column(Integer, primary_key=True, autoincrement=True) + email = Column(Text, nullable=False, unique=True) + name = Column(Text, nullable=False) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + server_default=func.now(), + ) + +class Emails(Base): + __tablename__ = "emails" + thread_id = Column(Text, primary_key=True, nullable=False) + email_id = Column(Text, nullable=False) + user_email = Column( + Text, + ForeignKey("email_users.email", ondelete="CASCADE"), + nullable=False, + ) + sender = Column(sender_type, nullable=False) + body = Column(Text, nullable=False) + subject = Column(Text, nullable=False) + timestamp = Column(TIMESTAMP(timezone=True), nullable=False) class DatabaseManager: - def __init__(self, client: Client) -> None: - self.client = client + def __init__(self) -> None: + self.emails = None # Will be loaded asynchronously + self.engine = create_async_engine(DATABASE_URL, echo=False, future=True) + self.async_session = sessionmaker(self.engine, expire_on_commit=False, class_=AsyncSession) - def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + async def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """ Stores a user and their associated message in the database. @@ -30,48 +86,53 @@ def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) try: # 1. Prepare and upsert the user record # We use the current UTC time for the updated_at field - current_utc_time = datetime.now(timezone.utc).isoformat() - user_record = { - "email": user_data["email"], - "name": user_data["name"], - "updated_at": current_utc_time - } - + current_utc_time = datetime.now(timezone.utc) + # Upsert the user. on_conflict='email' tells Supabase what the unique key is. - user_response = self.client.table("users") \ - .upsert(user_record, on_conflict="email") \ - .execute() - - # Check for errors in the user upsert operation - error = getattr(user_response, 'error', None) - if error: - return False, f"Error upserting user: {getattr(error, 'message', 'Unknown error')}" - - # print(f"✓ User upserted successfully: {user_data['email']}") + try: + async with self.async_session() as session: + async with session.begin(): + result = await session.execute( + select(EmailUsers).where(EmailUsers.email == user_data["email"]) + ) + existing_user = result.scalar_one_or_none() + + if not existing_user: + # Insert user + new_user = EmailUsers( + email= user_data["email"], + name= user_data["name"], + updated_at = current_utc_time + ) + session.add(new_user) + await session.flush() # Get thread_id + await session.commit() + except Exception as e: + logger.error(f"Error processing thread with {user_data["email"]}: {e}") + + # 2. Insert the message record - # The 'user_email' field in the message table is populated from the user_data - message_record = { - "thread_id": message_data["thread_id"], - "message_id": message_data["message_id"], - "user_email": user_data["email"], # This links the message to the user - "sender": message_data["sender"], # Changed from 'from' to 'sender' - "body": message_data["body"], - "subject": message_data["subject"], - "timestamp": message_data["timestamp"] - } - - message_response = self.client.table("messages") \ - .upsert(message_record, on_conflict="thread_id,message_id") \ - .execute() - - # Check for errors in the message insert operation - error = getattr(message_response, 'error', None) - if error: - return False, f"Error inserting message: {getattr(error, 'message', 'Unknown error')}" - # print(f"✓ Message inserted successfully into thread '{message_data['thread_id']}'") + try: + async with self.async_session() as session: + async with session.begin(): + # Insert thread + new_thread = Emails( + thread_id= message_data["thread_id"], + email_id= message_data["message_id"], + user_email= user_data["email"], # This links the message to the user + sender= message_data["sender"], # Changed from 'from' to 'sender' + body= message_data["body"], + subject= message_data["subject"], + timestamp= message_data["timestamp"] + ) + session.add(new_thread) + await session.flush() # Get thread_id + await session.commit() + except Exception as e: + logger.error(f"Error processing thread with {user_data["email"]}: {e}") return True, None @@ -86,5 +147,16 @@ def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, Any]) print(error_msg) return False, error_msg - +if __name__=="__main__": + client=DatabaseManager() + user_data={"email":"test@gmail.com", "name":"Test Name"} + message_data={ + "thread_id": "18a9f84f0c5d7bae", + "message_id": "CAHkZjsD0eXAmpf", + "sender": "user", + "body": "This is the email text content.", + "subject": "Re: Hello", + "timestamp": datetime.now(timezone.utc) + } + asyncio.run(client.store_message(user_data=user_data, message_data=message_data)) From f97c8dd19f697f5ea7e8716c6b41debbcf87de31 Mon Sep 17 00:00:00 2001 From: Aaron-DSCubed Date: Wed, 24 Sep 2025 09:11:47 +1000 Subject: [PATCH 2/2] Edit to make message_id the primary key that is unique for emails table --- README.md | 2 +- src/database.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2acf24d..889418e 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ CREATE TABLE emails ( body TEXT NOT NULL, subject TEXT NOT NULL, timestamp TIMESTAMPTZ NOT NULL, - PRIMARY KEY (thread_id, email_id), + PRIMARY KEY (email_id), CONSTRAINT fk_emails_user_email FOREIGN KEY (user_email) REFERENCES email_users(email) diff --git a/src/database.py b/src/database.py index ba87b2c..65b4441 100644 --- a/src/database.py +++ b/src/database.py @@ -152,9 +152,9 @@ async def store_message(self, user_data: Dict[str, Any], message_data: Dict[str, user_data={"email":"test@gmail.com", "name":"Test Name"} message_data={ "thread_id": "18a9f84f0c5d7bae", - "message_id": "CAHkZjsD0eXAmpf", + "message_id": "CAHkZjsD0eXBmpf", "sender": "user", - "body": "This is the email text content.", + "body": "This is the new email text content.", "subject": "Re: Hello", "timestamp": datetime.now(timezone.utc) }