Dev#11

Open
Radom12 wants to merge 4 commits into main from dev

Conversation

@Radom12 Radom12 commented Apr 10, 2026

Owner

No description provided.

Copilot AI review requested due to automatic review settings April 10, 2026 19:41
@Radom12 Radom12 self-assigned this Apr 10, 2026

Copilot AI left a comment

Contributor

Pull request overview

Adds structured correctness validation and structured plan diagnostics to OptiSpark’s interactive optimization loop, with accompanying unit tests, to improve reliability of AI-generated DataFrame rewrites.

Changes:

  • Introduces a new correctness validator (validate_optimization) and failure-prompt builder for self-healing retries.
  • Adds a structured Spark execution plan parser + rule-based analyzer to replace brittle string heuristics.
  • Integrates plan diagnostics + correctness validation into the agent sandbox execution path and enhances safety detection for explode/salt_array.
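
The AST-based detection mentioned in the last bullet can be illustrated with a minimal sketch. The function name `find_high_memory_calls` and the watch list are hypothetical and are not taken from `src/optispark/safety.py`; the sketch only shows why walking the syntax tree is less brittle than substring matching, which would also fire on comments and string literals.

```python
import ast

# Hypothetical watch list; the real safety module may track different names.
HIGH_MEMORY_CALLS = {"explode", "salt_array"}


def find_high_memory_calls(source: str) -> set[str]:
    """Return the watched names that are actually *called* in `source`."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles bare calls `explode(...)` and attribute calls `F.explode(...)`.
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name in HIGH_MEMORY_CALLS:
            found.add(name)
    return found


code_with_call = "optimized_df = df.withColumn('x', F.explode(F.col('items')))"
code_with_comment = "# do not use explode here\noptimized_df = df.select('id')"

print(find_high_memory_calls(code_with_call))     # the call is detected
print(find_high_memory_calls(code_with_comment))  # the comment is ignored
```

A plain `"explode" in code` check would flag both snippets; the AST walk flags only the one that really calls the function.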

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| tests/test_validator.py | New unit/integration-style tests for validator checks and failure prompt building. |
| tests/test_plan_analyzer.py | New tests for plan parsing, rule detection, and debug rendering. |
| src/optispark/validator.py | New correctness validator module (schema/row-count/sample diff/aggregate parity) + prompt builder. |
| src/optispark/safety.py | Replaces brittle string detection with AST-based detection for high-memory ops (explode/salt). |
| src/optispark/plan_analyzer.py | New plan parser + diagnostics rules + debug helpers. |
| src/optispark/agent.py | Wires plan diagnostics into context gathering and adds correctness validation/self-correction in sandbox execution. |


Comment on lines +162 to +166
orig_count = original_df.limit(sample_size).count()
opt_count = optimized_df.limit(sample_size).count()

if orig_count == opt_count:
return {
Comment thread src/optispark/validator.py Outdated
Comment on lines +193 to +197
# Rows in original not in optimized
only_in_orig = orig_sample.subtract(opt_sample)
orig_diff_count = only_in_orig.count()

# Rows in optimized not in original
Comment on lines +95 to +99
# Handles lines like "BroadcastHashJoin [user_id#10], ..." or "*(2) HashAggregate(...)"
_NODE_TYPE_PATTERN = re.compile(
r'(?:\*\(\d+\)\s*)?' # optional WholeStageCodegen marker like *(2)
r'([A-Z][A-Za-z]*(?:\s[A-Za-z]+)?)' # PascalCase node type (1-2 words)
)
Comment on lines +382 to +384
Returns:
A list of PlanDiagnostic instances, deduplicated by rule_id.
"""
Comment thread tests/test_validator.py Outdated
Comment on lines +55 to +59
for n, t in fields:
if isinstance(t, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType)):
total = sum(r.get(n, 0) for r in rows if r.get(n) is not None)
agg_row.__getitem__ = lambda self, key, _n=n, _t=total: _t if key == _n else 0
agg_row.__getitem__ = MagicMock(side_effect=lambda key: sum(r.get(key, 0) for r in rows if r.get(key) is not None))

Copilot AI left a comment

Contributor

Pull request overview

Adds a structured plan analyzer and a correctness validator to OptiSpark, and integrates them into the agent flow so generated optimizations are both safety-checked and semantically validated before being accepted.

Changes:

  • Introduces optispark.plan_analyzer to parse Spark physical plans into a node tree and emit structured diagnostics.
  • Introduces optispark.validator to compare original vs optimized DataFrames (schema, sample row count, sample data integrity, aggregate parity) and generate an LLM failure prompt.
  • Integrates plan diagnostics + correctness validation into OptiSpark’s DataFrame introspection and sandbox execution; adds unit tests for analyzer/validator.
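
As a rough illustration of the first bullet, the sketch below turns an indented physical-plan string into a node tree. The names `Node` and `parse_plan_text` are hypothetical and this is not the code in `optispark.plan_analyzer`; it assumes the usual text layout in which each nesting level adds a three-character prefix such as `+- `, `:- `, `:  ` or three spaces.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    label: str
    depth: int
    children: list["Node"] = field(default_factory=list)

    def walk(self):
        """Yield this node and all descendants in pre-order."""
        yield self
        for child in self.children:
            yield from child.walk()


def parse_plan_text(plan: str) -> list[Node]:
    """Build a forest of nodes from an indented plan string (illustrative only)."""
    roots, stack = [], []
    for line in plan.splitlines():
        if not line.strip():
            continue
        label = line.lstrip(" :+-")
        depth = (len(line) - len(label)) // 3  # assumed: 3 prefix chars per level
        node = Node(label.strip(), depth)
        # Pop back to the nearest ancestor that is shallower than this node.
        while stack and stack[-1].depth >= depth:
            stack.pop()
        (stack[-1].children if stack else roots).append(node)
        stack.append(node)
    return roots


PLAN = """\
SortMergeJoin [id#1], [id#2], Inner
:- Sort [id#1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#1, 200)
+- Sort [id#2 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2, 200)
"""

roots = parse_plan_text(PLAN)
print([n.label.split()[0] for n in roots[0].walk()])
```

Rule functions can then operate on the flattened `walk()` output instead of searching the raw plan string.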

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| src/optispark/plan_analyzer.py | New plan parsing + rule-based diagnostics and debug printers. |
| src/optispark/validator.py | New correctness validation pipeline and failure-prompt builder. |
| src/optispark/agent.py | Uses plan diagnostics in context and validates optimized output before accepting; adds validation UI. |
| src/optispark/safety.py | Replaces brittle string checks with AST-based detection for explode/salt patterns. |
| tests/test_plan_analyzer.py | Adds unit tests covering parsing, rule detection, and debug output. |
| tests/test_validator.py | Adds unit tests for validator checks, confidence scoring, and failure prompt building. |


Comment thread src/optispark/safety.py
Comment thread src/optispark/validator.py Outdated
Comment on lines +193 to +199
# Rows in original not in optimized
only_in_orig = orig_sample.subtract(opt_sample)
orig_diff_count = only_in_orig.count()

# Rows in optimized not in original
only_in_opt = opt_sample.subtract(orig_sample)
opt_diff_count = only_in_opt.count()

Owner Author

Resolve

Contributor

Resolved in bdf76eb. _check_data_integrity now uses exceptAll() (multiset/EXCEPT ALL semantics) instead of subtract() (EXCEPT DISTINCT), so duplicate-row multiplicity mismatches are correctly detected. Test mocks updated to match.
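
The difference between the two semantics can be shown without a Spark session. The sketch below mimics EXCEPT DISTINCT and EXCEPT ALL on plain Python lists of tuples; the helper names `except_distinct` and `except_all` are made up for illustration and only approximate what `subtract()` and `exceptAll()` do on DataFrames.

```python
from collections import Counter


def except_distinct(left, right):
    """Set semantics (like subtract()): distinct rows of `left` absent from `right`."""
    return set(left) - set(right)


def except_all(left, right):
    """Multiset semantics (like exceptAll()): row multiplicities are subtracted."""
    return list((Counter(left) - Counter(right)).elements())


original = [("a", 1), ("a", 1), ("b", 2)]
optimized = [("a", 1), ("b", 2)]  # one duplicate row was silently dropped

print(except_distinct(original, optimized))  # set()       -> the loss goes unnoticed
print(except_all(original, optimized))       # [('a', 1)]  -> the loss is reported
```

With set semantics the dropped duplicate is invisible, which is why a rewrite that accidentally deduplicates rows could previously pass the integrity check.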

Comment thread src/optispark/validator.py Outdated
Comment on lines +287 to +289
# Relative difference check
orig_f = float(orig_val)
opt_f = float(opt_val)
Comment on lines +374 to +400
def analyze_plan(roots: list, broadcast_threshold_bytes: int = 10 * 1024 * 1024) -> list:
    """Run all detection rules over a parsed plan tree.

    Args:
        roots: List of root PlanNode instances from parse_plan().
        broadcast_threshold_bytes: The autoBroadcastJoinThreshold from the SparkSession.
            Defaults to Spark's default of 10 MB.

    Returns:
        A list of PlanDiagnostic instances, deduplicated by rule_id.
    """
    # Flatten the forest into a single list for rule functions.
    all_nodes = []
    for root in roots:
        all_nodes.extend(root.walk())

    diagnostics = []
    diagnostics.extend(_rule_missing_broadcast(all_nodes, broadcast_threshold_bytes))
    diagnostics.extend(_rule_excessive_shuffle(all_nodes))
    diagnostics.extend(_rule_redundant_repartition(all_nodes))
    # Only add informational SMJ rule if MISSING_BROADCAST didn't already flag it
    broadcast_flagged = {d.node for d in diagnostics if d.rule_id == "MISSING_BROADCAST"}
    for d in _rule_sort_merge_join_info(all_nodes):
        if d.node not in broadcast_flagged:
            diagnostics.append(d)

    return diagnostics
Comment thread src/optispark/agent.py
return local_vars["optimized_df"]
else:
if "optimized_df" not in local_vars:
print(f" {C.RED}✖ Error: The code executed but did not assign 'optimized_df'.{C.RESET}")

Radom12 commented Apr 12, 2026

Owner Author

@copilot fix all issues

…nt clarity, _NODE_TYPE_PATTERN, docstrings, agent retry

Agent-Logs-Url: https://github.com/Radom12/OptiSpark/sessions/5809aa55-d44b-4d24-8e6f-591290199a24

Co-authored-by: Radom12 <69920588+Radom12@users.noreply.github.com>

Copilot AI commented Apr 12, 2026

Contributor

@copilot fix all issues

All issues addressed in bdf76eb:

  • _check_data_integrity: subtract() → exceptAll() for correct multiset semantics
  • _check_row_count: message now honestly says "at least N rows — full counts were not compared" when both hit the sample limit
  • _check_aggregate_parity: DecimalType columns are compared with Python decimal.Decimal arithmetic to avoid float precision loss
  • plan_analyzer._classify_node: _NODE_TYPE_PATTERN is now used as a Priority-4 regex fallback (no more dead code)
  • plan_analyzer.analyze_plan docstring: updated to accurately describe what deduplication is actually done
  • tests/test_validator._make_mock_df: removed the redundant first __getitem__ assignment
  • agent._execute_sandbox: when optimized_df is not assigned, the function now attempts a self-healing LLM retry (consistent with the exception-handling path) before giving up
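
To make the DecimalType bullet concrete, here is a small sketch of why comparing decimal sums through `float` can hide a real difference. The helper `relative_diff` is hypothetical and is not the code in `_check_aggregate_parity`; it only shows the idea of keeping the whole comparison in `decimal.Decimal`.

```python
from decimal import Decimal


def relative_diff(a: Decimal, b: Decimal) -> Decimal:
    """Relative difference computed entirely in Decimal arithmetic."""
    if a == b:
        return Decimal(0)
    return abs(a - b) / max(abs(a), abs(b))


# Two sums that differ only in the last fractional digit.
x = Decimal("12345678901234567890.123456")
y = Decimal("12345678901234567890.123457")

print(float(x) == float(y))     # True: a 64-bit float cannot represent the difference
print(x == y)                   # False: Decimal keeps every digit
print(relative_diff(x, y) > 0)  # True: the mismatch stays measurable
```

Whether such a tiny difference should fail validation is still governed by the tolerance; the point is only that the comparison no longer loses digits before the tolerance is applied.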


Radom12 commented Apr 12, 2026

Owner Author

@copilot
2026-04-12T06:50:32.5904207Z ##[group]Run pytest tests/ --cov=src/optispark --cov=server --cov-fail-under=90 --cov-report=term-missing -v
pytest tests/ --cov=src/optispark --cov=server --cov-fail-under=90 --cov-report=term-missing -v
shell: /usr/bin/bash -e {0}
env:
GEMINI_API_KEY: ci_test_placeholder
pythonLocation: /opt/hostedtoolcache/Python/3.11.15/x64
PKG_CONFIG_PATH: /opt/hostedtoolcache/Python/3.11.15/x64/lib/pkgconfig
Python_ROOT_DIR: /opt/hostedtoolcache/Python/3.11.15/x64
Python2_ROOT_DIR: /opt/hostedtoolcache/Python/3.11.15/x64
Python3_ROOT_DIR: /opt/hostedtoolcache/Python/3.11.15/x64
LD_LIBRARY_PATH: /opt/hostedtoolcache/Python/3.11.15/x64/lib
JAVA_HOME: /opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/11.0.30-7/x64
JAVA_HOME_11_X64: /opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/11.0.30-7/x64
============================= test session starts ==============================
platform linux -- Python 3.11.15, pytest-9.0.3, pluggy-1.6.0 -- /opt/hostedtoolcache/Python/3.11.15/x64/bin/python
cachedir: .pytest_cache
rootdir: /home/runner/work/OptiSpark/OptiSpark
configfile: pyproject.toml
plugins: cov-7.1.0, anyio-4.13.0
collecting ... collected 149 items

tests/test_agent.py::test_optimize_success PASSED [ 0%]
tests/test_agent.py::test_optimize_no_metrics PASSED [ 1%]
tests/test_agent.py::test_optimize_safety_blocked PASSED [ 2%]
tests/test_agent.py::test_chat_exit PASSED [ 2%]
tests/test_agent.py::test_chat_interaction_and_execution PASSED [ 3%]
tests/test_agent.py::test_chat_commands PASSED [ 4%]
tests/test_agent.py::test_chat_benchmark PASSED [ 4%]
tests/test_agent.py::test_chat_keyboard_interrupt PASSED [ 5%]
tests/test_agent.py::test_agent_introspect_error_handling PASSED [ 6%]
tests/test_agent.py::test_agent_chat_legacy_path PASSED [ 6%]
tests/test_agent.py::test_agent_chat_log_dir_path PASSED [ 7%]
tests/test_agent.py::test_agent_chat_session_init_failure PASSED [ 8%]
tests/test_agent.py::test_chat_exec_failure_self_heal PASSED [ 8%]
tests/test_agent.py::test_chat_generic_exception PASSED [ 9%]
tests/test_agent.py::test_chat_benchmark_no_code PASSED [ 10%]
tests/test_agent.py::test_chat_empty_input PASSED [ 10%]
tests/test_agent_ui.py::test_ui_renders PASSED [ 11%]
tests/test_agent_ui.py::test_clear_screen PASSED [ 12%]
tests/test_benchmark.py::test_benchmark_success PASSED [ 12%]
tests/test_benchmark.py::test_benchmark_empty_df PASSED [ 13%]
tests/test_benchmark.py::test_benchmark_execution_error PASSED [ 14%]
tests/test_benchmark.py::test_benchmark_no_df_opt PASSED [ 14%]
tests/test_benchmark.py::test_benchmark_total_failure PASSED [ 15%]
tests/test_cli.py::test_cli_analyze PASSED [ 16%]
tests/test_cli.py::test_cli_chat PASSED [ 16%]
tests/test_cli.py::test_cli_custom_server_url PASSED [ 17%]
tests/test_listener.py::test_listener PASSED [ 18%]
tests/test_parser.py::test_calculate_bottlenecks PASSED [ 18%]
tests/test_parser.py::test_extract_features_from_system_tables_success PASSED [ 19%]
tests/test_parser.py::test_extract_features_from_system_tables_empty PASSED [ 20%]
tests/test_parser.py::test_extract_features_from_system_tables_fail PASSED [ 20%]
tests/test_parser.py::test_extract_features_from_logs_empty PASSED [ 21%]
tests/test_parser.py::test_extract_features_from_logs_success PASSED [ 22%]
tests/test_plan_analyzer.py::TestPlanNode::test_walk_single_node PASSED [ 22%]
tests/test_plan_analyzer.py::TestPlanNode::test_walk_tree PASSED [ 23%]
tests/test_plan_analyzer.py::TestPlanNode::test_find_all PASSED [ 24%]
tests/test_plan_analyzer.py::TestPlanNode::test_repr PASSED [ 24%]
tests/test_plan_analyzer.py::TestClassifyNode::test_sort_merge_join PASSED [ 25%]
tests/test_plan_analyzer.py::TestClassifyNode::test_broadcast_hash_join PASSED [ 26%]
tests/test_plan_analyzer.py::TestClassifyNode::test_exchange PASSED [ 26%]
tests/test_plan_analyzer.py::TestClassifyNode::test_hash_aggregate PASSED [ 27%]
tests/test_plan_analyzer.py::TestClassifyNode::test_file_scan_parquet PASSED [ 28%]
tests/test_plan_analyzer.py::TestClassifyNode::test_file_scan_csv PASSED [ 28%]
tests/test_plan_analyzer.py::TestClassifyNode::test_wholestage_codegen PASSED [ 29%]
tests/test_plan_analyzer.py::TestClassifyNode::test_unknown_node PASSED [ 30%]
tests/test_plan_analyzer.py::TestClassifyNode::test_filter PASSED [ 30%]
tests/test_plan_analyzer.py::TestClassifyNode::test_sort PASSED [ 31%]
tests/test_plan_analyzer.py::TestClassifyNode::test_repartition_by_expression PASSED [ 32%]
tests/test_plan_analyzer.py::TestClassifyNode::test_broadcast_exchange PASSED [ 32%]
tests/test_plan_analyzer.py::TestParseAttributes::test_join_keys PASSED [ 33%]
tests/test_plan_analyzer.py::TestParseAttributes::test_join_type PASSED [ 34%]
tests/test_plan_analyzer.py::TestParseAttributes::test_exchange_type_hash PASSED [ 34%]
tests/test_plan_analyzer.py::TestParseAttributes::test_exchange_type_range PASSED [ 35%]
tests/test_plan_analyzer.py::TestParseAttributes::test_scan_format PASSED [ 36%]
tests/test_plan_analyzer.py::TestParseAttributes::test_no_attributes_for_filter PASSED [ 36%]
tests/test_plan_analyzer.py::TestParsePlan::test_empty_plan PASSED [ 37%]
tests/test_plan_analyzer.py::TestParsePlan::test_simple_scan PASSED [ 38%]
tests/test_plan_analyzer.py::TestParsePlan::test_sort_merge_join_plan PASSED [ 38%]
tests/test_plan_analyzer.py::TestParsePlan::test_broadcast_join_plan PASSED [ 39%]
tests/test_plan_analyzer.py::TestParsePlan::test_nodes_have_depth PASSED [ 40%]
tests/test_plan_analyzer.py::TestBuildTree::test_empty PASSED [ 40%]
tests/test_plan_analyzer.py::TestBuildTree::test_single_node PASSED [ 41%]
tests/test_plan_analyzer.py::TestBuildTree::test_parent_child_relationship PASSED [ 42%]
tests/test_plan_analyzer.py::TestBuildTree::test_two_siblings PASSED [ 42%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_missing_broadcast_detected PASSED [ 43%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_missing_broadcast_uses_threshold PASSED [ 44%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_no_missing_broadcast_when_already_broadcast PASSED [ 44%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_excessive_shuffle_detected PASSED [ 45%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_no_excessive_shuffle_on_normal_plan PASSED [ 46%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_redundant_repartition_detected PASSED [ 46%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_empty_plan_no_diagnostics PASSED [ 47%]
tests/test_plan_analyzer.py::TestAnalyzePlan::test_sort_merge_join_not_duplicated_with_broadcast PASSED [ 48%]
tests/test_plan_analyzer.py::TestPlanDiagnostic::test_to_dict PASSED [ 48%]
tests/test_plan_analyzer.py::TestPlanDiagnostic::test_to_dict_no_node PASSED [ 49%]
tests/test_plan_analyzer.py::TestDebugOutput::test_print_plan_tree PASSED [ 50%]
tests/test_plan_analyzer.py::TestDebugOutput::test_print_diagnostics PASSED [ 51%]
tests/test_plan_analyzer.py::TestDebugOutput::test_print_diagnostics_empty PASSED [ 51%]
tests/test_reasoning.py::test_start_chat_success PASSED [ 52%]
tests/test_reasoning.py::test_start_chat_connection_error PASSED [ 53%]
tests/test_reasoning.py::test_start_chat_server_error PASSED [ 53%]
tests/test_reasoning.py::test_send_message PASSED [ 54%]
tests/test_reasoning.py::test_send_message_error PASSED [ 55%]
tests/test_reasoning.py::test_diagnose PASSED [ 55%]
tests/test_reasoning.py::test_generate_fix_uses_fallback PASSED [ 56%]
tests/test_reasoning.py::test_generate_connection_error PASSED [ 57%]
tests/test_reasoning.py::test_generate_server_error PASSED [ 57%]
tests/test_reasoning.py::test_default_server_url PASSED [ 58%]
tests/test_safety.py::test_safety_safe_code PASSED [ 59%]
tests/test_safety.py::test_safety_dangerous_no_df PASSED [ 59%]
tests/test_safety.py::test_safety_dangerous_with_safe_size PASSED [ 60%]
tests/test_safety.py::test_safety_dangerous_with_unsafe_size PASSED [ 61%]
tests/test_safety.py::test_safety_catalyst_stats_error PASSED [ 61%]
tests/test_safety.py::test_validator_allows_safe_code PASSED [ 62%]
tests/test_safety.py::test_validator_blocks_save PASSED [ 63%]
tests/test_safety.py::test_validator_blocks_write_attribute PASSED [ 63%]
tests/test_safety.py::test_validator_blocks_save_as_table PASSED [ 64%]
tests/test_safety.py::test_validator_blocks_insert_into PASSED [ 65%]
tests/test_safety.py::test_validator_blocks_drop PASSED [ 65%]
tests/test_safety.py::test_validator_blocks_destructive_sql PASSED [ 66%]
tests/test_safety.py::test_validator_allows_select_sql PASSED [ 67%]
tests/test_safety.py::test_secure_exec_runs_safe_code PASSED [ 67%]
tests/test_safety.py::test_secure_exec_blocks_unsafe_code PASSED [ 68%]
tests/test_server.py::test_health PASSED [ 69%]
tests/test_server.py::test_start_chat_success PASSED [ 69%]
tests/test_server.py::test_start_chat_all_models_fail PASSED [ 70%]
tests/test_server.py::test_send_message_success PASSED [ 71%]
tests/test_server.py::test_send_message_invalid_session PASSED [ 71%]
tests/test_server.py::test_send_message_model_error PASSED [ 72%]
tests/test_server.py::test_generate_success PASSED [ 73%]
tests/test_server.py::test_generate_with_fallback PASSED [ 73%]
tests/test_server.py::test_generate_all_fail PASSED [ 74%]
tests/test_server.py::test_build_context_injection_full PASSED [ 75%]
tests/test_server.py::test_build_context_injection_empty PASSED [ 75%]
tests/test_server.py::test_build_context_injection_string_schema PASSED [ 76%]
tests/test_server.py::test_cleanup_expired_sessions PASSED [ 77%]
tests/test_validator.py::TestValidationResult::test_to_dict PASSED [ 77%]
tests/test_validator.py::TestValidationResult::test_defaults PASSED [ 78%]
tests/test_validator.py::TestConfidenceScoring::test_all_passed_first_attempt PASSED [ 79%]
tests/test_validator.py::TestConfidenceScoring::test_all_passed_second_attempt PASSED [ 79%]
tests/test_validator.py::TestConfidenceScoring::test_all_passed_third_attempt PASSED [ 80%]
tests/test_validator.py::TestConfidenceScoring::test_any_failed PASSED [ 81%]
tests/test_validator.py::TestSchemaCheck::test_matching_schema SKIPPED [ 81%]
tests/test_validator.py::TestSchemaCheck::test_missing_column SKIPPED [ 82%]
tests/test_validator.py::TestSchemaCheck::test_extra_column SKIPPED [ 83%]
tests/test_validator.py::TestSchemaCheck::test_type_mismatch SKIPPED [ 83%]
tests/test_validator.py::TestSchemaCheck::test_matching_schema_with_mock PASSED [ 84%]
tests/test_validator.py::TestSchemaCheck::test_schema_mismatch_with_mock PASSED [ 85%]
tests/test_validator.py::TestRowCountCheck::test_matching_count PASSED [ 85%]
tests/test_validator.py::TestRowCountCheck::test_both_at_sample_limit_passes_with_caveat PASSED [ 86%]
tests/test_validator.py::TestRowCountCheck::test_mismatched_count PASSED [ 87%]
tests/test_validator.py::TestRowCountCheck::test_count_error_handled PASSED [ 87%]
tests/test_validator.py::TestDataIntegrityCheck::test_identical_data PASSED [ 88%]
tests/test_validator.py::TestDataIntegrityCheck::test_different_data PASSED [ 89%]
tests/test_validator.py::TestDataIntegrityCheck::test_integrity_error_handled PASSED [ 89%]
tests/test_validator.py::TestAggregateParity::test_no_numeric_columns PASSED [ 90%]
tests/test_validator.py::TestAggregateParity::test_matching_aggregates FAILED [ 91%]
tests/test_validator.py::TestAggregateParity::test_mismatched_aggregates FAILED [ 91%]
tests/test_validator.py::TestAggregateParity::test_aggregate_error_handled PASSED [ 92%]
tests/test_validator.py::TestAggregateParity::test_decimal_type_matching FAILED [ 93%]
tests/test_validator.py::TestAggregateParity::test_decimal_type_mismatch FAILED [ 93%]
tests/test_validator.py::TestValidateOptimization::test_identical_dataframes_pass FAILED [ 94%]
tests/test_validator.py::TestValidateOptimization::test_schema_mismatch_fails_fast PASSED [ 95%]
tests/test_validator.py::TestValidateOptimization::test_row_count_mismatch_fails_fast PASSED [ 95%]
tests/test_validator.py::TestValidateOptimization::test_retry_degrades_confidence PASSED [ 96%]
tests/test_validator.py::TestValidateOptimization::test_empty_dataframes FAILED [ 97%]
tests/test_validator.py::TestBuildFailurePrompt::test_contains_failure_details PASSED [ 97%]
tests/test_validator.py::TestBuildFailurePrompt::test_contains_sample_diffs PASSED [ 98%]
tests/test_validator.py::TestBuildFailurePrompt::test_contains_plan_diagnostics PASSED [ 99%]
tests/test_validator.py::TestBuildFailurePrompt::test_no_plan_diagnostics PASSED [100%]

=================================== FAILURES ===================================
_________________ TestAggregateParity.test_matching_aggregates _________________

self = <test_validator.TestAggregateParity object at 0x7f6a52a43590>

def test_matching_aggregates(self):
    fields = [("id", T.IntegerType()), ("amount", T.DoubleType())]
    rows = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 20.0}]
    df1 = _make_mock_df(fields, rows=rows)
    df2 = _make_mock_df(fields, rows=rows)

    # Both should return same sums
    agg_row = MagicMock()
    agg_row.__getitem__ = MagicMock(side_effect=lambda k: 3 if k == "id" else 30.0)
    df1.limit.return_value.agg.return_value.collect.return_value = [agg_row]
    df2.limit.return_value.agg.return_value.collect.return_value = [agg_row]

    result = _check_aggregate_parity(df1, df2, sample_size=1000, tolerance=0.0001)
  assert result["passed"] is True

E assert False is True

tests/test_validator.py:269: AssertionError
________________ TestAggregateParity.test_mismatched_aggregates ________________

self = <test_validator.TestAggregateParity object at 0x7f6a52a439d0>

def test_mismatched_aggregates(self):
    fields = [("id", T.IntegerType()), ("amount", T.DoubleType())]
    df1 = _make_mock_df(fields)
    df2 = _make_mock_df(fields)

    agg1 = MagicMock()
    agg1.__getitem__ = MagicMock(side_effect=lambda k: 3 if k == "id" else 30.0)
    df1.limit.return_value.agg.return_value.collect.return_value = [agg1]

    agg2 = MagicMock()
    agg2.__getitem__ = MagicMock(side_effect=lambda k: 3 if k == "id" else 999.0)
    df2.limit.return_value.agg.return_value.collect.return_value = [agg2]

    result = _check_aggregate_parity(df1, df2, sample_size=1000, tolerance=0.0001)
    assert result["passed"] is False
  assert "amount" in result["detail"]

E AssertionError: assert 'amount' in 'Aggregate parity check failed with error: '

tests/test_validator.py:286: AssertionError
________________ TestAggregateParity.test_decimal_type_matching ________________

self = <test_validator.TestAggregateParity object at 0x7f6a52a49b50>

def test_decimal_type_matching(self):
    """DecimalType columns are compared with Decimal arithmetic, not float."""
    from decimal import Decimal
    fields = [("price", T.DecimalType(18, 6))]
    df1 = _make_mock_df(fields)
    df2 = _make_mock_df(fields)

    agg1 = MagicMock()
    agg1.__getitem__ = MagicMock(return_value=Decimal("123456789.123456"))
    df1.limit.return_value.agg.return_value.collect.return_value = [agg1]

    agg2 = MagicMock()
    agg2.__getitem__ = MagicMock(return_value=Decimal("123456789.123456"))
    df2.limit.return_value.agg.return_value.collect.return_value = [agg2]

    result = _check_aggregate_parity(df1, df2, sample_size=1000, tolerance=0.0001)
  assert result["passed"] is True

E assert False is True

tests/test_validator.py:312: AssertionError
________________ TestAggregateParity.test_decimal_type_mismatch ________________

self = <test_validator.TestAggregateParity object at 0x7f6a52a4a890>

def test_decimal_type_mismatch(self):
    """DecimalType mismatch is detected without float precision loss."""
    from decimal import Decimal
    fields = [("price", T.DecimalType(18, 6))]
    df1 = _make_mock_df(fields)
    df2 = _make_mock_df(fields)

    agg1 = MagicMock()
    agg1.__getitem__ = MagicMock(return_value=Decimal("100.000000"))
    df1.limit.return_value.agg.return_value.collect.return_value = [agg1]

    agg2 = MagicMock()
    agg2.__getitem__ = MagicMock(return_value=Decimal("200.000000"))
    df2.limit.return_value.agg.return_value.collect.return_value = [agg2]

    result = _check_aggregate_parity(df1, df2, sample_size=1000, tolerance=0.0001)
    assert result["passed"] is False
  assert "price" in result["detail"]

E AssertionError: assert 'price' in 'Aggregate parity check failed with error: '

tests/test_validator.py:331: AssertionError
___________ TestValidateOptimization.test_identical_dataframes_pass ____________

self = <test_validator.TestValidateOptimization object at 0x7f6a52a4af90>

def test_identical_dataframes_pass(self):
    fields = [("id", T.IntegerType()), ("name", T.StringType())]
    df = _make_mock_df(fields, row_count=3)

    # Aggregate parity: no numeric columns (StringType is not numeric)
    # Actually IntegerType IS numeric — need to set up agg mock
    agg_row = MagicMock()
    agg_row.__getitem__ = MagicMock(return_value=6)  # sum of ids
    df.limit.return_value.agg.return_value.collect.return_value = [agg_row]

    result = validate_optimization(df, df)
  assert result.passed is True

E AssertionError: assert False is True
E + where False = ValidationResult(passed=False, checks=[{'name': 'schema_match', 'passed': True, 'detail': 'Schema matches (2 columns).'}, {'name': 'row_count', 'passed': True, 'detail': 'Row counts match (3 rows).'}, {'name': 'data_integrity', 'passed': True, 'detail': 'No row-level differences detected in sample.'}, {'name': 'aggregate_parity', 'passed': False, 'detail': 'Aggregate parity check failed with error: '}], sample_diffs=None, confidence='LOW', confidence_score=0.0).passed

tests/test_validator.py:349: AssertionError
________________ TestValidateOptimization.test_empty_dataframes ________________

self = <test_validator.TestValidateOptimization object at 0x7f6a52a48390>

def test_empty_dataframes(self):
    fields = [("id", T.IntegerType())]
    df = _make_mock_df(fields, row_count=0)
    result = validate_optimization(df, df)
  assert result.passed is True

E AssertionError: assert False is True
E + where False = ValidationResult(passed=False, checks=[{'name': 'schema_match', 'passed': True, 'detail': 'Schema matches (1 columns).'}, {'name': 'row_count', 'passed': True, 'detail': 'Row counts match (0 rows).'}, {'name': 'data_integrity', 'passed': True, 'detail': 'No row-level differences detected in sample.'}, {'name': 'aggregate_parity', 'passed': False, 'detail': 'Aggregate parity check failed with error: '}], sample_diffs=None, confidence='LOW', confidence_score=0.0).passed

tests/test_validator.py:386: AssertionError
=============================== warnings summary ===============================
server/main.py:115
/home/runner/work/OptiSpark/OptiSpark/server/main.py:115: UserWarning: CORS_ALLOWED_ORIGINS is not set; all cross-origin browser requests will be blocked. Set CORS_ALLOWED_ORIGINS to a comma-separated list of trusted origins.
warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
================================ tests coverage ================================
_______________ coverage: platform linux, python 3.11.15-final-0 _______________

Name Stmts Miss Cover Missing

server/__init__.py 0 0 100%
server/main.py 145 8 94% 30, 95-99, 198, 295
src/optispark/__init__.py 3 0 100%
src/optispark/agent.py 518 62 88% 156-157, 163, 211, 217, 223-224, 304-305, 366-367, 375-377, 381-390, 428-435, 465-486, 499-503, 521-522, 533-543, 575, 602, 726-727, 739-740
src/optispark/benchmark.py 51 3 94% 22-23, 78
src/optispark/cli.py 15 1 93% 46
src/optispark/listener.py 25 1 96% 27
src/optispark/parser.py 82 2 98% 14, 51
src/optispark/plan_analyzer.py 186 8 96% 80, 134, 139, 173-174, 206, 210, 406
src/optispark/reasoning.py 63 3 95% 16-17, 30
src/optispark/safety.py 50 3 94% 58, 60-62
src/optispark/validator.py 170 35 79% 142, 148, 150, 304-354

TOTAL 1308 126 90%
Required test coverage of 90% reached. Total coverage: 90.37%
=========================== short test summary info ============================
FAILED tests/test_validator.py::TestAggregateParity::test_matching_aggregates - assert False is True
FAILED tests/test_validator.py::TestAggregateParity::test_mismatched_aggregates - AssertionError: assert 'amount' in 'Aggregate parity check failed with error: '
FAILED tests/test_validator.py::TestAggregateParity::test_decimal_type_matching - assert False is True
FAILED tests/test_validator.py::TestAggregateParity::test_decimal_type_mismatch - AssertionError: assert 'price' in 'Aggregate parity check failed with error: '
FAILED tests/test_validator.py::TestValidateOptimization::test_identical_dataframes_pass - AssertionError: assert False is True

+ where False = ValidationResult(passed=False, checks=[{'name': 'schema_match', 'passed': True, 'detail': 'Schema matches (2 columns).'}, {'name': 'row_count', 'passed': True, 'detail': 'Row counts match (3 rows).'}, {'name': 'data_integrity', 'passed': True, 'detail': 'No row-level differences detected in sample.'}, {'name': 'aggregate_parity', 'passed': False, 'detail': 'Aggregate parity check failed with error: '}], sample_diffs=None, confidence='LOW', confidence_score=0.0).passed
FAILED tests/test_validator.py::TestValidateOptimization::test_empty_dataframes - AssertionError: assert False is True
+ where False = ValidationResult(passed=False, checks=[{'name': 'schema_match', 'passed': True, 'detail': 'Schema matches (1 columns).'}, {'name': 'row_count', 'passed': True, 'detail': 'Row counts match (0 rows).'}, {'name': 'data_integrity', 'passed': True, 'detail': 'No row-level differences detected in sample.'}, {'name': 'aggregate_parity', 'passed': False, 'detail': 'Aggregate parity check failed with error: '}], sample_diffs=None, confidence='LOW', confidence_score=0.0).passed
============= 6 failed, 139 passed, 4 skipped, 1 warning in 7.13s ==============
Process completed with exit code 1.

Fix this issue

… in aggregate parity check

Agent-Logs-Url: https://github.com/Radom12/OptiSpark/sessions/d114b664-f815-4def-a65b-826592e9d68d

Co-authored-by: Radom12 <69920588+Radom12@users.noreply.github.com>

Copilot AI commented Apr 12, 2026

Contributor

@copilot
2026-04-12T06:50:32.5904207Z ##[group]Run pytest tests/ --cov=src/optispark --cov=server --cov-fail-under=90 --cov-report=term-missing -v
pytest tests/ --cov=src/optispark --cov=server --...

Fixed in d9a011a. The root cause was that PySpark 4.x requires an active SparkContext to create Column objects (F.col(), F.sum()). In CI (Python 3.11), Java is unavailable so the spark fixture is skipped — no SparkSession is ever created. The previous code called F.sum(F.col(c)).alias(c) to build aggregation expressions, which throws AssertionError('') (caught silently by the except Exception handler, producing the empty error message in the test output).
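
The empty error text in the log follows from how a message-less exception is formatted. The sketch below reproduces that effect in isolation and shows one possible way to make such failures visible by including the exception type; `describe_failure` and `run_check` are hypothetical helpers, not functions from the validator.

```python
def describe_failure(exc: Exception) -> str:
    # Including the type name keeps a message-less exception from vanishing.
    return f"Aggregate parity check failed with error: {type(exc).__name__}: {exc}"


def run_check(check) -> str:
    """Run `check` and convert any exception into a readable detail string."""
    try:
        check()
        return "ok"
    except Exception as exc:
        return describe_failure(exc)


def failing_check():
    # Stands in for a library assertion raised without a message.
    raise AssertionError()


print(repr(str(AssertionError())))  # '' -> formatting only {exc} yields nothing
print(run_check(failing_check))
```

Formatting only `{exc}` gives the bare prefix seen in the CI output, while adding `type(exc).__name__` would have pointed at the `AssertionError` immediately.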

The fix replaces the Column-expression approach with a plain Python dict passed to df.agg({'col': 'sum', ...}). No PySpark Column objects are created on the Python client side; PySpark processes the dict on the JVM where a SparkContext always exists. The result column names change from col to sum(col) (Spark's dict-agg naming), so the access key in the comparison loop and the test mock side_effects were updated accordingly.
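
A minimal sketch of the dict-based approach and the matching mock setup follows. The helper names (`build_sum_spec`, `sums_match`, `mock_agg_row`) are invented for illustration and the comparison logic is simplified; only the `{'col': 'sum'}` dict form and the `sum(col)` result naming come from the description above.

```python
from unittest.mock import MagicMock


def build_sum_spec(numeric_columns):
    """Dict form for DataFrame.agg, e.g. {'amount': 'sum'}; no Column objects needed."""
    return {c: "sum" for c in numeric_columns}


def sums_match(orig_row, opt_row, numeric_columns, tolerance=1e-4):
    """Compare per-column sums; dict-style agg names its outputs 'sum(<col>)'."""
    for c in numeric_columns:
        key = f"sum({c})"
        a, b = float(orig_row[key]), float(opt_row[key])
        denom = max(abs(a), abs(b))
        if denom and abs(a - b) / denom > tolerance:
            return False
    return True


def mock_agg_row(values):
    """Mock of a collected Row whose item access is keyed by 'sum(<col>)'."""
    row = MagicMock()
    row.__getitem__.side_effect = lambda key: values[key]
    return row


cols = ["id", "amount"]
orig = mock_agg_row({"sum(id)": 3, "sum(amount)": 30.0})
same = mock_agg_row({"sum(id)": 3, "sum(amount)": 30.0})
diff = mock_agg_row({"sum(id)": 3, "sum(amount)": 999.0})

print(build_sum_spec(cols))
print(sums_match(orig, same, cols), sums_match(orig, diff, cols))
```

In a real run the spec would be passed as `df.limit(n).agg(build_sum_spec(cols)).collect()[0]`; the mocks here stand in for that row so the comparison can be exercised without a JVM.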
