A complete MongoDB management system implementing CRUD operations, aggregation pipelines, schema validation, and index management using PyMongo.
- Database Management
- Create/Drop databases
- List all databases
- Collection Operations
- Create/Drop collections
- List collection names
- Schema validation/modification
- Document Operations
- Insert (single/bulk)
- Update (single/multiple)
- Delete (single/multiple)
- Fetch documents
- Aggregation Framework
- 10 predefined pipelines
- Join operations between collections
- Complex data transformations
- Index Management
- Create indexes
- List existing indexes
- Drop indexes
- Validation System
- JSON schema validation
- Detailed error reporting
- Schema modification for existing collections
- Connection pooling management
- Comprehensive error handling
- Type hinting throughout
- Input validation for all operations
- Clean resource cleanup
- Sample Datasets
- Cars data (15 documents)
- Users & Orders data (5 users, 5 orders)
PyMongo/
├── pymongo_tutorial.py # Main MongoDB operations class
├── pymongo_pipelines.py # Aggregation pipelines and sample data
├── .gitignore # Ignore tracked files.
├── LICENSE # Grants rights to users
└── README.md # This documentation
- Python 3.10+
- PyMongo (4.6.0+)
- MongoDB Atlas URI
- Install PyMongo:
pip install pymongo- Add your MongoDB Atlas URI:
# In pymongo_tutorial.py
uri: str = "mongodb+srv://<username>:<password>@cluster.x.mongodb.net/"
- Aggregation Pipelines
# Execute car analysis pipeline
MongoDbOperation.execute_aggregate_pipeline(
pipeline_=Pipelines.pipeline_1()
)
# Perform collection join
MongoDbOperation.aggregate_join_collection(
pipeline_=Pipelines.join_pipeline()
)
- Database Management
# Create database
MongoDbOperation.create_database('new_db')
# List databases
MongoDbOperation.get_database_names()
- Collection Operations
# Create collection with validation
validation = Pipelines.validator()
MongoDbOperation.create_collection(
database_name='store_db',
collection_name='users',
validator=validation
)
# Drop collection
MongoDbOperation.drop_collection('store_db', 'temp_collection')
- Document Operations
# Insert document
MongoDbOperation.insert_document(
database_name='store_db',
collection_name='users',
document={
"name": "John Doe",
"age": 30,
"email": "john@example.com"
}
)
# Update document
MongoDbOperation.update_document(
database_name='store_db',
collection_name='users',
filter_condition={"name": "John Doe"},
update_values={"age": 31},
update_type="one"
)
- Index Management
# Create index
MongoDbOperation.create_index(
database_name='store_db',
collection_name='users',
index_name='email'
)
# List indexes
MongoDbOperation.show_indexes(
database_name='store_db',
collection_name='users'
)
# Drop index
MongoDbOperation.drop_index(
database_name='store_db',
collection_name='users',
index_name='email_1'
)
- Basic Aggregation
- Group by fuel type
- Calculate car statistics
Pipelines.pipeline_1()
- Data Transformation
- Add computed fields
- Format prices
Pipelines.pipeline_6()
- Joins
- Users with Orders
Pipelines.join_pipeline()
- Validation Schema
- Name (required string)
- Age (minimum 18)
- Email format validation
Pipelines.validator()
{
"maker": "Hyundai",
"model": "Creta",
"fuel_type": "Diesel",
"price": 1500000,
"features": ["Sunroof", "Leather Seats"],
"owners": [
{"name": "Raju", "location": "Mumbai"}
]
}
{
"_id": "user1",
"name": "Amit Sharma",
"email": "amit.sharma@example.com",
"phone": "+91-987654210"
}
{
"_id": "order1",
"user_id": "user1",
"product": "Laptop",
"amount": 50000
}
The implementation includes comprehensive error handling for:
-
Connection failures
-
Invalid operations
-
Schema validation errors
-
Write conflicts
-
Collection/Database existence checks
Supported Index Operations
-
Creation
- Single field indexes
- Unique constraints
-
Inspection
- List all indexes
- View index specifications
-
Removal
- Drop by index name
- Safety checks before removal
Example validation error output:
❌ WriteError: Document failed validation!
Field: age
Description: must be greater than or equal to 18
Operator: $gte
Specified: {"minimum": 18}
Reason: "consideredValue" '17' is not numeric
This project provides a robust framework for managing MongoDB operations with validation, aggregation, and error handling. Below is a detailed breakdown of the workflow:
- Establish secure MongoDB connection using
MongoDbOperationclass. - Automatic connection timeout handling (5 seconds) and verification.
- Graceful handling of connection failures.
Create databases and collections with strict schema validation rules.
- Predefined collections:
cars(inTestdatabase)users,orders(instore_dbdatabase)
- Validation rules:
agefield must be ≥18.- Proper email format enforcement.
- Required fields must be present.
- Load sample datasets:
- 15 cars
- 5 users
- 5 orders
- Validation performed during data insertion.
- Modify documents with:
- Single document operations
- Multiple document operations
- Retrieve documents with:
- Collection/database existence checks
- Structured error handling if not found
Execute 10 predefined aggregation pipelines including:
- Fuel-Type Statistics (
pipeline_1) - Price Categorization (
pipeline_9) - Service Cost Analysis (
pipeline_7) - Collection Joins (
join_pipeline)
Aggregation features:
- Field projections
- Grouping operations
- Computed fields
- Output to collections
- Structured validation errors with detailed, field-specific diagnostics.
- Connection failure recovery logic.
- Write conflict resolution handling.
- Guaranteed resource cleanup via
finallyblocks.
- List all databases and collections.
- Drop specific databases or collections.
- Modify schema validation for existing collections.
- Create temporary collections for database initialization.
Connection → Setup → Data Insertion → Aggregation → CRUD → Validation → Cleanup