Add Notion and Google Drive connector routes#205
Conversation
|
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a new connector OAuth routing system for external knowledge sources, specifically Notion and Google Drive, along with configuration settings and tests. The review feedback highlights several important improvements: migrating from in-memory dictionaries to a persistent store like Redis to support multi-process production environments, handling OAuth errors and redirecting users back to the frontend during callbacks, removing the unused 'pending' state, and cleaning up expired states to prevent memory leaks.
| _pending_states: Dict[str, PendingOAuthState] = {} | ||
| _connections: Dict[str, StoredConnection] = {} |
There was a problem hiding this comment.
Using in-memory dictionaries (_pending_states and _connections) for state and connection tracking will fail in multi-process (e.g., multiple uvicorn/gunicorn workers) or multi-instance (e.g., load-balanced containers/servers) production environments. The state generated on one worker will not be available if the callback hits a different worker. Consider using a shared/persistent store (such as Redis or the existing database store) for production deployments.
| async def connector_oauth_callback( | ||
| connector_id: str, | ||
| code: str = Query(..., min_length=1), | ||
| state: str = Query(..., min_length=1), | ||
| ) -> dict: | ||
| connector = _get_connector(connector_id) | ||
| pending = _pending_states.pop(state, None) | ||
| if not pending or pending.connector_id != connector.id or pending.expires_at <= _now(): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="Invalid or expired connector authorization state", | ||
| ) | ||
|
|
||
| # Token exchange and source ingestion are intentionally separate follow-up steps. | ||
| # This callback validates the flow and records a pending connection marker only. | ||
| _connections[_connection_key(pending.user_id, connector.id)] = StoredConnection( | ||
| connector_id=connector.id, | ||
| user_id=pending.user_id, | ||
| connected_at=_now(), | ||
| scopes=connector.scopes, | ||
| ) | ||
| return { | ||
| "status": "connected", | ||
| "connector_id": connector.id, | ||
| "detail": f"{connector.name} authorization received", | ||
| } |
There was a problem hiding this comment.
Improve the OAuth callback handler to:
- Handle errors and denials gracefully: If the user cancels or denies the authorization request, the OAuth provider redirects with an
errorquery parameter instead of acode. Makingcodeoptional and checking forerrorprevents a raw422 Unprocessable Entityvalidation error. - Redirect to the frontend: Instead of returning a raw JSON response (which leaves the user stranded on a blank API page), redirect them back to the frontend application using
RedirectResponsewith the status and connector ID.
async def connector_oauth_callback(
connector_id: str,
state: str = Query(..., min_length=1),
code: Optional[str] = Query(None),
error: Optional[str] = Query(None),
) -> RedirectResponse:
connector = _get_connector(connector_id)
if error or not code:
return RedirectResponse(
url=f"{settings.frontend_url}/connectors?status=error&error={error or 'access_denied'}&connector_id={connector.id}"
)
pending = _pending_states.pop(state, None)
if not pending or pending.connector_id != connector.id or pending.expires_at <= _now():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired connector authorization state",
)
# Token exchange and source ingestion are intentionally separate follow-up steps.
# This callback validates the flow and records a pending connection marker only.
_connections[_connection_key(pending.user_id, connector.id)] = StoredConnection(
connector_id=connector.id,
user_id=pending.user_id,
connected_at=_now(),
scopes=connector.scopes,
)
return RedirectResponse(
url=f"{settings.frontend_url}/connectors?status=success&connector_id={connector.id}"
)| from fastapi import APIRouter, Depends, HTTPException, Query, status | ||
| from pydantic import BaseModel, Field |
There was a problem hiding this comment.
Import RedirectResponse from fastapi.responses to support redirecting the user back to the frontend application after the OAuth callback completes.
| from fastapi import APIRouter, Depends, HTTPException, Query, status | |
| from pydantic import BaseModel, Field | |
| from fastapi import APIRouter, Depends, HTTPException, Query, status | |
| from fastapi.responses import RedirectResponse | |
| from pydantic import BaseModel, Field |
| router = APIRouter(prefix="/api/connectors", tags=["Connectors"]) | ||
|
|
||
| ConnectorId = Literal["notion", "google-drive"] | ||
| ConnectorState = Literal["connected", "not_connected", "pending"] |
There was a problem hiding this comment.
The ConnectorState literal includes "pending", but this state is never returned by _status_for or used anywhere in the status responses. If pending status is not needed, consider removing it from the Literal type to keep the schema clean. Otherwise, implement a check in _status_for to see if there is an active pending state in _pending_states for the user.
| ConnectorState = Literal["connected", "not_connected", "pending"] | |
| ConnectorState = Literal["connected", "not_connected"] |
| state = secrets.token_urlsafe(32) | ||
| expires_at = _now() + timedelta(minutes=STATE_TTL_MINUTES) | ||
| _pending_states[state] = PendingOAuthState( | ||
| connector_id=connector.id, | ||
| user_id=str(current_user.get("id")), | ||
| expires_at=expires_at, | ||
| ) |
There was a problem hiding this comment.
To prevent memory leaks from abandoned OAuth flows, clean up any expired states from the _pending_states dictionary whenever a new OAuth flow is initiated.
# Clean up expired states to prevent memory leaks
now_time = _now()
expired_states = [k for k, v in _pending_states.items() if v.expires_at <= now_time]
for k in expired_states:
_pending_states.pop(k, None)
state = secrets.token_urlsafe(32)
expires_at = now_time + timedelta(minutes=STATE_TTL_MINUTES)
_pending_states[state] = PendingOAuthState(
connector_id=connector.id,
user_id=str(current_user.get("id")),
expires_at=expires_at,
)
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
|
| Filename | Overview |
|---|---|
| src/api/routes/connectors.py | New connector OAuth routes — addresses previously flagged unauthenticated callback and memory-leak issues; two minor off-by-one and input-validation concerns remain |
| tests/api/test_connectors.py | Focused test suite covering list, start, callback, and disconnect flows with proper dependency overrides and autouse state cleanup |
| src/api/app.py | Straightforward router registration; connectors_router included alongside other feature routers with no ordering concerns |
Sequence Diagram
sequenceDiagram
participant Client
participant API as FastAPI (connectors router)
participant States as _pending_states (in-memory)
participant Provider as OAuth Provider
Client->>API: "POST /{connector_id}/oauth/start (Bearer JWT)"
API->>API: _prune_pending_states() - evict expired + overflow
API->>API: generate state token, build authorization_url
API->>States: "store PendingOAuthState {user_id, expires_at}"
API-->>Client: "{authorization_url, state, expires_at}"
Client->>Provider: redirect user to authorization_url
alt User grants access
Provider-->>Client: "redirect to /oauth/callback?code=X&state=S"
Client->>API: "GET /{connector_id}/oauth/callback?code=X&state=S"
API->>States: pop(state)
API->>API: validate connector_id + expiry
API-->>Client: "{status: pending} - token exchange deferred"
else User denies or error
Provider-->>Client: "redirect to /oauth/callback?error=access_denied&state=S"
Client->>API: "GET /{connector_id}/oauth/callback?error=access_denied&state=S"
API->>States: pop(state) - consumed
API-->>Client: 400 Authorization denied
end
Client->>API: GET /api/connectors (Bearer JWT)
API-->>Client: "[{id, state: not_connected}]"
Client->>API: "POST /{connector_id}/disconnect (Bearer JWT)"
API-->>Client: "{disconnected: false}"
Reviews (5): Last reviewed commit: "Avoid stale pending connector OAuth stat..." | Re-trigger Greptile
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
Summary
Tests
Security