Remove investigation feature completely

- Delete investigation CLI command and related services
- Remove investigation data models and report generators
- Clean up configuration options and documentation
- Update gitignore and remove stale egg-info

Investigation feature is no longer needed and has been fully removed.
Backup preserved in git tag 'pre-investigation-removal'.
This commit is contained in:
DevOps Team
2026-02-11 20:28:50 +07:00
parent 2966711ca6
commit f5b190c91d
14 changed files with 18 additions and 1062 deletions

2
.gitignore vendored
View File

@@ -18,7 +18,7 @@ logs/
# Reports and analysis output
reports/
investigation_reports/
analysis/
# IDE and editor files

View File

@@ -54,8 +54,6 @@ drt validate --config config.yaml
# 4. Run comparison
drt compare --config config.yaml
# 5. (Optional) Investigate regression issues
drt investigate --analysis-dir analysis/output_<TIMESTAMP>/ --config config.yaml
```
## 📦 Platform-Specific Installation
@@ -134,34 +132,6 @@ drt compare --config <CONFIG_FILE> [OPTIONS]
- `--verbose, -v` - Enable verbose output
- `--dry-run` - Show what would be compared without executing
### Investigate
Execute diagnostic queries from regression analysis.
```bash
drt investigate --analysis-dir <ANALYSIS_DIR> --config <CONFIG_FILE> [OPTIONS]
```
**Options:**
- `--analysis-dir, -a PATH` - Analysis output directory containing `*_investigate.sql` files (required)
- `--config, -c PATH` - Configuration file (required)
- `--output-dir, -o PATH` - Output directory for reports (default: ./investigation_reports)
- `--verbose, -v` - Enable verbose output
- `--dry-run` - Show what would be executed without running
**Example:**
```bash
drt investigate -a analysis/output_20251209_184032/ -c config.yaml
drt investigate -a analysis/output_20251209_184032/ -c config.yaml -o ./my_reports
```
**What it does:**
- Discovers all `*_investigate.sql` files in the analysis directory
- Parses SQL files (handles markdown, multiple queries per file)
- Executes queries on both baseline and target databases
- Handles errors gracefully (continues on failures)
- Generates HTML and CSV reports with side-by-side comparisons
## ⚙️ Configuration
### Database Connections
@@ -217,7 +187,7 @@ tables:
```yaml
reporting:
output_dir: "./reports"
investigation_dir: "./investigation_reports"
logging:
output_dir: "./logs"
@@ -249,7 +219,7 @@ Reports are saved to `./reports/` with timestamps.
- **HTML Report** - Interactive report with collapsible query results, side-by-side baseline vs target comparison
- **CSV Report** - Flattened structure with one row per query execution
Investigation reports are saved to `./investigation_reports/` with timestamps.
## 🔄 Exit Codes
@@ -324,14 +294,14 @@ grep -i "FAIL\|ERROR" logs/drt_*.log
```
src/drt/
├── cli/ # Command-line interface
│ └── commands/ # CLI commands (compare, discover, validate, investigate)
│ └── commands/ # CLI commands (compare, discover, validate)
├── config/ # Configuration management
├── database/ # Database connectivity (READ ONLY)
├── models/ # Data models
├── reporting/ # Report generators
├── services/ # Business logic
│ ├── checkers/ # Comparison checkers
│ ├── investigation.py # Investigation service
│ └── sql_parser.py # SQL file parser
└── utils/ # Utilities
```

View File

@@ -145,8 +145,7 @@ reporting:
# Output directory for reports (use relative path or set via environment variable)
output_dir: "./reports"
# Output directory for investigation reports (use relative path or set via environment variable)
investigation_dir: "./investigation_reports"
# Report formats to generate
formats:

View File

@@ -35,7 +35,7 @@ tables:
reporting:
output_dir: "./reports"
investigation_dir: "./investigation_reports"
formats:
html: true
csv: true

View File

@@ -72,7 +72,7 @@ tables:
reporting:
output_directory: "/home/user/reports"
investigation_directory: "/home/user/investigation_reports"
formats: ["html", "csv"]
filename_template: "test_regression_{timestamp}"

View File

@@ -1,5 +1,5 @@
"""CLI commands."""
from drt.cli.commands import discover, compare, validate, investigate
from drt.cli.commands import discover, compare, validate
__all__ = ["discover", "compare", "validate", "investigate"]
__all__ = ["discover", "compare", "validate"]

View File

@@ -1,177 +0,0 @@
"""Investigate command implementation."""
import click
import sys
from pathlib import Path
from drt.config.loader import load_config
from drt.services.investigation import InvestigationService
from drt.reporting.investigation_report import (
InvestigationHTMLReportGenerator,
InvestigationCSVReportGenerator
)
from drt.utils.logging import setup_logging, get_logger
from drt.utils.timestamps import get_timestamp
logger = get_logger(__name__)
@click.command()
@click.option('--analysis-dir', '-a', required=True, type=click.Path(exists=True),
help='Analysis output directory containing *_investigate.sql files')
@click.option('--config', '-c', required=True, type=click.Path(exists=True),
help='Configuration file path')
@click.option('--output-dir', '-o', default=None,
help='Output directory for reports (overrides config setting)')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose output')
@click.option('--dry-run', is_flag=True, help='Show what would be executed without running')
def investigate(analysis_dir, config, output_dir, verbose, dry_run):
"""
Execute investigation queries from regression analysis.
Processes all *_investigate.sql files in the analysis directory,
executes queries on both baseline and target databases, and
generates comprehensive reports.
Example:
drt investigate -a /home/user/analysis/output_20251209_184032/ -c config.yaml
"""
# Load config first to get log directory
from drt.config.loader import load_config
cfg = load_config(config)
# Setup logging using config
log_level = "DEBUG" if verbose else "INFO"
log_dir = cfg.logging.directory
setup_logging(log_level=log_level, log_dir=log_dir, log_to_file=not dry_run)
click.echo("=" * 60)
click.echo("Data Regression Testing Framework - Investigation")
click.echo("=" * 60)
click.echo()
try:
# Use output_dir from CLI if provided, otherwise use config
if output_dir is None:
output_dir = cfg.reporting.investigation_directory
click.echo(f"✓ Configuration loaded")
click.echo(f" Database pairs: {len(cfg.database_pairs)}")
click.echo()
# Convert paths
analysis_path = Path(analysis_dir)
output_path = Path(output_dir)
# Create output directory
output_path.mkdir(parents=True, exist_ok=True)
if dry_run:
click.echo("=" * 60)
click.echo("DRY RUN - Preview Only")
click.echo("=" * 60)
# Discover SQL files
from drt.services.sql_parser import discover_sql_files
sql_files = discover_sql_files(analysis_path)
click.echo(f"\nAnalysis Directory: {analysis_path}")
click.echo(f"Found {len(sql_files)} investigation SQL files")
if sql_files:
click.echo("\nTables with investigation queries:")
for schema, table, sql_path in sql_files[:10]: # Show first 10
click.echo(f"{schema}.{table}")
if len(sql_files) > 10:
click.echo(f" ... and {len(sql_files) - 10} more")
for pair in cfg.database_pairs:
if not pair.enabled:
continue
click.echo(f"\nDatabase Pair: {pair.name}")
click.echo(f" Baseline: {pair.baseline.server}.{pair.baseline.database}")
click.echo(f" Target: {pair.target.server}.{pair.target.database}")
click.echo(f"\nReports would be saved to: {output_path}")
click.echo("\n" + "=" * 60)
click.echo("Use without --dry-run to execute investigation")
click.echo("=" * 60)
sys.exit(0)
# Execute investigation for each database pair
all_summaries = []
for pair in cfg.database_pairs:
if not pair.enabled:
click.echo(f"Skipping disabled pair: {pair.name}")
continue
click.echo(f"Investigating: {pair.name}")
click.echo(f" Baseline: {pair.baseline.server}.{pair.baseline.database}")
click.echo(f" Target: {pair.target.server}.{pair.target.database}")
click.echo()
# Run investigation
investigation_service = InvestigationService(cfg)
summary = investigation_service.run_investigation(analysis_path, pair)
all_summaries.append(summary)
click.echo()
# Generate reports for all summaries
if all_summaries:
click.echo("=" * 60)
click.echo("Generating Reports")
click.echo("=" * 60)
for summary in all_summaries:
timestamp = get_timestamp()
# Generate HTML report
html_gen = InvestigationHTMLReportGenerator(cfg)
html_path = output_path / f"investigation_report_{timestamp}.html"
html_gen.generate(summary, html_path)
click.echo(f" ✓ HTML: {html_path}")
# Generate CSV report
csv_gen = InvestigationCSVReportGenerator(cfg)
csv_path = output_path / f"investigation_report_{timestamp}.csv"
csv_gen.generate(summary, csv_path)
click.echo(f" ✓ CSV: {csv_path}")
click.echo()
# Display final summary
click.echo("=" * 60)
click.echo("INVESTIGATION COMPLETE")
click.echo("=" * 60)
total_processed = sum(s.tables_processed for s in all_summaries)
total_successful = sum(s.tables_successful for s in all_summaries)
total_partial = sum(s.tables_partial for s in all_summaries)
total_failed = sum(s.tables_failed for s in all_summaries)
total_queries = sum(s.total_queries_executed for s in all_summaries)
click.echo(f" Tables Processed: {total_processed:3d}")
click.echo(f" Successful: {total_successful:3d}")
click.echo(f" Partial: {total_partial:3d}")
click.echo(f" Failed: {total_failed:3d}")
click.echo(f" Total Queries: {total_queries:3d}")
click.echo("=" * 60)
# Exit with appropriate code
if total_failed > 0:
click.echo("Status: COMPLETED WITH FAILURES ⚠️")
sys.exit(1)
elif total_partial > 0:
click.echo("Status: COMPLETED WITH PARTIAL RESULTS ◐")
sys.exit(0)
else:
click.echo("Status: SUCCESS ✓")
sys.exit(0)
except Exception as e:
logger.error(f"Investigation failed: {e}", exc_info=verbose)
click.echo(f"✗ Error: {e}", err=True)
sys.exit(2)

View File

@@ -3,7 +3,7 @@
import click
import sys
from drt import __version__
from drt.cli.commands import discover, compare, validate, investigate
from drt.cli.commands import discover, compare, validate
from drt.utils.logging import setup_logging
@@ -45,7 +45,7 @@ def version():
cli.add_command(discover.discover)
cli.add_command(compare.compare)
cli.add_command(validate.validate)
cli.add_command(investigate.investigate)
if __name__ == '__main__':

View File

@@ -115,7 +115,7 @@ class TableConfig(BaseModel):
class ReportingConfig(BaseModel):
"""Reporting configuration."""
output_directory: str = "./reports"
investigation_directory: str = "./investigation_reports"
formats: List[str] = Field(default_factory=lambda: ["html", "csv"])
filename_template: str = "regression_report_{timestamp}"
html: Dict[str, Any] = Field(default_factory=lambda: {

View File

@@ -190,78 +190,3 @@ class QueryExecutor:
return results
def execute_investigation_query(
self,
query: str,
timeout: Optional[int] = None
) -> Tuple[Status, Optional[pd.DataFrame], Optional[str], int]:
"""
Execute investigation query with comprehensive error handling.
This method is specifically for investigation queries and does NOT
enforce the SELECT-only restriction. It handles errors gracefully
and returns detailed status information.
Args:
query: SQL query to execute
timeout: Query timeout in seconds (optional)
Returns:
Tuple of (status, result_df, error_message, execution_time_ms)
"""
start_time = time.time()
try:
# Execute query
with self.conn_mgr.get_connection() as conn:
if timeout:
# Set query timeout if supported
try:
cursor = conn.cursor()
cursor.execute(f"SET QUERY_TIMEOUT {timeout}")
except Exception:
# Timeout setting not supported, continue anyway
pass
df = pd.read_sql(query, conn)
execution_time = int((time.time() - start_time) * 1000)
return (Status.PASS, df, None, execution_time)
except Exception as e:
execution_time = int((time.time() - start_time) * 1000)
error_msg = str(e)
error_type = type(e).__name__
# Categorize error
if any(phrase in error_msg.lower() for phrase in [
'does not exist',
'invalid object name',
'could not find',
'not found'
]):
status = Status.SKIP
message = f"Object not found: {error_msg}"
elif 'timeout' in error_msg.lower():
status = Status.FAIL
message = f"Query timeout: {error_msg}"
elif any(phrase in error_msg.lower() for phrase in [
'syntax error',
'incorrect syntax'
]):
status = Status.FAIL
message = f"Syntax error: {error_msg}"
elif 'permission' in error_msg.lower():
status = Status.FAIL
message = f"Permission denied: {error_msg}"
else:
status = Status.FAIL
message = f"{error_type}: {error_msg}"
logger.debug(f"Query execution failed: {message}")
return (status, None, message, execution_time)

View File

@@ -1,70 +0,0 @@
"""Data models for investigation feature."""
from dataclasses import dataclass, field
from typing import List, Optional
import pandas as pd
from drt.models.enums import Status
@dataclass
class QueryExecutionResult:
"""Result of executing a single query."""
query_number: int
query_text: str
status: Status
execution_time_ms: int
result_data: Optional[pd.DataFrame] = None
error_message: Optional[str] = None
row_count: int = 0
@dataclass
class TableInvestigationResult:
"""Results for all queries in a table's investigation."""
schema: str
table: str
sql_file_path: str
baseline_results: List[QueryExecutionResult]
target_results: List[QueryExecutionResult]
overall_status: Status
timestamp: str
@property
def full_name(self) -> str:
"""Get full table name."""
return f"{self.schema}.{self.table}"
@property
def total_queries(self) -> int:
"""Get total number of queries."""
return len(self.baseline_results)
@property
def successful_queries(self) -> int:
"""Get number of successful queries."""
all_results = self.baseline_results + self.target_results
return sum(1 for r in all_results if r.status == Status.PASS)
@dataclass
class InvestigationSummary:
"""Overall investigation execution summary."""
start_time: str
end_time: str
duration_seconds: int
analysis_directory: str
baseline_info: str
target_info: str
tables_processed: int
tables_successful: int
tables_partial: int
tables_failed: int
total_queries_executed: int
results: List[TableInvestigationResult] = field(default_factory=list)
@property
def success_rate(self) -> float:
"""Calculate success rate percentage."""
if self.tables_processed == 0:
return 0.0
return (self.tables_successful / self.tables_processed) * 100

View File

@@ -1,357 +0,0 @@
"""Investigation report generators for HTML and CSV formats."""
import csv
from pathlib import Path
from typing import Optional
from drt.models.investigation import InvestigationSummary, QueryExecutionResult
from drt.models.enums import Status
from drt.config.models import Config
from drt.utils.logging import get_logger
from drt.utils.timestamps import format_duration
logger = get_logger(__name__)
class InvestigationHTMLReportGenerator:
"""Generates HTML format investigation reports."""
def __init__(self, config: Config):
"""
Initialize HTML generator.
Args:
config: Configuration object
"""
self.config = config
self.max_rows = 100 # Limit rows displayed in HTML
def generate(self, summary: InvestigationSummary, filepath: Path) -> None:
"""
Generate HTML investigation report.
Args:
summary: Investigation summary
filepath: Output file path
"""
html_content = self._build_html(summary)
with open(filepath, "w", encoding="utf-8") as f:
f.write(html_content)
logger.debug(f"Investigation HTML report written to {filepath}")
def _build_html(self, summary: InvestigationSummary) -> str:
"""Build complete HTML document."""
return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Investigation Report - {summary.start_time}</title>
{self._get_styles()}
{self._get_scripts()}
</head>
<body>
<div class="container">
{self._build_header(summary)}
{self._build_summary(summary)}
{self._build_table_results(summary)}
{self._build_footer(summary)}
</div>
</body>
</html>"""
def _get_styles(self) -> str:
"""Get embedded CSS styles."""
return """<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; background: #f5f5f5; padding: 20px; }
.container { max-width: 1600px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
h1 { color: #333; border-bottom: 3px solid #007bff; padding-bottom: 10px; margin-bottom: 20px; }
h2 { color: #555; margin-top: 30px; margin-bottom: 15px; border-left: 4px solid #007bff; padding-left: 10px; }
h3 { color: #666; margin-top: 20px; margin-bottom: 10px; }
.header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 8px; margin-bottom: 30px; }
.header h1 { color: white; border: none; }
.info-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 15px; margin: 20px 0; }
.info-box { background: #f8f9fa; padding: 15px; border-radius: 5px; border-left: 4px solid #007bff; }
.info-label { font-weight: bold; color: #666; font-size: 0.9em; }
.info-value { color: #333; font-size: 1.1em; margin-top: 5px; }
.summary-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 15px; margin: 20px 0; }
.summary-box { padding: 20px; border-radius: 8px; text-align: center; color: white; }
.summary-box.success { background: #28a745; }
.summary-box.partial { background: #ffc107; color: #333; }
.summary-box.failed { background: #dc3545; }
.summary-number { font-size: 2.5em; font-weight: bold; }
.summary-label { font-size: 0.9em; margin-top: 5px; }
.table-card { background: #fff; border: 1px solid #dee2e6; border-radius: 8px; margin: 20px 0; overflow: hidden; }
.table-header { background: #f8f9fa; padding: 15px; border-bottom: 2px solid #dee2e6; cursor: pointer; }
.table-header:hover { background: #e9ecef; }
.table-name { font-size: 1.2em; font-weight: bold; color: #333; }
.table-status { display: inline-block; padding: 4px 12px; border-radius: 12px; font-size: 0.85em; font-weight: 600; margin-left: 10px; }
.status-SUCCESS { background: #d4edda; color: #155724; }
.status-PASS { background: #d4edda; color: #155724; }
.status-FAIL { background: #f8d7da; color: #721c24; }
.status-WARNING { background: #fff3cd; color: #856404; }
.status-SKIP { background: #e2e3e5; color: #383d41; }
.table-content { padding: 20px; display: none; }
.table-content.active { display: block; }
.query-section { margin: 20px 0; padding: 15px; background: #f8f9fa; border-radius: 5px; }
.query-header { font-weight: bold; margin-bottom: 10px; color: #555; }
.comparison-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin: 15px 0; }
.env-section { background: white; padding: 15px; border-radius: 5px; border: 1px solid #dee2e6; }
.env-title { font-weight: bold; color: #007bff; margin-bottom: 10px; }
.query-code { background: #2d2d2d; color: #f8f8f2; padding: 15px; border-radius: 5px; overflow-x: auto; font-family: 'Courier New', monospace; font-size: 0.9em; margin: 10px 0; }
.result-table { width: 100%; border-collapse: collapse; margin: 10px 0; font-size: 0.9em; }
.result-table th { background: #007bff; color: white; padding: 8px; text-align: left; }
.result-table td { padding: 8px; border-bottom: 1px solid #dee2e6; }
.result-table tr:hover { background: #f8f9fa; }
.error-box { background: #fff5f5; border: 1px solid #feb2b2; border-radius: 5px; padding: 15px; margin: 10px 0; color: #c53030; }
.result-meta { display: flex; gap: 20px; margin: 10px 0; font-size: 0.9em; color: #666; }
.footer { margin-top: 40px; padding-top: 20px; border-top: 1px solid #dee2e6; text-align: center; color: #666; font-size: 0.9em; }
.toggle-icon { float: right; transition: transform 0.3s; }
.toggle-icon.active { transform: rotate(180deg); }
</style>"""
def _get_scripts(self) -> str:
"""Get embedded JavaScript."""
return """<script>
function toggleTable(id) {
const content = document.getElementById('content-' + id);
const icon = document.getElementById('icon-' + id);
content.classList.toggle('active');
icon.classList.toggle('active');
}
</script>"""
def _build_header(self, summary: InvestigationSummary) -> str:
"""Build report header."""
return f"""<div class="header">
<h1>🔍 Investigation Report</h1>
<p>Analysis Directory: {summary.analysis_directory}</p>
</div>
<div class="info-grid">
<div class="info-box">
<div class="info-label">Start Time</div>
<div class="info-value">{summary.start_time}</div>
</div>
<div class="info-box">
<div class="info-label">End Time</div>
<div class="info-value">{summary.end_time}</div>
</div>
<div class="info-box">
<div class="info-label">Duration</div>
<div class="info-value">{format_duration(summary.duration_seconds)}</div>
</div>
<div class="info-box">
<div class="info-label">Baseline</div>
<div class="info-value">{summary.baseline_info}</div>
</div>
<div class="info-box">
<div class="info-label">Target</div>
<div class="info-value">{summary.target_info}</div>
</div>
<div class="info-box">
<div class="info-label">Total Queries</div>
<div class="info-value">{summary.total_queries_executed}</div>
</div>
</div>"""
def _build_summary(self, summary: InvestigationSummary) -> str:
"""Build summary section."""
return f"""<h2>Summary</h2>
<div class="summary-grid">
<div class="summary-box success">
<div class="summary-number">{summary.tables_successful}</div>
<div class="summary-label">Successful</div>
</div>
<div class="summary-box partial">
<div class="summary-number">{summary.tables_partial}</div>
<div class="summary-label">Partial</div>
</div>
<div class="summary-box failed">
<div class="summary-number">{summary.tables_failed}</div>
<div class="summary-label">Failed</div>
</div>
</div>"""
def _build_table_results(self, summary: InvestigationSummary) -> str:
"""Build table-by-table results."""
html = '<h2>Investigation Results</h2>'
for idx, table_result in enumerate(summary.results):
html += f"""<div class="table-card">
<div class="table-header" onclick="toggleTable({idx})">
<span class="table-name">{table_result.full_name}</span>
<span class="table-status status-{table_result.overall_status.value}">{table_result.overall_status.value}</span>
<span class="toggle-icon" id="icon-{idx}">▼</span>
</div>
<div class="table-content" id="content-{idx}">
<p><strong>SQL File:</strong> {table_result.sql_file_path}</p>
<p><strong>Total Queries:</strong> {table_result.total_queries}</p>
<p><strong>Successful Queries:</strong> {table_result.successful_queries}</p>
{self._build_queries(table_result)}
</div>
</div>"""
return html
def _build_queries(self, table_result) -> str:
"""Build query results for a table."""
html = ""
for i, (baseline_result, target_result) in enumerate(zip(
table_result.baseline_results,
table_result.target_results
), 1):
html += f"""<div class="query-section">
<div class="query-header">Query {baseline_result.query_number}</div>
<details>
<summary>View SQL</summary>
<div class="query-code">{self._escape_html(baseline_result.query_text)}</div>
</details>
<div class="comparison-grid">
{self._build_query_result(baseline_result, "Baseline")}
{self._build_query_result(target_result, "Target")}
</div>
</div>"""
return html
def _build_query_result(self, result: QueryExecutionResult, env: str) -> str:
"""Build single query result."""
html = f"""<div class="env-section">
<div class="env-title">{env}</div>
<span class="table-status status-{result.status.value}">{result.status.value}</span>
<div class="result-meta">
<span>⏱️ {result.execution_time_ms}ms</span>
<span>📊 {result.row_count} rows</span>
</div>"""
if result.error_message:
html += f'<div class="error-box">❌ {self._escape_html(result.error_message)}</div>'
elif result.result_data is not None and not result.result_data.empty:
html += self._build_result_table(result)
html += '</div>'
return html
def _build_result_table(self, result: QueryExecutionResult) -> str:
"""Build HTML table from DataFrame."""
df = result.result_data
if df is None or df.empty:
return '<p>No data returned</p>'
# Limit rows
display_df = df.head(self.max_rows)
html = '<table class="result-table"><thead><tr>'
for col in display_df.columns:
html += f'<th>{self._escape_html(str(col))}</th>'
html += '</tr></thead><tbody>'
for _, row in display_df.iterrows():
html += '<tr>'
for val in row:
html += f'<td>{self._escape_html(str(val))}</td>'
html += '</tr>'
html += '</tbody></table>'
if len(df) > self.max_rows:
html += f'<p><em>Showing first {self.max_rows} of {len(df)} rows</em></p>'
return html
def _escape_html(self, text: str) -> str:
"""Escape HTML special characters."""
return (text
.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;')
.replace("'", '&#39;'))
def _build_footer(self, summary: InvestigationSummary) -> str:
"""Build report footer."""
return f"""<div class="footer">
<p>Generated by Data Regression Testing Framework - Investigation Module</p>
<p>Success Rate: {summary.success_rate:.1f}%</p>
</div>"""
class InvestigationCSVReportGenerator:
"""Generates CSV format investigation reports."""
def __init__(self, config: Config):
"""
Initialize CSV generator.
Args:
config: Configuration object
"""
self.config = config
def generate(self, summary: InvestigationSummary, filepath: Path) -> None:
"""
Generate CSV investigation report.
Args:
summary: Investigation summary
filepath: Output file path
"""
csv_config = self.config.reporting.csv
delimiter = csv_config.get("delimiter", ",")
encoding = csv_config.get("encoding", "utf-8-sig")
with open(filepath, "w", newline="", encoding=encoding) as f:
writer = csv.writer(f, delimiter=delimiter)
# Write header
writer.writerow([
"Timestamp",
"Schema",
"Table",
"Query_Number",
"Environment",
"Status",
"Row_Count",
"Execution_Time_Ms",
"Error_Message",
"SQL_File_Path"
])
# Write data rows
for table_result in summary.results:
# Baseline results
for query_result in table_result.baseline_results:
writer.writerow([
table_result.timestamp,
table_result.schema,
table_result.table,
query_result.query_number,
"baseline",
query_result.status.value,
query_result.row_count,
query_result.execution_time_ms,
query_result.error_message or "",
table_result.sql_file_path
])
# Target results
for query_result in table_result.target_results:
writer.writerow([
table_result.timestamp,
table_result.schema,
table_result.table,
query_result.query_number,
"target",
query_result.status.value,
query_result.row_count,
query_result.execution_time_ms,
query_result.error_message or "",
table_result.sql_file_path
])
logger.debug(f"Investigation CSV report written to {filepath}")

View File

@@ -1,297 +0,0 @@
"""Investigation service for executing investigation queries."""
import time
from pathlib import Path
from typing import List, Tuple
from drt.database.connection import ConnectionManager
from drt.database.executor import QueryExecutor
from drt.config.models import Config, DatabasePairConfig
from drt.models.investigation import (
QueryExecutionResult,
TableInvestigationResult,
InvestigationSummary
)
from drt.models.enums import Status
from drt.services.sql_parser import SQLParser, discover_sql_files
from drt.utils.logging import get_logger
from drt.utils.timestamps import get_timestamp
logger = get_logger(__name__)
class InvestigationService:
"""Service for executing investigation queries."""
def __init__(self, config: Config):
"""
Initialize investigation service.
Args:
config: Configuration object
"""
self.config = config
self.parser = SQLParser()
def run_investigation(
self,
analysis_dir: Path,
db_pair: DatabasePairConfig
) -> InvestigationSummary:
"""
Run investigation for all SQL files in analysis directory.
Args:
analysis_dir: Path to analysis output directory
db_pair: Database pair configuration
Returns:
Investigation summary with all results
"""
start_time = get_timestamp()
start_ts = time.time()
logger.info("=" * 60)
logger.info(f"Starting investigation: {analysis_dir.name}")
logger.info("=" * 60)
# Initialize connections
baseline_mgr = ConnectionManager(db_pair.baseline)
target_mgr = ConnectionManager(db_pair.target)
try:
# Connect to databases
baseline_mgr.connect()
target_mgr.connect()
# Create executors
baseline_executor = QueryExecutor(baseline_mgr)
target_executor = QueryExecutor(target_mgr)
# Discover SQL files
sql_files = discover_sql_files(analysis_dir)
logger.info(f"Found {len(sql_files)} investigation files")
# Create summary
summary = InvestigationSummary(
start_time=start_time,
end_time="",
duration_seconds=0,
analysis_directory=str(analysis_dir),
baseline_info=f"{db_pair.baseline.server}.{db_pair.baseline.database}",
target_info=f"{db_pair.target.server}.{db_pair.target.database}",
tables_processed=0,
tables_successful=0,
tables_partial=0,
tables_failed=0,
total_queries_executed=0,
results=[]
)
# Process each SQL file
for idx, (schema, table, sql_path) in enumerate(sql_files, 1):
logger.info(f"[{idx:3d}/{len(sql_files)}] {schema}.{table:40s} ...")
result = self._investigate_table(
schema,
table,
sql_path,
baseline_executor,
target_executor
)
summary.results.append(result)
summary.tables_processed += 1
# Update counters
if result.overall_status == Status.PASS:
summary.tables_successful += 1
elif result.overall_status == Status.SKIP:
# Don't count skipped tables in partial/failed
pass
elif result.overall_status in [Status.WARNING, Status.INFO]:
# Treat WARNING/INFO as partial success
summary.tables_partial += 1
elif self._is_partial_status(result):
summary.tables_partial += 1
else:
summary.tables_failed += 1
# Count queries
summary.total_queries_executed += len(result.baseline_results)
summary.total_queries_executed += len(result.target_results)
logger.info(f" {self._get_status_symbol(result.overall_status)} "
f"{result.overall_status.value}")
# Finalize summary
end_time = get_timestamp()
duration = int(time.time() - start_ts)
summary.end_time = end_time
summary.duration_seconds = duration
self._log_summary(summary)
return summary
finally:
baseline_mgr.disconnect()
target_mgr.disconnect()
def _investigate_table(
self,
schema: str,
table: str,
sql_path: Path,
baseline_executor: QueryExecutor,
target_executor: QueryExecutor
) -> TableInvestigationResult:
"""Execute investigation queries for a single table."""
# Parse SQL file
queries = self.parser.parse_sql_file(sql_path)
if not queries:
logger.warning(f"No valid queries found in {sql_path.name}")
return TableInvestigationResult(
schema=schema,
table=table,
sql_file_path=str(sql_path),
baseline_results=[],
target_results=[],
overall_status=Status.SKIP,
timestamp=get_timestamp()
)
logger.debug(f" └─ Executing {len(queries)} queries")
# Execute on baseline
baseline_results = self._execute_queries(
queries,
baseline_executor,
"baseline"
)
# Execute on target
target_results = self._execute_queries(
queries,
target_executor,
"target"
)
# Determine overall status
overall_status = self._determine_overall_status(
baseline_results,
target_results
)
return TableInvestigationResult(
schema=schema,
table=table,
sql_file_path=str(sql_path),
baseline_results=baseline_results,
target_results=target_results,
overall_status=overall_status,
timestamp=get_timestamp()
)
def _execute_queries(
self,
queries: List[Tuple[int, str]],
executor: QueryExecutor,
environment: str
) -> List[QueryExecutionResult]:
"""Execute list of queries on one environment."""
results = []
for query_num, query_text in queries:
logger.debug(f" └─ Query {query_num} on {environment}")
status, result_df, error_msg, exec_time = \
executor.execute_investigation_query(query_text)
result = QueryExecutionResult(
query_number=query_num,
query_text=query_text,
status=status,
execution_time_ms=exec_time,
result_data=result_df,
error_message=error_msg,
row_count=len(result_df) if result_df is not None else 0
)
results.append(result)
logger.debug(f" └─ {status.value} ({exec_time}ms, "
f"{result.row_count} rows)")
return results
def _determine_overall_status(
self,
baseline_results: List[QueryExecutionResult],
target_results: List[QueryExecutionResult]
) -> Status:
"""Determine overall status for table investigation."""
all_results = baseline_results + target_results
if not all_results:
return Status.SKIP
success_count = sum(1 for r in all_results if r.status == Status.PASS)
failed_count = sum(1 for r in all_results if r.status == Status.FAIL)
skipped_count = sum(1 for r in all_results if r.status == Status.SKIP)
# All successful
if success_count == len(all_results):
return Status.PASS
# All failed
if failed_count == len(all_results):
return Status.FAIL
# All skipped
if skipped_count == len(all_results):
return Status.SKIP
# Mixed results - use WARNING to indicate partial success
if success_count > 0:
return Status.WARNING
else:
return Status.FAIL
def _is_partial_status(self, result: TableInvestigationResult) -> bool:
"""Check if result represents partial success."""
all_results = result.baseline_results + result.target_results
if not all_results:
return False
success_count = sum(1 for r in all_results if r.status == Status.PASS)
return 0 < success_count < len(all_results)
def _get_status_symbol(self, status: Status) -> str:
"""Get symbol for status."""
symbols = {
Status.PASS: "✓",
Status.FAIL: "✗",
Status.WARNING: "⚠️",
Status.SKIP: "◐",
Status.ERROR: "🔴",
Status.INFO: "ℹ️"
}
return symbols.get(status, "?")
def _log_summary(self, summary: InvestigationSummary) -> None:
"""Log investigation summary."""
logger.info("=" * 60)
logger.info("INVESTIGATION SUMMARY")
logger.info("=" * 60)
logger.info(f" Tables Processed: {summary.tables_processed}")
logger.info(f" Successful: {summary.tables_successful}")
logger.info(f" Partial: {summary.tables_partial}")
logger.info(f" Failed: {summary.tables_failed}")
logger.info(f" Total Queries: {summary.total_queries_executed}")
logger.info("=" * 60)
logger.info(f"Duration: {summary.duration_seconds} seconds")
logger.info(f"Success Rate: {summary.success_rate:.1f}%")
logger.info("=" * 60)

View File

@@ -1,4 +1,4 @@
"""SQL file parser for investigation queries."""
"""SQL file parser."""
import re
from pathlib import Path
@@ -9,7 +9,7 @@ logger = get_logger(__name__)
class SQLParser:
"""Parser for investigation SQL files."""
"""Parser for SQL files."""
@staticmethod
def parse_sql_file(file_path: Path) -> List[Tuple[int, str]]:
@@ -23,7 +23,7 @@ class SQLParser:
List of tuples (query_number, query_text)
Example:
>>> queries = SQLParser.parse_sql_file(Path("investigate.sql"))
>>> queries = SQLParser.parse_sql_file(Path("analysis.sql"))
>>> for num, query in queries:
... print(f"Query {num}: {query[:50]}...")
"""
@@ -133,41 +133,4 @@ class SQLParser:
return len(cleaned) > 0
def discover_sql_files(analysis_dir: Path) -> List[Tuple[str, str, Path]]:
"""
Discover all *_investigate.sql files in analysis directory.
Args:
analysis_dir: Root analysis directory
Returns:
List of tuples (schema, table, file_path)
Example:
>>> files = discover_sql_files(Path("analysis/output_20251209_184032"))
>>> for schema, table, path in files:
... print(f"{schema}.{table}: {path}")
"""
sql_files = []
# Pattern: dbo.TableName/dbo.TableName_investigate.sql
pattern = "**/*_investigate.sql"
for sql_file in analysis_dir.glob(pattern):
# Extract schema and table from filename
# Example: dbo.A_COREC_NACES2008_investigate.sql
filename = sql_file.stem # Remove .sql
if filename.endswith('_investigate'):
# Remove _investigate suffix
full_name = filename[:-12] # len('_investigate') = 12
# Split schema.table
if '.' in full_name:
schema, table = full_name.split('.', 1)
sql_files.append((schema, table, sql_file))
else:
logger.warning(f"Could not parse schema.table from {sql_file.name}")
logger.info(f"Discovered {len(sql_files)} investigation SQL files")
return sql_files