Files
drt/tests/test_parallel_logic.py
DevOps Team 40bc615bf7 Add parallel checker execution with connection pooling
Implements Level 2 parallelization for row_count, schema, and
aggregate checkers, improving performance by 2-3x for tables with
multiple enabled checks.

Changes:
- Add max_workers config option (default: 4)
- Add ConnectionPool module with SQLAlchemy QueuePool
- Add URL encoding for connection strings
- Implement parallel checker execution with ThreadPoolExecutor
- Add fail-fast behavior on checker errors
- Update executor for SQLAlchemy 2.0 compatibility
- Fix engine disposal resource leak
- Cache pooled engines in ConnectionManager
- Add disconnect() cleanup for pooled engines

Performance:
- Sequential: 3 checkers × 100ms = 300ms
- Parallel: 3 checkers ≈ 100ms (2-3x speedup)

Configuration:
  execution:
    max_workers: 4  # Controls parallel checker execution
    continue_on_error: true
2026-02-11 21:46:10 +07:00

178 lines
5.7 KiB
Python

"""Test parallel checker execution logic."""
import pytest
from unittest.mock import Mock, MagicMock, patch
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any
import time
class TestParallelCheckerLogic:
    """Test parallel checker execution logic."""

    def test_parallel_execution_with_multiple_checkers(self):
        """Test that multiple checkers can run in parallel.

        Every mock checker waits on one shared ``threading.Barrier`` sized to
        the number of checkers. The barrier only releases once all checkers
        are running at the same time. If they were executed one after another,
        the first checker would block until the barrier timeout breaks it. That
        surfaces as ``threading.BrokenBarrierError`` from ``future.result()``.
        This proves concurrency deterministically instead of relying on
        wall-clock timing thresholds.
        """
        import threading

        # Don't import modules that need pyodbc - test the logic conceptually
        from drt.models.results import CheckResult
        from drt.models.enums import Status, CheckType
        from drt.models.table import TableInfo

        row_count_result = CheckResult(
            check_type=CheckType.ROW_COUNT,
            status=Status.PASS,
            message="Row count matches",
        )
        schema_result = CheckResult(
            check_type=CheckType.SCHEMA,
            status=Status.PASS,
            message="Schema matches",
        )
        aggregate_result = CheckResult(
            check_type=CheckType.AGGREGATE,
            status=Status.PASS,
            message="Aggregates match",
        )

        # Releases only when all three checkers are inside check() at once;
        # the timeout turns "ran sequentially" into a fast, explicit failure.
        all_running = threading.Barrier(3, timeout=5)

        def make_checker(result):
            """Build a mock checker whose check() waits on the barrier."""
            checker = Mock()

            def check(table):
                all_running.wait()
                return result

            checker.check = check
            return checker

        checkers = [
            ("row_count", make_checker(row_count_result)),
            ("schema", make_checker(schema_result)),
            ("aggregate", make_checker(aggregate_result)),
        ]

        table = TableInfo(
            schema="dbo",
            name="TestTable",
            enabled=True,
            expected_in_target=True,
            aggregate_columns=["Amount", "Quantity"],
        )

        with ThreadPoolExecutor(max_workers=len(checkers)) as executor:
            futures = {
                executor.submit(checker.check, table): name
                for name, checker in checkers
            }
            # result() re-raises BrokenBarrierError if the checkers did not
            # overlap, so a non-parallel run fails here with a clear cause.
            results = {name: future.result() for future, name in futures.items()}

        # Verify results
        assert len(results) == 3
        assert results["row_count"] is row_count_result
        assert results["schema"] is schema_result
        assert results["aggregate"] is aggregate_result
        assert results["row_count"].status == Status.PASS
        assert results["schema"].status == Status.PASS
        assert results["aggregate"].status == Status.PASS

    def test_fail_fast_on_error(self):
        """Test that result collection stops at the first ERROR result.

        The failing "schema" checker returns immediately. The other two block
        on ``release`` until the collector has reacted, so the ERROR result is
        deterministically the first to complete. The collector consumes
        futures in completion order (``as_completed``). On ERROR it cancels the
        remaining futures and stops. Futures that are already running cannot
        be cancelled (``Future.cancel()`` returns False for them), so the
        assertions target what was *collected*, not whether work ran.
        """
        import threading
        from concurrent.futures import as_completed

        from drt.models.results import CheckResult
        from drt.models.enums import Status, CheckType
        from drt.models.table import TableInfo

        result1 = CheckResult(
            check_type=CheckType.ROW_COUNT,
            status=Status.PASS,
            message="OK",
        )
        result2 = CheckResult(
            check_type=CheckType.SCHEMA,
            status=Status.ERROR,
            message="Database connection failed",
        )
        result3 = CheckResult(
            check_type=CheckType.AGGREGATE,
            status=Status.PASS,
            message="OK",
        )

        # Keeps the passing checkers busy until the ERROR has been handled.
        release = threading.Event()
        call_count = {"checker1": 0, "checker2": 0, "checker3": 0}

        def mock_check1(table):
            call_count["checker1"] += 1
            release.wait(timeout=5)
            return result1

        def mock_check2(table):
            call_count["checker2"] += 1
            return result2

        def mock_check3(table):
            call_count["checker3"] += 1
            release.wait(timeout=5)
            return result3

        mock_checker1 = Mock()
        mock_checker2 = Mock()
        mock_checker3 = Mock()
        mock_checker1.check = mock_check1
        mock_checker2.check = mock_check2
        mock_checker3.check = mock_check3

        table = TableInfo(schema="dbo", name="TestTable", enabled=True)
        checkers = [
            ("row_count", mock_checker1),
            ("schema", mock_checker2),
            ("aggregate", mock_checker3),
        ]

        # Run with fail-fast
        results = {}
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = {
                executor.submit(checker.check, table): name
                for name, checker in checkers
            }
            try:
                for future in as_completed(futures, timeout=5):
                    name = futures[future]
                    # No blanket except: an unexpected checker exception
                    # must fail the test rather than be silently ignored.
                    result = future.result()
                    results[name] = result
                    if result.status == Status.ERROR:
                        # Cancel whatever has not started yet, then stop.
                        for pending in futures:
                            pending.cancel()
                        break
            finally:
                # Unblock the slow checkers so executor shutdown cannot hang.
                release.set()

        # Collection stopped at the first (and only collected) ERROR result.
        assert results == {"schema": result2}
        assert results["schema"].status == Status.ERROR
        assert call_count["checker2"] == 1
if __name__ == "__main__":
    # Propagate pytest's exit status so shell callers and CI see failures;
    # a bare pytest.main(...) call would always exit 0.
    raise SystemExit(pytest.main([__file__, "-v"]))