Version: 0.4.9
Programmatic usage guide for integrating FilePrepper into .NET applications.
dotnet add package FilePrepperThe Pipeline API provides a fluent interface for efficient data preprocessing with 67-90% reduction in file I/O.
using FilePrepper.Pipeline;
await DataPipeline
.FromCsvAsync("data.csv")
.Normalize(columns: new[] { "Age", "Salary" }, method: NormalizationMethod.MinMax)
.FillMissing(columns: new[] { "Score" }, method: FillMethod.Mean)
.FilterRows(row => int.Parse(row["Age"]) >= 18)
.ToCsvAsync("output.csv");Factory Methods:
// From CSV file
Task<DataPipeline> FromCsvAsync(string path, bool hasHeader = true)
// From Excel file (.xls, .xlsx)
Task<DataPipeline> FromExcelAsync(string path, bool hasHeader = true, string? sheetName = null, int sheetIndex = 0)
// From JSON file (array of objects)
Task<DataPipeline> FromJsonAsync(string path)
// From XML file (simple flat structure: root/row/column)
Task<DataPipeline> FromXmlAsync(string path, string rowElement = "row")
// From in-memory data
DataPipeline FromData(IEnumerable<Dictionary<string, string>> data)Transformation Methods (all return this for chaining):
DataPipeline AddColumn(string columnName, Func<Dictionary<string, string>, string> valueSelector)
DataPipeline RemoveColumns(string[] columnNames)
DataPipeline RenameColumn(string oldName, string newName)
DataPipeline FilterRows(Func<Dictionary<string, string>, bool> predicate)
DataPipeline Normalize(string[] columns, NormalizationMethod method, double minValue = 0, double maxValue = 1)
DataPipeline FillMissing(string[] columns, FillMethod method, string? constantValue = null)
// NEW in v0.4.4: Advanced Analytics
GroupedDataPipeline GroupBy(string keyColumn)
DataPipeline Join(DataPipeline right, string leftKey, string rightKey, JoinType joinType = JoinType.Inner,
string[]? selectColumns = null, string? leftPrefix = null, string? rightPrefix = null)
ColumnStatistics GetStatistics(string column)
DataPipeline Normalize(string column, NormalizationMethod method, string? outputColumn = null)Output Methods:
DataFrame ToDataFrame() // Get immutable snapshot
Task ToCsvAsync(string path, bool hasHeader = true)
Task ToExcelAsync(string path, bool hasHeader = true, string sheetName = "Sheet1")
Task ToJsonAsync(string path, bool indented = true)
Task ToXmlAsync(string path, string rootElement = "data", string rowElement = "row")
IEnumerable<string> GetColumn(string columnName)Enums:
public enum NormalizationMethod { MinMax, ZScore, Robust } // Robust added in v0.4.4
public enum FillMethod { Mean, Median, Mode, Forward, Backward, Constant }
public enum JoinType { Inner, Left, Right, Outer } // NEW in v0.4.4
public enum AggregationMethod { // Extended in v0.4.4
Mean, Sum, Min, Max, Count, Std,
Var, Median, First, Last
}Represents data grouped by a key column, ready for aggregation:
public class GroupedDataPipeline
{
DataPipeline Aggregate(
(string column, AggregationMethod method)[] aggregations,
bool keepKey = true,
string? suffixFormat = "_{method}")
}Comprehensive statistical summary for numeric columns:
public record ColumnStatistics
{
double Mean { get; }
double Std { get; } // Sample standard deviation
double Min { get; }
double Max { get; }
double Median { get; }
double Q1 { get; } // 25th percentile
double Q3 { get; } // 75th percentile
double IQR { get; } // Interquartile range (Q3 - Q1)
int Count { get; } // Valid numeric values
int NullCount { get; } // Null/non-numeric values
double Variance { get; } // Sample variance
}Immutable data container for inspection:
public class DataFrame
{
IReadOnlyList<Dictionary<string, string>> Rows { get; }
IReadOnlyList<string> ColumnNames { get; }
int RowCount { get; }
int ColumnCount { get; }
IEnumerable<string> GetColumn(string columnName)
DataFrame Select(params string[] columnNames)
DataFrame Where(Func<Dictionary<string, string>, bool> predicate)
string ToCsv()
string ToJson()
}ML Feature Engineering:
var result = await DataPipeline
.FromCsvAsync("orders.csv")
.AddColumn("Year", row => DateTime.Parse(row["OrderDate"]).Year.ToString())
.AddColumn("Month", row => DateTime.Parse(row["OrderDate"]).Month.ToString())
.Normalize(columns: new[] { "Revenue", "Quantity" }, method: NormalizationMethod.MinMax)
.FilterRows(row => int.Parse(row["Year"]) >= 2023)
.ToDataFrame();In-Memory Processing:
var data = new List<Dictionary<string, string>>
{
new() { ["Name"] = "Alice", ["Age"] = "25" },
new() { ["Name"] = "Bob", ["Age"] = "30" }
};
var processed = DataPipeline
.FromData(data)
.Normalize(columns: new[] { "Age" }, method: NormalizationMethod.MinMax)
.ToDataFrame();Multi-Format Processing - Excel to JSON:
await DataPipeline
.FromExcelAsync("sales.xlsx", sheetName: "Q4_Data")
.AddColumn("Total", row =>
(double.Parse(row["Price"]) * double.Parse(row["Quantity"])).ToString())
.FilterRows(row => double.Parse(row["Total"]) >= 1000)
.ToJsonAsync("high_value_sales.json");Multi-Format Processing - JSON to Excel:
await DataPipeline
.FromJsonAsync("api_response.json")
.Normalize(columns: new[] { "Score", "Rating" }, method: NormalizationMethod.MinMax)
.ToExcelAsync("normalized_data.xlsx", sheetName: "Results");Multi-Format Processing - XML to CSV:
await DataPipeline
.FromXmlAsync("legacy_data.xml", rowElement: "record")
.RenameColumn("OldName", "NewName")
.RemoveColumns(new[] { "ObsoleteField" })
.ToCsvAsync("modernized_data.csv");Convert to All Formats:
var pipeline = await DataPipeline.FromCsvAsync("source.csv");
await pipeline.ToExcelAsync("output.xlsx");
await pipeline.ToJsonAsync("output.json");
await pipeline.ToXmlAsync("output.xml");GroupBy/Aggregate (NEW in v0.4.4) - Time-series batch aggregation:
var aggregated = await DataPipeline
.FromCsvAsync("sensor_data.csv")
.GroupBy("batch_id")
.Aggregate(new[]
{
("temperature", AggregationMethod.Mean),
("temperature", AggregationMethod.Std),
("pressure", AggregationMethod.Min),
("pressure", AggregationMethod.Max),
("duration", AggregationMethod.Sum)
});
// Result columns: batch_id, temperature_mean, temperature_std,
// pressure_min, pressure_max, duration_sumJoin Operations (NEW in v0.4.4) - Combine multiple data sources:
var sensorData = await DataPipeline.FromCsvAsync("sensors.csv");
var qualityLabels = await DataPipeline.FromCsvAsync("quality.csv");
// Inner join
var joined = sensorData.Join(
qualityLabels,
leftKey: "batch_id",
rightKey: "batch_id",
joinType: JoinType.Inner,
selectColumns: new[] { "defect_rate", "quality_score" }
);
// Left join with prefixes (avoid column collision)
var leftJoined = sensorData.Join(
qualityLabels,
leftKey: "id",
rightKey: "sensor_id",
joinType: JoinType.Left,
leftPrefix: "sensor_",
rightPrefix: "quality_"
);Statistical Analysis (NEW in v0.4.4) - Data exploration:
var data = await DataPipeline.FromCsvAsync("measurements.csv");
// Get comprehensive statistics
var stats = data.GetStatistics("temperature");
Console.WriteLine($"Mean: {stats.Mean}, Std: {stats.Std}");
Console.WriteLine($"Median: {stats.Median}, IQR: {stats.IQR}");
Console.WriteLine($"Range: [{stats.Min}, {stats.Max}]");
Console.WriteLine($"Valid: {stats.Count}, Missing: {stats.NullCount}");
// Normalize data with different methods
var normalized = data
.Normalize("temperature", NormalizationMethod.ZScore) // Mean=0, Std=1
.Normalize("pressure", NormalizationMethod.MinMax) // [0, 1]
.Normalize("humidity", NormalizationMethod.Robust); // Robust to outliersComplete ML Pipeline (v0.4.4) - Full preprocessing workflow:
var result = await DataPipeline
.FromCsvAsync("raw_sensor_data.csv")
// 1. Aggregate by batch
.GroupBy("batch_id")
.Aggregate(new[]
{
("temp_zone1", AggregationMethod.Mean),
("temp_zone1", AggregationMethod.Std),
("temp_zone2", AggregationMethod.Mean),
("pressure", AggregationMethod.Max)
})
// 2. Join with quality labels
.Join(
await DataPipeline.FromCsvAsync("quality_labels.csv"),
leftKey: "batch_id",
rightKey: "batch_id",
joinType: JoinType.Inner,
selectColumns: new[] { "defect_rate", "quality_score" }
)
// 3. Normalize features
.Normalize("temp_zone1_mean", NormalizationMethod.ZScore)
.Normalize("temp_zone2_mean", NormalizationMethod.ZScore)
.Normalize("pressure_max", NormalizationMethod.MinMax)
// 4. Export for ML training
.ToCsvAsync("ml_ready_dataset.csv");All tasks follow a consistent pattern:
- Create options object
- Instantiate task with logger
- Execute with TaskContext
using FilePrepper.Tasks.NormalizeData;
using Microsoft.Extensions.Logging;
// 1. Configure options
var options = new NormalizeDataOption
{
InputPath = "input.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "Age", "Salary" },
Method = NormalizationMethod.MinMax,
MinValue = 0,
MaxValue = 1
};
// 2. Create task
var logger = loggerFactory.CreateLogger<NormalizeDataTask>();
var task = new NormalizeDataTask(logger);
// 3. Execute
var context = new TaskContext(options);
bool success = await task.ExecuteAsync(context);public interface ITask
{
Task<bool> ExecuteAsync(TaskContext context);
bool Execute(TaskContext context);
}public interface ITaskOption
{
string InputPath { get; set; }
string OutputPath { get; set; }
bool HasHeader { get; set; }
bool IgnoreErrors { get; set; }
string Encoding { get; set; } // "auto", "utf-8", "cp949", "euc-kr" (default: "auto")
int SkipRows { get; set; } // Rows to skip before header (default: 0)
string[] Validate();
}public class TaskContext
{
public ITaskOption Options { get; }
public TaskContext(ITaskOption options) { ... }
}using FilePrepper.Tasks.AddColumns;
var options = new AddColumnsOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
NewColumns = new Dictionary<string, string>
{
{ "Status", "Active" },
{ "Priority", "High" }
}
};
var task = new AddColumnsTask(logger);
await task.ExecuteAsync(new TaskContext(options));using FilePrepper.Tasks.RemoveColumns;
var options = new RemoveColumnsOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "TempCol", "Debug", "Notes" }
};using FilePrepper.Tasks.RenameColumns;
var options = new RenameColumnsOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
ColumnMapping = new Dictionary<string, string>
{
{ "OldName", "NewName" },
{ "Age", "Years" }
}
};using FilePrepper.Tasks.ReorderColumns;
var options = new ReorderColumnsOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
Order = new List<string> { "ID", "Name", "Email", "Age" }
};using FilePrepper.Tasks.DataTypeConvert;
var options = new DataTypeConvertOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
Conversions = new List<ColumnTypeConversion>
{
new()
{
ColumnName = "Date",
TargetType = DataType.DateTime,
Format = "yyyy-MM-dd"
},
new()
{
ColumnName = "Age",
TargetType = DataType.Integer
},
new()
{
ColumnName = "Price",
TargetType = DataType.Decimal
}
},
Culture = CultureInfo.GetCultureInfo("en-US")
};DataType Enum: String, Integer, Decimal, DateTime, Boolean
using FilePrepper.Tasks.NormalizeData;
var options = new NormalizeDataOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "Age", "Salary", "Score" },
Method = NormalizationMethod.MinMax,
MinValue = 0,
MaxValue = 1
};NormalizationMethod: MinMax, ZScore
using FilePrepper.Tasks.ScaleData;
var options = new ScaleDataOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "Height", "Weight" },
Method = ScalingMethod.Standardization
};ScalingMethod: MinMax, Standardization
using FilePrepper.Tasks.FillMissingValues;
var options = new FillMissingValuesOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "Age", "Salary" },
Method = FillMethod.Mean,
ConstantValue = "0" // For FillMethod.Constant
};FillMethod: Mean, Median, Mode, Forward, Backward, Constant
using FilePrepper.Tasks.DateExtraction;
var options = new DateExtractionOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
Extractions = new List<DateColumnExtraction>
{
new()
{
SourceColumn = "OrderDate",
Components = new List<DateComponent>
{
DateComponent.Year,
DateComponent.Month,
DateComponent.Day,
DateComponent.DayOfWeek
},
OutputColumnTemplate = "{column}_{component}"
}
}
};DateComponent: Year, Month, Day, Hour, Minute, Second, DayOfWeek, DayOfYear, Quarter, WeekOfYear
using FilePrepper.Tasks.Aggregate;
var options = new AggregateOption
{
InputPath = "sales.csv",
OutputPath = "summary.csv",
GroupByColumns = new[] { "Region", "Category" },
Aggregations = new List<AggregationColumn>
{
new()
{
SourceColumn = "Sales",
Function = AggregateFunction.Sum,
OutputColumn = "TotalSales"
},
new()
{
SourceColumn = "Quantity",
Function = AggregateFunction.Average,
OutputColumn = "AvgQuantity"
}
}
};AggregateFunction: Sum, Average, Count, Min, Max
using FilePrepper.Tasks.BasicStatistics;
var options = new BasicStatisticsOption
{
InputPath = "data.csv",
OutputPath = "stats.csv",
TargetColumns = new[] { "Age", "Salary", "Score" }
};Output includes: Count, Mean, StdDev, Min, Q1, Median, Q3, Max
using FilePrepper.Tasks.DropDuplicates;
var options = new DropDuplicatesOption
{
InputPath = "data.csv",
OutputPath = "unique.csv",
TargetColumns = new[] { "Email", "PhoneNumber" },
KeepFirst = true // true = keep first, false = keep last
};using FilePrepper.Tasks.FilterRows;
var options = new FilterRowsOption
{
InputPath = "data.csv",
OutputPath = "filtered.csv",
Conditions = new List<FilterCondition>
{
new()
{
Column = "Age",
Operator = FilterOperator.GreaterThan,
Value = "18"
}
}
};FilterOperator: Equals, NotEquals, GreaterThan, LessThan, GreaterThanOrEqual, LessThanOrEqual, Contains, StartsWith, EndsWith
using FilePrepper.Tasks.Merge;
// Vertical merge (stack rows)
var verticalOptions = new MergeOption
{
InputPaths = new[] { "file1.csv", "file2.csv", "file3.csv" },
OutputPath = "merged.csv",
MergeType = MergeType.Vertical
};
// Horizontal merge (join)
var horizontalOptions = new MergeOption
{
InputPaths = new[] { "customers.csv", "orders.csv" },
OutputPath = "joined.csv",
MergeType = MergeType.Horizontal,
JoinType = JoinType.Left,
JoinKeys = new List<JoinKey>
{
new() { Name = "CustomerID" }
}
};MergeType: Vertical, Horizontal JoinType: Inner, Left, Right, Outer
All tasks support error handling options:
var options = new NormalizeDataOption
{
InputPath = "data.csv",
OutputPath = "output.csv",
TargetColumns = new[] { "Age", "Salary" },
Method = NormalizationMethod.MinMax,
// Error handling
IgnoreErrors = true,
DefaultValue = "0"
};Options are validated before execution:
var options = new NormalizeDataOption
{
TargetColumns = new string[0] // Invalid!
};
string[] errors = options.Validate();
if (errors.Length > 0)
{
foreach (var error in errors)
Console.WriteLine($"Validation error: {error}");
}Integrate with ASP.NET Core or other DI frameworks:
// Startup.cs or Program.cs
services.AddTransient<NormalizeDataTask>();
services.AddTransient<FillMissingValuesTask>();
// ... register other tasks
// Usage in controller/service
public class DataProcessingService
{
private readonly NormalizeDataTask _normalizeTask;
private readonly ILogger<DataProcessingService> _logger;
public DataProcessingService(
NormalizeDataTask normalizeTask,
ILogger<DataProcessingService> logger)
{
_normalizeTask = normalizeTask;
_logger = logger;
}
public async Task<bool> ProcessData(string inputPath, string outputPath)
{
var options = new NormalizeDataOption
{
InputPath = inputPath,
OutputPath = outputPath,
TargetColumns = new[] { "Age", "Salary" },
Method = NormalizationMethod.MinMax
};
return await _normalizeTask.ExecuteAsync(new TaskContext(options));
}
}using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using FilePrepper.Tasks.NormalizeData;
public class DataProcessor
{
private readonly ILogger<DataProcessor> _logger;
public DataProcessor(ILogger<DataProcessor> logger)
{
_logger = logger;
}
[Function("ProcessData")]
public async Task<HttpResponseData> Run(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
{
// Get file from request
var formData = await req.ReadFormAsync();
var file = formData.Files["file"];
// Save to temp location
var inputPath = Path.GetTempFileName();
var outputPath = Path.GetTempFileName();
using (var stream = file.OpenReadStream())
using (var fileStream = File.Create(inputPath))
{
await stream.CopyToAsync(fileStream);
}
// Process with FilePrepper
var options = new NormalizeDataOption
{
InputPath = inputPath,
OutputPath = outputPath,
TargetColumns = new[] { "Age", "Salary" },
Method = NormalizationMethod.MinMax
};
var task = new NormalizeDataTask(_logger);
bool success = await task.ExecuteAsync(new TaskContext(options));
if (!success)
{
var response = req.CreateResponse(HttpStatusCode.InternalServerError);
return response;
}
// Return processed file
var result = req.CreateResponse(HttpStatusCode.OK);
result.Headers.Add("Content-Type", "text/csv");
var bytes = await File.ReadAllBytesAsync(outputPath);
await result.Body.WriteAsync(bytes);
// Cleanup
File.Delete(inputPath);
File.Delete(outputPath);
return result;
}
}- Use DI - Register tasks in DI container
- Validate options - Call
Validate()before execution - Handle errors - Check return value and log errors
- Dispose resources - Tasks handle file cleanup automatically
- Use async - Prefer
ExecuteAsyncoverExecute - Log appropriately - Provide logger for diagnostics
- Test with small data - Verify logic before large files
// For large files, consider:
1. Appropriate data types (Integer vs Decimal)
2. Error handling strategy (IgnoreErrors vs fail fast)
3. Memory availability (tasks load files into memory)
4. Batch processing for very large datasets- CLI Guide - Command-line usage
- Common Scenarios - Practical examples
- Quick Start - Getting started guide