A Django-based platform for visual JSON mapping with webhook and Google Cloud Pub/Sub integration.
- Webhooks: Auto-generated unique webhook URLs for receiving HTTP POST requests
- Google Cloud Pub/Sub:
- Push Mode: Google automatically sends messages to your endpoint
- Pull Mode: Background scheduler polls for messages at configurable intervals
- HTTP/HTTPS: Send transformed data to any REST API endpoint
- Support for GET and POST methods
- Multiple authentication types (Bearer, Basic Auth, API Key)
- Custom headers support
- Email (SMTP): Send transformed data via email
- Configurable SMTP server and credentials
- Support for TLS/SSL
- Dynamic recipient and subject fields
- Visual JSON Mapper: Interactive interface for mapping source fields to target fields
- Built-in Transformations: uppercase, lowercase, substring, replace, default, concat
- Custom JavaScript: Write custom transformation logic using JavaScript
- Conditional Processing: JavaScript-based conditions to skip or process integrations
- Complete Audit Trail: Log every integration execution with full request/response data
- Performance Metrics: Track transformation time and API call time
- Django Admin Interface: Manage integrations and view logs
- Change Tracking: Frontend validation before saving changes
- Active/Inactive Toggle: Enable or disable integrations without deleting
- Clone the repository
- Copy .env.example to .env and configure: cp .env.example .env
- Edit .env and set SITE_URL to your domain (important for webhooks and Pub/Sub):
# For local development
SITE_URL=http://localhost:8000
# For production
SITE_URL=https://integrations.yourdomain.com
- Install dependencies: pip install -r requirements.txt
- Run migrations: python manage.py migrate
- Create superuser: python manage.py createsuperuser
- Start server: python manage.py runserver
- Navigate to https://yourdomain:8000/admin/ in your browser
- Configure source (Webhook or Pub/Sub)
- Configure target API endpoint
- Map fields from source to target
- Save to backend
When you save a webhook integration, a unique URL is generated:
https://yourdomain.com/webhook/abc123/
Send POST requests with a JSON payload to this URL. The integration will:
- Receive the payload
- Apply any conditional logic
- Transform data according to field mappings
- Send to the configured target (HTTP/Email)
- Log the complete execution
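To try the flow above from a script, a request can be built with the Python standard library. This is a minimal sketch: build_webhook_request is a hypothetical helper (not part of this project), and the URL is the placeholder from the example above.

```python
import json
import urllib.request


def build_webhook_request(url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a webhook URL."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; replace it with the URL generated for your integration.
req = build_webhook_request(
    "https://yourdomain.com/webhook/abc123/",
    {"test": "data"},
)

# Sending is left commented out because it needs a reachable server:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.status)
```

Building the request separately from sending it makes it easy to inspect the method, headers, and body before anything goes over the network.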
For Pub/Sub push integrations:
- Configure GCP Project ID, Topic Name, and Subscription Name
- Paste service account JSON credentials
- Select "Push" mode
- Save integration - a push endpoint is auto-generated
- Google Pub/Sub automatically sends messages to your endpoint
Example push endpoint: https://yourdomain.com/pubsub/xyz789/
For Pub/Sub pull integrations:
- Configure GCP Project ID, Topic Name, and Subscription Name
- Paste service account JSON credentials
- Select "Pull" mode
- Set pull interval in seconds (default: 60)
- Save integration - a background thread starts polling for messages
The pull scheduler will:
- Check for new messages at the specified interval
- Process messages through the integration pipeline
- Automatically acknowledge processed messages
- Handle errors without losing messages
Configure SMTP settings to send transformed data via email:
{
"smtpServer": "smtp.gmail.com",
"smtpPort": 587,
"smtpUsername": "your-email@gmail.com",
"smtpPassword": "your-app-password",
"fromEmail": "sender@example.com",
"toEmail": "recipient@example.com",
"subject": "Integration Alert",
"useTLS": true
}
- Go to Django Admin: /admin/
- View "Integration Runs" to see all executions
- Click on any run to see incoming/outgoing payloads and responses
- POST /api/integrations/ - Create integration
- GET /api/integrations/ - List integrations
- GET /api/integrations/{id}/ - Get integration
- PUT /api/integrations/{id}/ - Update integration
- DELETE /api/integrations/{id}/ - Delete integration
- POST /api/integrations/{id}/toggle_active/ - Toggle active status
- POST /api/integrations/{id}/test_pubsub/ - Test Pub/Sub integration by publishing a message
- GET /api/runs/ - List integration runs
- GET /api/runs/?integration_id={id} - Filter runs by integration
- GET /api/runs/{id}/ - Get run details
- POST /webhook/{path}/ - Webhook endpoint (auto-generated per integration)
- POST /pubsub/{path}/ - Pub/Sub push endpoint (auto-generated per integration)
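As an illustration of the run-listing endpoints, the sketch below builds the filter URL and fetches it with the standard library. The helper names runs_url and fetch_runs are hypothetical, the response shape is not documented here (so it is returned as parsed JSON without further assumptions), and any authentication your REST_FRAMEWORK settings require is left out.

```python
import json
import urllib.request
from typing import Any
from urllib.parse import urlencode


def runs_url(api_base: str, integration_id: str = "") -> str:
    """Return the URL for listing runs, optionally filtered by integration."""
    url = f"{api_base.rstrip('/')}/runs/"
    if integration_id:
        url += "?" + urlencode({"integration_id": integration_id})
    return url


def fetch_runs(api_base: str, integration_id: str = "") -> Any:
    """GET the runs endpoint and return the decoded JSON body."""
    with urllib.request.urlopen(runs_url(api_base, integration_id), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, runs_url("http://localhost:8000/api", "abc-123") yields the filtered form of GET /api/runs/ for that integration.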
IntegrationConfiguration
- Stores complete integration definition as JSON (config_json field)
- Extracts key fields for querying: source_type, target_url, target_method
- Generates unique webhook paths and Pub/Sub push endpoints
- Tracks Pub/Sub configuration:
- project_id, topic_id, subscription
- subscription_mode (push/pull)
- pull_interval_seconds
- listener_active status
- Links to IntegrationRun records for audit trail
IntegrationRun
- Logs every integration execution
- Stores:
- incoming_payload: Original data from source
- transformed_payload: Data after mappings and transformations
- outgoing_request: Full request sent to target (headers, body, URL)
- outgoing_response: Response from target
- status: success, error, skipped, or partial
- error_message: Details if execution failed
- Tracks performance metrics:
- transformation_time_ms: Time spent transforming data
- api_call_time_ms: Time spent calling target API
- Indexed by created_at, status, and integration for fast queries
- Webhook receives POST → Django view receives JSON payload
- Load configuration → Get integration by webhook path
- Evaluate condition → Run JavaScript condition if configured
- Transform data → Apply field mappings and transformations
- Route to target → HTTP, Email, or SMS based on target type
- Authenticate → Add auth headers for HTTP targets
- Send request → Execute HTTP call or send email/SMS
- Log result → Create IntegrationRun record with all details
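The condition and transform steps of this flow can be sketched in a few lines. This is an illustrative sketch, not the code in integration_processor.py: the dotted-path mapping format and the helpers get_path, set_path, and run_pipeline are assumptions made for the example, and the condition is a Python callable standing in for the JavaScript condition.

```python
from typing import Any, Callable, Dict, Optional


def get_path(data: Any, path: str) -> Any:
    """Read a dotted path such as 'customer.name'; return None if missing."""
    current = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def set_path(data: dict, path: str, value: Any) -> None:
    """Write a value at a dotted path, creating nested dicts as needed."""
    keys = path.split(".")
    for key in keys[:-1]:
        data = data.setdefault(key, {})
    data[keys[-1]] = value


def run_pipeline(
    payload: dict,
    mappings: Dict[str, str],
    condition: Optional[Callable[[dict], bool]] = None,
) -> dict:
    """Evaluate the condition, then map source fields onto target fields."""
    if condition is not None and not condition(payload):
        return {"status": "skipped", "transformed_payload": None}
    transformed: dict = {}
    for source_path, target_path in mappings.items():
        set_path(transformed, target_path, get_path(payload, source_path))
    return {"status": "success", "transformed_payload": transformed}


result = run_pipeline(
    {"customer": {"name": "Ada"}, "amount": 5},
    {"customer.name": "text.who", "amount": "text.amount"},
    condition=lambda p: p["amount"] > 0,
)
```

The returned status values mirror the success and skipped states recorded on IntegrationRun; sending to the target and logging would follow the transform step.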
- Google Pub/Sub pushes → Django view receives base64-encoded message
- Decode message → Extract JSON data from Pub/Sub envelope
- Load configuration → Get integration by push endpoint
- Process through pipeline → Same as webhook (condition → transform → send)
- Return 204 → Acknowledge message receipt to Google
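The decode step works on the standard Pub/Sub push envelope, in which the message body arrives base64-encoded under message.data. A minimal sketch, assuming the published message is JSON; decode_push_envelope is a hypothetical name and may differ from handle_pubsub_push in this project.

```python
import base64
import json


def decode_push_envelope(envelope: dict) -> dict:
    """Return the JSON payload carried in a Pub/Sub push envelope."""
    message = envelope.get("message")
    if not isinstance(message, dict) or "data" not in message:
        raise ValueError("not a Pub/Sub push envelope")
    raw = base64.b64decode(message["data"])
    return json.loads(raw.decode("utf-8"))


# Example envelope with illustrative project and subscription names.
envelope = {
    "message": {
        "data": base64.b64encode(b'{"test": "data"}').decode("ascii"),
        "messageId": "1",
    },
    "subscription": "projects/my-project/subscriptions/my-sub",
}
payload = decode_push_envelope(envelope)
```

Rejecting malformed envelopes early keeps unrelated POST requests to the push endpoint from reaching the integration pipeline.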
- Background thread wakes → Scheduler checks for messages at interval
- Pull messages → Request up to 10 messages from subscription
- For each message:
- Decode and parse JSON data
- Process through integration pipeline
- Log execution result
- Acknowledge messages → Tell Google Pub/Sub messages were processed
- Sleep until next interval → Wait for configured pull_interval_seconds
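The loop above can be sketched with a threading.Event that doubles as the sleep timer and the stop signal. The pull, process, and ack callables are hypothetical stand-ins for the real Pub/Sub client calls, and leaving failed messages unacknowledged (so Pub/Sub redelivers them) is an assumption about how errors are handled without losing messages.

```python
import threading
from typing import Callable, List, Tuple

Message = Tuple[str, dict]  # (ack_id, decoded JSON payload)


def run_pull_loop(
    pull: Callable[[int], List[Message]],
    process: Callable[[dict], None],
    ack: Callable[[List[str]], None],
    stop: threading.Event,
    interval_seconds: float = 60,
    max_messages: int = 10,
) -> None:
    """Poll for messages until the stop event is set."""
    while not stop.is_set():
        ack_ids: List[str] = []
        for ack_id, payload in pull(max_messages):
            try:
                process(payload)
            except Exception:
                # Assumption: skip the ack so the message is redelivered later.
                continue
            ack_ids.append(ack_id)
        if ack_ids:
            ack(ack_ids)
        # Sleeps for the interval but returns immediately once stop is set.
        stop.wait(interval_seconds)
```

Using stop.wait instead of time.sleep lets a shutdown request interrupt the wait rather than blocking for the full interval.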
integration_processor.py
- Core processing logic for all integration types
- Handles transformation, authentication, and target routing
- Routes to: process_http_integration, process_email_integration, or process_sms_integration
- Creates IntegrationRun records with performance metrics
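For the authentication step, the three supported schemes map onto request headers roughly as follows. This is a sketch under assumptions: the config keys (type, token, username, password, key, header) and the X-API-Key default are invented for illustration and may not match the project's config_json format.

```python
import base64
from typing import Dict


def build_auth_headers(auth: Dict[str, str]) -> Dict[str, str]:
    """Translate an auth config dict into HTTP request headers."""
    kind = auth.get("type", "none")
    if kind == "none":
        return {}
    if kind == "bearer":
        return {"Authorization": f"Bearer {auth['token']}"}
    if kind == "basic":
        raw = f"{auth['username']}:{auth['password']}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if kind == "api_key":
        # The header name is configurable; X-API-Key is only a common default.
        return {auth.get("header", "X-API-Key"): auth["key"]}
    raise ValueError(f"unsupported auth type: {kind}")
```

Returning a plain dict keeps the helper independent of the HTTP client, so the result can be merged with any custom headers before the request is sent.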
pubsub_manager.py
- Google Cloud Pub/Sub client wrapper
- Functions: create_push_subscription, create_pull_subscription, delete_subscription
- pull_messages: Retrieve and acknowledge messages
- publish_test_message: Test integration by sending sample data
- handle_pubsub_push: Decode push notification payloads
pubsub_scheduler.py
- Background thread scheduler for pull subscriptions
- PubSubPullScheduler class manages active pullers
- One thread per active pull integration
- Graceful start/stop with threading events
- Handles errors without stopping scheduler
Integration Configurations
- List view with filters (source type, method, status)
- Detail view with full configuration
- "Open Editor" link to frontend mapper
- Toggle active/inactive status
Integration Runs
- List view with filters (status, date, integration)
- Read-only detail view showing:
- Incoming payload
- Transformed payload
- Outgoing request
- Outgoing response
- Error messages
- Performance metrics
// Initialize mapper with backend URL
const mapper = new JSONMapper({
  container: '#mapper-container',
  apiBaseUrl: '/api' // Django API base URL
});
// Save integration
await mapper.saveIntegrationToBackend();
// Load by ID
await mapper.loadIntegrationFromBackend('integration-uuid');
// Or from URL parameter
// Navigate to: /mapper/?integration_id=integration-uuid
// Get runs for current integration
const runs = await mapper.getIntegrationRuns(integrationId);
// Display in modal
showIntegrationRunsDialog();
# Build and start services
docker-compose up -d
# Run migrations
docker-compose exec web python manage.py migrate
# Create superuser
docker-compose exec web python manage.py createsuperuser
# Start Pub/Sub listeners
docker-compose exec web python manage.py start_pubsub_listeners
- Set up PostgreSQL database
- Configure environment variables
- Run migrations: python manage.py migrate
- Collect static files: python manage.py collectstatic
- Start Gunicorn: gunicorn config.wsgi:application
- Start Pub/Sub listeners: python manage.py start_pubsub_listeners
- Configure nginx/Apache as reverse proxy
# Django settings
SECRET_KEY=your-secret-key-here
DEBUG=False
ALLOWED_HOSTS=yourdomain.com,www.yourdomain.com
# Database
DATABASE_URL=postgresql://user:pass@host:5432/dbname
# CORS (for frontend on different domain)
CORS_ALLOWED_ORIGINS=https://yourdomain.com,https://www.yourdomain.com
# Site URL - IMPORTANT: Set to your actual domain
# This is used for webhook URLs and Pub/Sub push endpoints
# Examples:
# Development: http://localhost:8000
# Production: https://integrations.yourdomain.com
SITE_URL=https://yourdomain.com
IMPORTANT: The SITE_URL setting is critical for:
- Webhook URL generation (e.g., https://yourdomain.com/webhook/abc123/)
- Pub/Sub push endpoints (e.g., https://yourdomain.com/pubsub/xyz789/)
The URL must be publicly accessible for Pub/Sub push mode to work.
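To make the role of SITE_URL concrete, the sketch below shows how endpoint URLs of the form shown above could be derived from it, plus a quick check for local-only hosts. Both helpers are hypothetical and only illustrate the URL shape; the project's own generation code is not shown in this document.

```python
from urllib.parse import urlparse


def build_endpoint_url(site_url: str, kind: str, path: str) -> str:
    """Join SITE_URL, an endpoint kind, and a generated path."""
    if kind not in ("webhook", "pubsub"):
        raise ValueError(f"unknown endpoint kind: {kind}")
    return f"{site_url.rstrip('/')}/{kind}/{path.strip('/')}/"


def is_local_site_url(site_url: str) -> bool:
    """Return True for hosts that Pub/Sub push cannot reach."""
    host = urlparse(site_url).hostname or ""
    return host in ("localhost", "127.0.0.1", "::1")
```

A check like is_local_site_url could be run before enabling push mode, since Google cannot deliver messages to a localhost address.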
- Create a GCP project at https://console.cloud.google.com
- Enable Pub/Sub API in APIs & Services
- Create a topic for your integration
- Create a service account with appropriate permissions
For Push Mode:
- pubsub.subscriptions.create
- pubsub.subscriptions.update
- pubsub.subscriptions.delete
- pubsub.topics.attachSubscription
For Pull Mode (additional permissions):
- pubsub.subscriptions.consume (to pull messages)
- pubsub.subscriptions.get
Recommended role: Pub/Sub Editor (roles/pubsub.editor)
- Go to IAM & Admin → Service Accounts
- Create or select service account
- Click Keys → Add Key → Create New Key
- Select JSON format
- Download the JSON file
- Paste the entire JSON content into the frontend credentials field
You can test your Pub/Sub configuration using the gcloud CLI:
# Publish test message
gcloud pubsub topics publish YOUR_TOPIC --message '{"test": "data"}'
# Check subscription exists
gcloud pubsub subscriptions list
# Pull messages manually (for troubleshooting)
gcloud pubsub subscriptions pull YOUR_SUBSCRIPTION --limit=5
- CSRF Protection: Enabled by default for all POST/PUT/DELETE requests
- Authentication: Configure REST_FRAMEWORK permissions for API access
- HTTPS: Use HTTPS in production for webhook endpoints
- Secrets: Store API keys and tokens securely (use environment variables)
- Rate Limiting: Add rate limiting to webhook endpoints
- Input Validation: JSON payloads are validated before processing
The application logs every integration execution to the IntegrationRun model:
- Success/error status
- Performance metrics
- Full request/response data
from integrations.models import IntegrationRun  # assumes the models live in integrations/models.py
# Get all failed runs
failed_runs = IntegrationRun.objects.filter(status='error')
# Get runs for specific integration
integration_runs = IntegrationRun.objects.filter(
    integration_id='uuid'
).order_by('-created_at')
# Get slow runs (API call took longer than 5 seconds)
slow_runs = IntegrationRun.objects.filter(
    api_call_time_ms__gt=5000
)
Add to your settings for database query monitoring:
LOGGING = {
    'version': 1,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'django.db.backends': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
    },
}
- Check integration is active: IntegrationConfiguration.objects.get(id=X).is_active
- Verify webhook path matches
- Check Django logs for errors
- Test with curl:
curl -X POST https://domain.com/webhook/path/ -H "Content-Type: application/json" -d '{"test": "data"}'
Push Mode Issues:
- Verify service account credentials are valid JSON
- Check subscription exists: gcloud pubsub subscriptions list
- Ensure SITE_URL environment variable is set correctly
- Verify push endpoint is publicly accessible (not localhost)
- Check pubsub_listener_active field is True
- Review Django logs for push handler errors
- Test endpoint manually: curl -X POST https://yourdomain.com/pubsub/path/
Pull Mode Issues:
- Verify service account has pubsub.subscriptions.consume permission
- Check if pull scheduler thread is running (look for log message)
- Verify pubsub_pull_interval_seconds is set appropriately
- Check subscription exists: gcloud pubsub subscriptions list
- Manually pull to test: gcloud pubsub subscriptions pull YOUR_SUBSCRIPTION
- Review Django logs for pull scheduler errors
- Restart Django server to restart pull threads
- Check IntegrationRun logs for error messages
- Verify target URL is accessible
- Check authentication configuration
- Test target API directly with curl
- Verify network/firewall settings
- Add database indexes on frequently queried fields
- Use database connection pooling
- Enable Redis caching
- Use Celery for async processing of large payloads
- Monitor database query performance
For high-volume integrations, process webhooks asynchronously:
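# tasks.py
from celery import shared_task
from integrations.integration_processor import process_integration
from integrations.models import IntegrationConfiguration  # assumes the models live in integrations/models.py
@shared_task
def process_integration_async(integration_id, payload):
    # Re-load the integration inside the worker; only the ID is passed through the queue
    integration = IntegrationConfiguration.objects.get(id=integration_id)
    return process_integration(integration, payload)
# views.py
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from rest_framework.decorators import api_view
from integrations.tasks import process_integration_async  # assumes tasks.py sits in the integrations app
@csrf_exempt
@api_view(['POST'])
def webhook_handler(request, webhook_path):
    integration = get_object_or_404(...)
    # Process asynchronously
    process_integration_async.delay(
        str(integration.id),
        request.data
    )
    return JsonResponse({'status': 'queued'})
Add custom transformation functions: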
# integrations/transformations.py
def custom_transform(value, *params):
    # Your custom logic goes here; this placeholder returns the value unchanged
    transformed_value = value
    return transformed_value
# Register in integration_processor.py
CUSTOM_TRANSFORMS = {
    'my_transform': custom_transform
}
Add rate limiting using Django Ratelimit:
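from django.views.decorators.csrf import csrf_exempt
from django_ratelimit.decorators import ratelimit
from rest_framework.decorators import api_view
# Limit each client IP to 100 POST requests per minute
@ratelimit(key='ip', rate='100/m', method='POST')
@csrf_exempt
@api_view(['POST'])
def webhook_handler(request, webhook_path):
    # Your handler code
    pass
Add indexes for better query performance: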
class IntegrationRun(models.Model):
    # ... existing fields ...
    class Meta:
        indexes = [
            models.Index(fields=['-created_at', 'status']),
            models.Index(fields=['integration', '-created_at']),
            models.Index(fields=['status', '-created_at']),
        ]
Run tests:
python manage.py test integrations
With coverage:
coverage run --source='.' manage.py test integrations
coverage report
coverage html # Generate HTML report
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
MIT License - see LICENSE file for details
The frontend provides an intuitive 4-step workflow:
Step 1: Configure Integration
- Name your integration
- Choose source type (Webhook or Pub/Sub)
- Choose target type (HTTP or Email)
- Configure authentication and settings
Step 2: Define Mappings
- Add source JSON sample data
- Create field mappings with drag-and-drop
- Apply transformations:
- uppercase, lowercase - Change text case
- substring(start, length) - Extract substring
- replace(find, replace) - Replace text
- default(value) - Provide fallback value
- concat(field1, field2, ...) - Combine fields
- custom(js) - Write custom JavaScript
- Add conditions to skip/process based on data
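The built-in transformations listed in this step behave roughly like the Python functions below. This is a sketch of the semantics as described, not the project's implementation: the registry name TRANSFORMS and the helper apply_transform are hypothetical, and details such as treating an empty string as "missing" for default are assumptions.

```python
from typing import Any, Callable, Dict

# Hypothetical registry mirroring the built-in transformation names.
TRANSFORMS: Dict[str, Callable[..., Any]] = {
    "uppercase": lambda v: str(v).upper(),
    "lowercase": lambda v: str(v).lower(),
    "substring": lambda v, start, length: str(v)[start:start + length],
    "replace": lambda v, find, repl: str(v).replace(find, repl),
    # Assumption: None and the empty string both count as missing.
    "default": lambda v, fallback: fallback if v is None or v == "" else v,
    "concat": lambda v, *others: "".join(str(part) for part in (v, *others)),
}


def apply_transform(name: str, value: Any, *params: Any) -> Any:
    """Look up a transformation by name and apply it to a value."""
    if name not in TRANSFORMS:
        raise ValueError(f"unknown transformation: {name}")
    return TRANSFORMS[name](value, *params)
```

For instance, apply_transform("substring", "abcdef", 1, 3) takes three characters starting at index 1, and apply_transform("default", None, "n/a") falls back to the supplied value.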
Step 3: Test Integration
- Preview transformed output
- View integration run history
Step 4: Save & Execute
- Validate integration before saving
- Save to backend (creates subscription/webhook)
- Execute full integration test
- View real-time results
The frontend tracks changes and enforces validation:
- Any modification marks the integration as "changed"
- "Save to Backend" button is hidden until validation succeeds
- Click "Validate Integration" to check configuration
- Only after successful validation can you save
- This prevents saving broken configurations
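The rule set above amounts to a small state machine. The frontend is written in JavaScript; the sketch below mirrors the same logic in Python purely for illustration, and the ChangeTracker class and its method names are hypothetical.

```python
from typing import List


class ChangeTracker:
    """Tracks whether an edited integration may be saved."""

    def __init__(self) -> None:
        self.changed = False
        self.validated = False

    def mark_changed(self) -> None:
        # Any edit invalidates a previous successful validation.
        self.changed = True
        self.validated = False

    def validate(self, errors: List[str]) -> bool:
        # Validation succeeds only when no errors were found.
        self.validated = not errors
        return self.validated

    @property
    def can_save(self) -> bool:
        # Corresponds to showing the "Save to Backend" button.
        return self.changed and self.validated

    def mark_saved(self) -> None:
        self.changed = False
        self.validated = False
```

Resetting validated on every change is what forces a fresh "Validate Integration" click after each edit.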
Receive webhook from payment processor, transform data, send to Slack:
- Source: Webhook
- Transform: Extract amount, customer name, status
- Target: HTTP POST to Slack webhook URL
- Result: Instant Slack notifications for new payments
Pull messages from GCP Pub/Sub, send email alerts:
- Source: Pub/Sub (Pull mode, check every 30 seconds)
- Condition: data.severity === 'critical'
- Transform: Format error details
- Target: Email via SMTP
- Result: Automated critical error notifications
Process Pub/Sub messages and route to different targets based on data:
- Create multiple integrations for same topic
- Use conditions to route: data.type === 'order'
- Each integration sends to different target
- Result: Event-driven microservices integration
Backend:
- Django 4.2+ with Django REST Framework
- PostgreSQL for data persistence
- Google Cloud Pub/Sub Python client
- Threading for background pull schedulers
- SMTP for email and SMS delivery
Frontend:
- Vanilla JavaScript (no framework dependencies)
- Responsive CSS with Django Daisy autumn theme
- Fetch API for backend communication
- Local storage for draft integrations
Infrastructure:
- Docker & Docker Compose support
- Gunicorn WSGI server
- Nginx reverse proxy ready
- Environment-based configuration
For issues and questions, refer to the troubleshooting section above or check:
- Django logs: python manage.py runserver output
- Integration runs: Django Admin → Integration Runs
- Database: Query IntegrationRun model for details
- Security: Store Pub/Sub credentials encrypted in a vault.
- Security: Store SMTP config JSON in a vault.
- Pub/Sub Pull Scheduler: Add implementation to start/stop scheduler when server is started or integration is inactive.