diff --git a/.gitignore b/.gitignore index 4b4e08c..e3b1878 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ logs/ # Reports and analysis output reports/ -investigation_reports/ + analysis/ # IDE and editor files diff --git a/README.md b/README.md index 7b4fee0..79ae529 100755 --- a/README.md +++ b/README.md @@ -54,8 +54,6 @@ drt validate --config config.yaml # 4. Run comparison drt compare --config config.yaml -# 5. (Optional) Investigate regression issues -drt investigate --analysis-dir analysis/output_/ --config config.yaml ``` ## 📦 Platform-Specific Installation @@ -134,34 +132,6 @@ drt compare --config [OPTIONS] - `--verbose, -v` - Enable verbose output - `--dry-run` - Show what would be compared without executing -### Investigate - -Execute diagnostic queries from regression analysis. - -```bash -drt investigate --analysis-dir --config [OPTIONS] -``` - -**Options:** -- `--analysis-dir, -a PATH` - Analysis output directory containing `*_investigate.sql` files (required) -- `--config, -c PATH` - Configuration file (required) -- `--output-dir, -o PATH` - Output directory for reports (default: ./investigation_reports) -- `--verbose, -v` - Enable verbose output -- `--dry-run` - Show what would be executed without running - -**Example:** -```bash -drt investigate -a analysis/output_20251209_184032/ -c config.yaml -drt investigate -a analysis/output_20251209_184032/ -c config.yaml -o ./my_reports -``` - -**What it does:** -- Discovers all `*_investigate.sql` files in the analysis directory -- Parses SQL files (handles markdown, multiple queries per file) -- Executes queries on both baseline and target databases -- Handles errors gracefully (continues on failures) -- Generates HTML and CSV reports with side-by-side comparisons - ## ⚙️ Configuration ### Database Connections @@ -217,7 +187,7 @@ tables: ```yaml reporting: output_dir: "./reports" - investigation_dir: "./investigation_reports" + logging: output_dir: "./logs" @@ -249,7 +219,7 @@ Reports are saved to `./reports/` with timestamps. - **HTML Report** - Interactive report with collapsible query results, side-by-side baseline vs target comparison - **CSV Report** - Flattened structure with one row per query execution -Investigation reports are saved to `./investigation_reports/` with timestamps. + ## 🔄 Exit Codes @@ -324,14 +294,14 @@ grep -i "FAIL\|ERROR" logs/drt_*.log ``` src/drt/ ├── cli/ # Command-line interface -│ └── commands/ # CLI commands (compare, discover, validate, investigate) +│ └── commands/ # CLI commands (compare, discover, validate) ├── config/ # Configuration management ├── database/ # Database connectivity (READ ONLY) ├── models/ # Data models ├── reporting/ # Report generators ├── services/ # Business logic │ ├── checkers/ # Comparison checkers -│ ├── investigation.py # Investigation service + │ └── sql_parser.py # SQL file parser └── utils/ # Utilities ``` diff --git a/config.example.yaml b/config.example.yaml index 3ff8819..afd5d74 100755 --- a/config.example.yaml +++ b/config.example.yaml @@ -145,8 +145,7 @@ reporting: # Output directory for reports (use relative path or set via environment variable) output_dir: "./reports" - # Output directory for investigation reports (use relative path or set via environment variable) - investigation_dir: "./investigation_reports" + # Report formats to generate formats: diff --git a/config.quickstart.yaml b/config.quickstart.yaml index 639175a..426f60d 100755 --- a/config.quickstart.yaml +++ b/config.quickstart.yaml @@ -35,7 +35,7 @@ tables: reporting: output_dir: "./reports" - investigation_dir: "./investigation_reports" + formats: html: true csv: true diff --git a/config.test.yaml b/config.test.yaml index 19f047e..a3b8d8e 100755 --- a/config.test.yaml +++ b/config.test.yaml @@ -72,7 +72,7 @@ tables: reporting: output_directory: "/home/user/reports" - investigation_directory: "/home/user/investigation_reports" + formats: ["html", "csv"] filename_template: "test_regression_{timestamp}" diff --git a/src/drt/cli/commands/__init__.py b/src/drt/cli/commands/__init__.py index 4ce91d0..0e78a38 100755 --- a/src/drt/cli/commands/__init__.py +++ b/src/drt/cli/commands/__init__.py @@ -1,5 +1,5 @@ """CLI commands.""" -from drt.cli.commands import discover, compare, validate, investigate +from drt.cli.commands import discover, compare, validate -__all__ = ["discover", "compare", "validate", "investigate"] \ No newline at end of file +__all__ = ["discover", "compare", "validate"] diff --git a/src/drt/cli/commands/investigate.py b/src/drt/cli/commands/investigate.py deleted file mode 100644 index 634e685..0000000 --- a/src/drt/cli/commands/investigate.py +++ /dev/null @@ -1,177 +0,0 @@ -"""Investigate command implementation.""" - -import click -import sys -from pathlib import Path -from drt.config.loader import load_config -from drt.services.investigation import InvestigationService -from drt.reporting.investigation_report import ( - InvestigationHTMLReportGenerator, - InvestigationCSVReportGenerator -) -from drt.utils.logging import setup_logging, get_logger -from drt.utils.timestamps import get_timestamp - -logger = get_logger(__name__) - - -@click.command() -@click.option('--analysis-dir', '-a', required=True, type=click.Path(exists=True), - help='Analysis output directory containing *_investigate.sql files') -@click.option('--config', '-c', required=True, type=click.Path(exists=True), - help='Configuration file path') -@click.option('--output-dir', '-o', default=None, - help='Output directory for reports (overrides config setting)') -@click.option('--verbose', '-v', is_flag=True, help='Enable verbose output') -@click.option('--dry-run', is_flag=True, help='Show what would be executed without running') -def investigate(analysis_dir, config, output_dir, verbose, dry_run): - """ - Execute investigation queries from regression analysis. - - Processes all *_investigate.sql files in the analysis directory, - executes queries on both baseline and target databases, and - generates comprehensive reports. - - Example: - drt investigate -a /home/user/analysis/output_20251209_184032/ -c config.yaml - """ - # Load config first to get log directory - from drt.config.loader import load_config - cfg = load_config(config) - - # Setup logging using config - log_level = "DEBUG" if verbose else "INFO" - log_dir = cfg.logging.directory - setup_logging(log_level=log_level, log_dir=log_dir, log_to_file=not dry_run) - - click.echo("=" * 60) - click.echo("Data Regression Testing Framework - Investigation") - click.echo("=" * 60) - click.echo() - - try: - # Use output_dir from CLI if provided, otherwise use config - if output_dir is None: - output_dir = cfg.reporting.investigation_directory - - click.echo(f"✓ Configuration loaded") - click.echo(f" Database pairs: {len(cfg.database_pairs)}") - click.echo() - - # Convert paths - analysis_path = Path(analysis_dir) - output_path = Path(output_dir) - - # Create output directory - output_path.mkdir(parents=True, exist_ok=True) - - if dry_run: - click.echo("=" * 60) - click.echo("DRY RUN - Preview Only") - click.echo("=" * 60) - - # Discover SQL files - from drt.services.sql_parser import discover_sql_files - sql_files = discover_sql_files(analysis_path) - - click.echo(f"\nAnalysis Directory: {analysis_path}") - click.echo(f"Found {len(sql_files)} investigation SQL files") - - if sql_files: - click.echo("\nTables with investigation queries:") - for schema, table, sql_path in sql_files[:10]: # Show first 10 - click.echo(f" • {schema}.{table}") - - if len(sql_files) > 10: - click.echo(f" ... and {len(sql_files) - 10} more") - - for pair in cfg.database_pairs: - if not pair.enabled: - continue - - click.echo(f"\nDatabase Pair: {pair.name}") - click.echo(f" Baseline: {pair.baseline.server}.{pair.baseline.database}") - click.echo(f" Target: {pair.target.server}.{pair.target.database}") - - click.echo(f"\nReports would be saved to: {output_path}") - click.echo("\n" + "=" * 60) - click.echo("Use without --dry-run to execute investigation") - click.echo("=" * 60) - sys.exit(0) - - # Execute investigation for each database pair - all_summaries = [] - - for pair in cfg.database_pairs: - if not pair.enabled: - click.echo(f"Skipping disabled pair: {pair.name}") - continue - - click.echo(f"Investigating: {pair.name}") - click.echo(f" Baseline: {pair.baseline.server}.{pair.baseline.database}") - click.echo(f" Target: {pair.target.server}.{pair.target.database}") - click.echo() - - # Run investigation - investigation_service = InvestigationService(cfg) - summary = investigation_service.run_investigation(analysis_path, pair) - all_summaries.append(summary) - - click.echo() - - # Generate reports for all summaries - if all_summaries: - click.echo("=" * 60) - click.echo("Generating Reports") - click.echo("=" * 60) - - for summary in all_summaries: - timestamp = get_timestamp() - - # Generate HTML report - html_gen = InvestigationHTMLReportGenerator(cfg) - html_path = output_path / f"investigation_report_{timestamp}.html" - html_gen.generate(summary, html_path) - click.echo(f" ✓ HTML: {html_path}") - - # Generate CSV report - csv_gen = InvestigationCSVReportGenerator(cfg) - csv_path = output_path / f"investigation_report_{timestamp}.csv" - csv_gen.generate(summary, csv_path) - click.echo(f" ✓ CSV: {csv_path}") - - click.echo() - - # Display final summary - click.echo("=" * 60) - click.echo("INVESTIGATION COMPLETE") - click.echo("=" * 60) - - total_processed = sum(s.tables_processed for s in all_summaries) - total_successful = sum(s.tables_successful for s in all_summaries) - total_partial = sum(s.tables_partial for s in all_summaries) - total_failed = sum(s.tables_failed for s in all_summaries) - total_queries = sum(s.total_queries_executed for s in all_summaries) - - click.echo(f" Tables Processed: {total_processed:3d}") - click.echo(f" Successful: {total_successful:3d}") - click.echo(f" Partial: {total_partial:3d}") - click.echo(f" Failed: {total_failed:3d}") - click.echo(f" Total Queries: {total_queries:3d}") - click.echo("=" * 60) - - # Exit with appropriate code - if total_failed > 0: - click.echo("Status: COMPLETED WITH FAILURES ⚠️") - sys.exit(1) - elif total_partial > 0: - click.echo("Status: COMPLETED WITH PARTIAL RESULTS ◐") - sys.exit(0) - else: - click.echo("Status: SUCCESS ✓") - sys.exit(0) - - except Exception as e: - logger.error(f"Investigation failed: {e}", exc_info=verbose) - click.echo(f"✗ Error: {e}", err=True) - sys.exit(2) \ No newline at end of file diff --git a/src/drt/cli/main.py b/src/drt/cli/main.py index c704ef0..38d372b 100755 --- a/src/drt/cli/main.py +++ b/src/drt/cli/main.py @@ -3,7 +3,7 @@ import click import sys from drt import __version__ -from drt.cli.commands import discover, compare, validate, investigate +from drt.cli.commands import discover, compare, validate from drt.utils.logging import setup_logging @@ -45,7 +45,7 @@ def version(): cli.add_command(discover.discover) cli.add_command(compare.compare) cli.add_command(validate.validate) -cli.add_command(investigate.investigate) + if __name__ == '__main__': diff --git a/src/drt/config/models.py b/src/drt/config/models.py index dc9267d..3e7b21e 100755 --- a/src/drt/config/models.py +++ b/src/drt/config/models.py @@ -115,7 +115,7 @@ class TableConfig(BaseModel): class ReportingConfig(BaseModel): """Reporting configuration.""" output_directory: str = "./reports" - investigation_directory: str = "./investigation_reports" + formats: List[str] = Field(default_factory=lambda: ["html", "csv"]) filename_template: str = "regression_report_{timestamp}" html: Dict[str, Any] = Field(default_factory=lambda: { @@ -196,4 +196,4 @@ class Config(BaseModel): """Ensure at least one database pair is configured.""" if not v: raise ValueError("At least one database pair must be configured") - return v \ No newline at end of file + return v diff --git a/src/drt/database/executor.py b/src/drt/database/executor.py index 3fb6309..0ff42ef 100755 --- a/src/drt/database/executor.py +++ b/src/drt/database/executor.py @@ -190,78 +190,3 @@ class QueryExecutor: return results - def execute_investigation_query( - self, - query: str, - timeout: Optional[int] = None - ) -> Tuple[Status, Optional[pd.DataFrame], Optional[str], int]: - """ - Execute investigation query with comprehensive error handling. - - This method is specifically for investigation queries and does NOT - enforce the SELECT-only restriction. It handles errors gracefully - and returns detailed status information. - - Args: - query: SQL query to execute - timeout: Query timeout in seconds (optional) - - Returns: - Tuple of (status, result_df, error_message, execution_time_ms) - """ - start_time = time.time() - - try: - # Execute query - with self.conn_mgr.get_connection() as conn: - if timeout: - # Set query timeout if supported - try: - cursor = conn.cursor() - cursor.execute(f"SET QUERY_TIMEOUT {timeout}") - except Exception: - # Timeout setting not supported, continue anyway - pass - - df = pd.read_sql(query, conn) - - execution_time = int((time.time() - start_time) * 1000) - - return (Status.PASS, df, None, execution_time) - - except Exception as e: - execution_time = int((time.time() - start_time) * 1000) - error_msg = str(e) - error_type = type(e).__name__ - - # Categorize error - if any(phrase in error_msg.lower() for phrase in [ - 'does not exist', - 'invalid object name', - 'could not find', - 'not found' - ]): - status = Status.SKIP - message = f"Object not found: {error_msg}" - - elif 'timeout' in error_msg.lower(): - status = Status.FAIL - message = f"Query timeout: {error_msg}" - - elif any(phrase in error_msg.lower() for phrase in [ - 'syntax error', - 'incorrect syntax' - ]): - status = Status.FAIL - message = f"Syntax error: {error_msg}" - - elif 'permission' in error_msg.lower(): - status = Status.FAIL - message = f"Permission denied: {error_msg}" - - else: - status = Status.FAIL - message = f"{error_type}: {error_msg}" - - logger.debug(f"Query execution failed: {message}") - return (status, None, message, execution_time) \ No newline at end of file diff --git a/src/drt/models/investigation.py b/src/drt/models/investigation.py deleted file mode 100644 index b5fc7e4..0000000 --- a/src/drt/models/investigation.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Data models for investigation feature.""" - -from dataclasses import dataclass, field -from typing import List, Optional -import pandas as pd -from drt.models.enums import Status - - -@dataclass -class QueryExecutionResult: - """Result of executing a single query.""" - query_number: int - query_text: str - status: Status - execution_time_ms: int - result_data: Optional[pd.DataFrame] = None - error_message: Optional[str] = None - row_count: int = 0 - - -@dataclass -class TableInvestigationResult: - """Results for all queries in a table's investigation.""" - schema: str - table: str - sql_file_path: str - baseline_results: List[QueryExecutionResult] - target_results: List[QueryExecutionResult] - overall_status: Status - timestamp: str - - @property - def full_name(self) -> str: - """Get full table name.""" - return f"{self.schema}.{self.table}" - - @property - def total_queries(self) -> int: - """Get total number of queries.""" - return len(self.baseline_results) - - @property - def successful_queries(self) -> int: - """Get number of successful queries.""" - all_results = self.baseline_results + self.target_results - return sum(1 for r in all_results if r.status == Status.PASS) - - -@dataclass -class InvestigationSummary: - """Overall investigation execution summary.""" - start_time: str - end_time: str - duration_seconds: int - analysis_directory: str - baseline_info: str - target_info: str - tables_processed: int - tables_successful: int - tables_partial: int - tables_failed: int - total_queries_executed: int - results: List[TableInvestigationResult] = field(default_factory=list) - - @property - def success_rate(self) -> float: - """Calculate success rate percentage.""" - if self.tables_processed == 0: - return 0.0 - return (self.tables_successful / self.tables_processed) * 100 \ No newline at end of file diff --git a/src/drt/reporting/investigation_report.py b/src/drt/reporting/investigation_report.py deleted file mode 100644 index ad95b68..0000000 --- a/src/drt/reporting/investigation_report.py +++ /dev/null @@ -1,357 +0,0 @@ -"""Investigation report generators for HTML and CSV formats.""" - -import csv -from pathlib import Path -from typing import Optional -from drt.models.investigation import InvestigationSummary, QueryExecutionResult -from drt.models.enums import Status -from drt.config.models import Config -from drt.utils.logging import get_logger -from drt.utils.timestamps import format_duration - -logger = get_logger(__name__) - - -class InvestigationHTMLReportGenerator: - """Generates HTML format investigation reports.""" - - def __init__(self, config: Config): - """ - Initialize HTML generator. - - Args: - config: Configuration object - """ - self.config = config - self.max_rows = 100 # Limit rows displayed in HTML - - def generate(self, summary: InvestigationSummary, filepath: Path) -> None: - """ - Generate HTML investigation report. - - Args: - summary: Investigation summary - filepath: Output file path - """ - html_content = self._build_html(summary) - - with open(filepath, "w", encoding="utf-8") as f: - f.write(html_content) - - logger.debug(f"Investigation HTML report written to {filepath}") - - def _build_html(self, summary: InvestigationSummary) -> str: - """Build complete HTML document.""" - return f""" - - - - - Investigation Report - {summary.start_time} - {self._get_styles()} - {self._get_scripts()} - - -
- {self._build_header(summary)} - {self._build_summary(summary)} - {self._build_table_results(summary)} - {self._build_footer(summary)} -
- -""" - - def _get_styles(self) -> str: - """Get embedded CSS styles.""" - return """""" - - def _get_scripts(self) -> str: - """Get embedded JavaScript.""" - return """""" - - def _build_header(self, summary: InvestigationSummary) -> str: - """Build report header.""" - return f"""
-

🔍 Investigation Report

-

Analysis Directory: {summary.analysis_directory}

-
- -
-
-
Start Time
-
{summary.start_time}
-
-
-
End Time
-
{summary.end_time}
-
-
-
Duration
-
{format_duration(summary.duration_seconds)}
-
-
-
Baseline
-
{summary.baseline_info}
-
-
-
Target
-
{summary.target_info}
-
-
-
Total Queries
-
{summary.total_queries_executed}
-
-
""" - - def _build_summary(self, summary: InvestigationSummary) -> str: - """Build summary section.""" - return f"""

Summary

-
-
-
{summary.tables_successful}
-
Successful
-
-
-
{summary.tables_partial}
-
Partial
-
-
-
{summary.tables_failed}
-
Failed
-
-
""" - - def _build_table_results(self, summary: InvestigationSummary) -> str: - """Build table-by-table results.""" - html = '

Investigation Results

' - - for idx, table_result in enumerate(summary.results): - html += f"""
-
- {table_result.full_name} - {table_result.overall_status.value} - -
-
-

SQL File: {table_result.sql_file_path}

-

Total Queries: {table_result.total_queries}

-

Successful Queries: {table_result.successful_queries}

- {self._build_queries(table_result)} -
-
""" - - return html - - def _build_queries(self, table_result) -> str: - """Build query results for a table.""" - html = "" - - for i, (baseline_result, target_result) in enumerate(zip( - table_result.baseline_results, - table_result.target_results - ), 1): - html += f"""
-
Query {baseline_result.query_number}
-
- View SQL -
{self._escape_html(baseline_result.query_text)}
-
-
- {self._build_query_result(baseline_result, "Baseline")} - {self._build_query_result(target_result, "Target")} -
-
""" - - return html - - def _build_query_result(self, result: QueryExecutionResult, env: str) -> str: - """Build single query result.""" - html = f"""
-
{env}
- {result.status.value} -
- ⏱️ {result.execution_time_ms}ms - 📊 {result.row_count} rows -
""" - - if result.error_message: - html += f'
❌ {self._escape_html(result.error_message)}
' - elif result.result_data is not None and not result.result_data.empty: - html += self._build_result_table(result) - - html += '
' - return html - - def _build_result_table(self, result: QueryExecutionResult) -> str: - """Build HTML table from DataFrame.""" - df = result.result_data - - if df is None or df.empty: - return '

No data returned

' - - # Limit rows - display_df = df.head(self.max_rows) - - html = '' - for col in display_df.columns: - html += f'' - html += '' - - for _, row in display_df.iterrows(): - html += '' - for val in row: - html += f'' - html += '' - - html += '
{self._escape_html(str(col))}
{self._escape_html(str(val))}
' - - if len(df) > self.max_rows: - html += f'

Showing first {self.max_rows} of {len(df)} rows

' - - return html - - def _escape_html(self, text: str) -> str: - """Escape HTML special characters.""" - return (text - .replace('&', '&') - .replace('<', '<') - .replace('>', '>') - .replace('"', '"') - .replace("'", ''')) - - def _build_footer(self, summary: InvestigationSummary) -> str: - """Build report footer.""" - return f"""""" - - -class InvestigationCSVReportGenerator: - """Generates CSV format investigation reports.""" - - def __init__(self, config: Config): - """ - Initialize CSV generator. - - Args: - config: Configuration object - """ - self.config = config - - def generate(self, summary: InvestigationSummary, filepath: Path) -> None: - """ - Generate CSV investigation report. - - Args: - summary: Investigation summary - filepath: Output file path - """ - csv_config = self.config.reporting.csv - delimiter = csv_config.get("delimiter", ",") - encoding = csv_config.get("encoding", "utf-8-sig") - - with open(filepath, "w", newline="", encoding=encoding) as f: - writer = csv.writer(f, delimiter=delimiter) - - # Write header - writer.writerow([ - "Timestamp", - "Schema", - "Table", - "Query_Number", - "Environment", - "Status", - "Row_Count", - "Execution_Time_Ms", - "Error_Message", - "SQL_File_Path" - ]) - - # Write data rows - for table_result in summary.results: - # Baseline results - for query_result in table_result.baseline_results: - writer.writerow([ - table_result.timestamp, - table_result.schema, - table_result.table, - query_result.query_number, - "baseline", - query_result.status.value, - query_result.row_count, - query_result.execution_time_ms, - query_result.error_message or "", - table_result.sql_file_path - ]) - - # Target results - for query_result in table_result.target_results: - writer.writerow([ - table_result.timestamp, - table_result.schema, - table_result.table, - query_result.query_number, - "target", - query_result.status.value, - query_result.row_count, - query_result.execution_time_ms, - query_result.error_message or "", - table_result.sql_file_path - ]) - - logger.debug(f"Investigation CSV report written to {filepath}") \ No newline at end of file diff --git a/src/drt/services/investigation.py b/src/drt/services/investigation.py deleted file mode 100644 index 166cbbc..0000000 --- a/src/drt/services/investigation.py +++ /dev/null @@ -1,297 +0,0 @@ -"""Investigation service for executing investigation queries.""" - -import time -from pathlib import Path -from typing import List, Tuple -from drt.database.connection import ConnectionManager -from drt.database.executor import QueryExecutor -from drt.config.models import Config, DatabasePairConfig -from drt.models.investigation import ( - QueryExecutionResult, - TableInvestigationResult, - InvestigationSummary -) -from drt.models.enums import Status -from drt.services.sql_parser import SQLParser, discover_sql_files -from drt.utils.logging import get_logger -from drt.utils.timestamps import get_timestamp - -logger = get_logger(__name__) - - -class InvestigationService: - """Service for executing investigation queries.""" - - def __init__(self, config: Config): - """ - Initialize investigation service. - - Args: - config: Configuration object - """ - self.config = config - self.parser = SQLParser() - - def run_investigation( - self, - analysis_dir: Path, - db_pair: DatabasePairConfig - ) -> InvestigationSummary: - """ - Run investigation for all SQL files in analysis directory. - - Args: - analysis_dir: Path to analysis output directory - db_pair: Database pair configuration - - Returns: - Investigation summary with all results - """ - start_time = get_timestamp() - start_ts = time.time() - - logger.info("=" * 60) - logger.info(f"Starting investigation: {analysis_dir.name}") - logger.info("=" * 60) - - # Initialize connections - baseline_mgr = ConnectionManager(db_pair.baseline) - target_mgr = ConnectionManager(db_pair.target) - - try: - # Connect to databases - baseline_mgr.connect() - target_mgr.connect() - - # Create executors - baseline_executor = QueryExecutor(baseline_mgr) - target_executor = QueryExecutor(target_mgr) - - # Discover SQL files - sql_files = discover_sql_files(analysis_dir) - logger.info(f"Found {len(sql_files)} investigation files") - - # Create summary - summary = InvestigationSummary( - start_time=start_time, - end_time="", - duration_seconds=0, - analysis_directory=str(analysis_dir), - baseline_info=f"{db_pair.baseline.server}.{db_pair.baseline.database}", - target_info=f"{db_pair.target.server}.{db_pair.target.database}", - tables_processed=0, - tables_successful=0, - tables_partial=0, - tables_failed=0, - total_queries_executed=0, - results=[] - ) - - # Process each SQL file - for idx, (schema, table, sql_path) in enumerate(sql_files, 1): - logger.info(f"[{idx:3d}/{len(sql_files)}] {schema}.{table:40s} ...") - - result = self._investigate_table( - schema, - table, - sql_path, - baseline_executor, - target_executor - ) - - summary.results.append(result) - summary.tables_processed += 1 - - # Update counters - if result.overall_status == Status.PASS: - summary.tables_successful += 1 - elif result.overall_status == Status.SKIP: - # Don't count skipped tables in partial/failed - pass - elif result.overall_status in [Status.WARNING, Status.INFO]: - # Treat WARNING/INFO as partial success - summary.tables_partial += 1 - elif self._is_partial_status(result): - summary.tables_partial += 1 - else: - summary.tables_failed += 1 - - # Count queries - summary.total_queries_executed += len(result.baseline_results) - summary.total_queries_executed += len(result.target_results) - - logger.info(f" {self._get_status_symbol(result.overall_status)} " - f"{result.overall_status.value}") - - # Finalize summary - end_time = get_timestamp() - duration = int(time.time() - start_ts) - summary.end_time = end_time - summary.duration_seconds = duration - - self._log_summary(summary) - - return summary - - finally: - baseline_mgr.disconnect() - target_mgr.disconnect() - - def _investigate_table( - self, - schema: str, - table: str, - sql_path: Path, - baseline_executor: QueryExecutor, - target_executor: QueryExecutor - ) -> TableInvestigationResult: - """Execute investigation queries for a single table.""" - - # Parse SQL file - queries = self.parser.parse_sql_file(sql_path) - - if not queries: - logger.warning(f"No valid queries found in {sql_path.name}") - return TableInvestigationResult( - schema=schema, - table=table, - sql_file_path=str(sql_path), - baseline_results=[], - target_results=[], - overall_status=Status.SKIP, - timestamp=get_timestamp() - ) - - logger.debug(f" └─ Executing {len(queries)} queries") - - # Execute on baseline - baseline_results = self._execute_queries( - queries, - baseline_executor, - "baseline" - ) - - # Execute on target - target_results = self._execute_queries( - queries, - target_executor, - "target" - ) - - # Determine overall status - overall_status = self._determine_overall_status( - baseline_results, - target_results - ) - - return TableInvestigationResult( - schema=schema, - table=table, - sql_file_path=str(sql_path), - baseline_results=baseline_results, - target_results=target_results, - overall_status=overall_status, - timestamp=get_timestamp() - ) - - def _execute_queries( - self, - queries: List[Tuple[int, str]], - executor: QueryExecutor, - environment: str - ) -> List[QueryExecutionResult]: - """Execute list of queries on one environment.""" - results = [] - - for query_num, query_text in queries: - logger.debug(f" └─ Query {query_num} on {environment}") - - status, result_df, error_msg, exec_time = \ - executor.execute_investigation_query(query_text) - - result = QueryExecutionResult( - query_number=query_num, - query_text=query_text, - status=status, - execution_time_ms=exec_time, - result_data=result_df, - error_message=error_msg, - row_count=len(result_df) if result_df is not None else 0 - ) - - results.append(result) - - logger.debug(f" └─ {status.value} ({exec_time}ms, " - f"{result.row_count} rows)") - - return results - - def _determine_overall_status( - self, - baseline_results: List[QueryExecutionResult], - target_results: List[QueryExecutionResult] - ) -> Status: - """Determine overall status for table investigation.""" - - all_results = baseline_results + target_results - - if not all_results: - return Status.SKIP - - success_count = sum(1 for r in all_results if r.status == Status.PASS) - failed_count = sum(1 for r in all_results if r.status == Status.FAIL) - skipped_count = sum(1 for r in all_results if r.status == Status.SKIP) - - # All successful - if success_count == len(all_results): - return Status.PASS - - # All failed - if failed_count == len(all_results): - return Status.FAIL - - # All skipped - if skipped_count == len(all_results): - return Status.SKIP - - # Mixed results - use WARNING to indicate partial success - if success_count > 0: - return Status.WARNING - else: - return Status.FAIL - - def _is_partial_status(self, result: TableInvestigationResult) -> bool: - """Check if result represents partial success.""" - all_results = result.baseline_results + result.target_results - if not all_results: - return False - - success_count = sum(1 for r in all_results if r.status == Status.PASS) - return 0 < success_count < len(all_results) - - def _get_status_symbol(self, status: Status) -> str: - """Get symbol for status.""" - symbols = { - Status.PASS: "✓", - Status.FAIL: "✗", - Status.WARNING: "◐", - Status.SKIP: "○", - Status.ERROR: "🔴", - Status.INFO: "ℹ" - } - return symbols.get(status, "?") - - def _log_summary(self, summary: InvestigationSummary) -> None: - """Log investigation summary.""" - logger.info("=" * 60) - logger.info("INVESTIGATION SUMMARY") - logger.info("=" * 60) - logger.info(f" Tables Processed: {summary.tables_processed}") - logger.info(f" Successful: {summary.tables_successful}") - logger.info(f" Partial: {summary.tables_partial}") - logger.info(f" Failed: {summary.tables_failed}") - logger.info(f" Total Queries: {summary.total_queries_executed}") - logger.info("=" * 60) - logger.info(f"Duration: {summary.duration_seconds} seconds") - logger.info(f"Success Rate: {summary.success_rate:.1f}%") - logger.info("=" * 60) \ No newline at end of file diff --git a/src/drt/services/sql_parser.py b/src/drt/services/sql_parser.py index e638de2..542d54a 100644 --- a/src/drt/services/sql_parser.py +++ b/src/drt/services/sql_parser.py @@ -1,4 +1,4 @@ -"""SQL file parser for investigation queries.""" +"""SQL file parser.""" import re from pathlib import Path @@ -9,7 +9,7 @@ logger = get_logger(__name__) class SQLParser: - """Parser for investigation SQL files.""" + """Parser for SQL files.""" @staticmethod def parse_sql_file(file_path: Path) -> List[Tuple[int, str]]: @@ -23,7 +23,7 @@ class SQLParser: List of tuples (query_number, query_text) Example: - >>> queries = SQLParser.parse_sql_file(Path("investigate.sql")) + >>> queries = SQLParser.parse_sql_file(Path("analysis.sql")) >>> for num, query in queries: ... print(f"Query {num}: {query[:50]}...") """ @@ -133,41 +133,4 @@ class SQLParser: return len(cleaned) > 0 -def discover_sql_files(analysis_dir: Path) -> List[Tuple[str, str, Path]]: - """ - Discover all *_investigate.sql files in analysis directory. - - Args: - analysis_dir: Root analysis directory - - Returns: - List of tuples (schema, table, file_path) - - Example: - >>> files = discover_sql_files(Path("analysis/output_20251209_184032")) - >>> for schema, table, path in files: - ... print(f"{schema}.{table}: {path}") - """ - sql_files = [] - - # Pattern: dbo.TableName/dbo.TableName_investigate.sql - pattern = "**/*_investigate.sql" - - for sql_file in analysis_dir.glob(pattern): - # Extract schema and table from filename - # Example: dbo.A_COREC_NACES2008_investigate.sql - filename = sql_file.stem # Remove .sql - - if filename.endswith('_investigate'): - # Remove _investigate suffix - full_name = filename[:-12] # len('_investigate') = 12 - - # Split schema.table - if '.' in full_name: - schema, table = full_name.split('.', 1) - sql_files.append((schema, table, sql_file)) - else: - logger.warning(f"Could not parse schema.table from {filename}") - - logger.info(f"Discovered {len(sql_files)} investigation SQL files") - return sql_files \ No newline at end of file +