Runtime

Orchestrates the two-thread pipeline (downloader + processor), manages the scan queue, and handles graceful shutdown.

SQLite-based file processing state tracker.

Tracks radar files through the pipeline stages (downloaded, regridded, analyzed, plotted). Supports idempotent processing across stops and restarts, progress tracking, and failure recovery.

class adapt.runtime.file_tracker.FileProcessingTracker(db_path)

Bases: object

Tracks file processing state and progress through pipeline stages.

Records the processing lifecycle of each NEXRAD file as it moves through the pipeline: download → regridding → analysis → visualization. Enables resumable processing: the pipeline can be stopped and restarted without reprocessing completed files.

Pipeline Stages:

  1. Downloaded: Level-II file exists on disk (from AWS)

  2. Regridded: NetCDF file created with Cartesian grid

  3. Analyzed: Cell statistics extracted to SQLite database

  4. Plotted: Visualization PNG generated

Database Schema:

SQLite table radar_file_processing:

  • file_id: Unique filename (e.g., KDIX20250305_000310_V06)

  • radar: Radar identifier (e.g., KDIX)

  • scan_time: Scan timestamp (ISO format)

  • status: one of pending, processing, completed, failed

  • Stage timestamps: downloaded_at, regridded_at, analyzed_at, plotted_at (ISO format), recording when each stage completed

  • File paths: nexrad_path, gridnc_path, analysis_path, plot_path

  • Metadata: file_size_mb, num_cells, error_message
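For orientation, the table described above might be declared roughly as follows. This is a minimal sketch, not the module's actual DDL: the column types, the `status` default, the primary key, and the convention that a stage timestamp stays NULL until that stage completes are all assumptions; `create_schema` is a hypothetical helper.

```python
import sqlite3

# Hypothetical DDL reconstructed from the documented fields; the real schema may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS radar_file_processing (
    file_id       TEXT PRIMARY KEY,   -- e.g. KDIX20250305_000310_V06
    radar         TEXT NOT NULL,      -- e.g. KDIX
    scan_time     TEXT NOT NULL,      -- ISO 8601 timestamp
    status        TEXT NOT NULL DEFAULT 'pending',
    downloaded_at TEXT,               -- assumed NULL until the stage completes
    regridded_at  TEXT,
    analyzed_at   TEXT,
    plotted_at    TEXT,
    nexrad_path   TEXT,
    gridnc_path   TEXT,
    analysis_path TEXT,
    plot_path     TEXT,
    file_size_mb  REAL,
    num_cells     INTEGER,
    error_message TEXT
)
"""


def create_schema(conn: sqlite3.Connection) -> None:
    """Create the (assumed) tracking table if it does not already exist."""
    conn.execute(SCHEMA)
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    # PRAGMA table_info yields (cid, name, type, notnull, default, pk) per column.
    print([row[1] for row in conn.execute("PRAGMA table_info(radar_file_processing)")])
```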

Resumability:

If the pipeline stops or crashes after a stage has been marked complete, that stage is skipped on restart. Use reset_failed() to retry files marked as failed. Use cleanup_deleted_files() to reprocess files that were deleted from disk.

Thread Safety:

All methods are thread-safe via internal locking. Multiple threads can check status concurrently.
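One common way to get this behavior with Python's standard `sqlite3` module is a single shared connection guarded by a `threading.Lock`. The sketch below is an assumption about the approach, not the tracker's code; `LockedDB` and its methods are hypothetical.

```python
import sqlite3
import threading


class LockedDB:
    """Hypothetical wrapper: one connection, every access serialized by a lock."""

    def __init__(self, path: str) -> None:
        # check_same_thread=False allows use from several threads;
        # the lock is what keeps those accesses from interleaving.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()

    def execute(self, sql: str, params: tuple = ()) -> list:
        with self._lock:
            rows = self._conn.execute(sql, params).fetchall()
            self._conn.commit()
            return rows

    def close(self) -> None:
        with self._lock:
            self._conn.close()


if __name__ == "__main__":
    db = LockedDB(":memory:")
    db.execute("CREATE TABLE t (n INTEGER)")
    threads = [
        threading.Thread(target=db.execute, args=("INSERT INTO t VALUES (?)", (i,)))
        for i in range(8)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(db.execute("SELECT COUNT(*) FROM t"))  # [(8,)]
```

Serializing through one lock trades some concurrency for simplicity, which is usually acceptable when the database only stores small status records.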

Typical Usage:

Called internally by the orchestrator and processor. Advanced users can also query progress or manually reset failed files:

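from datetime import datetime, timezone
from pathlib import Path

from adapt.runtime.file_tracker import FileProcessingTracker

tracker = FileProcessingTracker(Path("tracker.db"))  # example location
file_id = "KDIX20250305_000310_V06"

# Register the file (returns False if it is already tracked)
tracker.register_file(file_id, "KDIX", datetime(2025, 3, 5, 0, 3, 10, tzinfo=timezone.utc))

# Check if the file still needs the analysis stage
if tracker.should_process(file_id, "analyzed"):
    ...  # process the file
    tracker.mark_stage_complete(file_id, "analyzed", num_cells=42)

# Get summary statistics
stats = tracker.get_statistics()
print(f"Processed {stats['completed']} files")

# Retry files that previously failed
tracker.reset_failed(radar="KDIX")

tracker.close()

Parameters: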

db_path (Path | str) – Path to the SQLite database file.

cleanup_deleted_files(radar=None)

Remove database records for files deleted from disk.

Useful after clearing output directories. On next run, these files will be re-downloaded and reprocessed.

Parameters:

radar (str | None) – Filter to specific radar. If None, cleans all radars.

Notes

Only checks NEXRAD Level-II paths. Does not verify gridded/analysis NetCDF or PNG files (those are considered intermediate).
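The cleanup can be pictured as: read each record's nexrad_path, check whether that file still exists, and delete the record if it does not. A minimal sketch, assuming a radar_file_processing table with file_id, radar, and nexrad_path columns; `cleanup_missing` is hypothetical, and its returned count of deleted rows is an assumption (the documented method does not specify a return value).

```python
import sqlite3
from pathlib import Path
from typing import Optional


def cleanup_missing(conn: sqlite3.Connection, radar: Optional[str] = None) -> int:
    """Delete records whose Level-II file no longer exists on disk."""
    sql = ("SELECT file_id, nexrad_path FROM radar_file_processing "
           "WHERE nexrad_path IS NOT NULL")
    params: tuple = ()
    if radar is not None:
        sql += " AND radar = ?"
        params = (radar,)
    # Only the NEXRAD path is checked; gridded, analysis, and plot paths are ignored.
    missing = [
        (file_id,)
        for file_id, path in conn.execute(sql, params).fetchall()
        if not Path(path).exists()
    ]
    conn.executemany("DELETE FROM radar_file_processing WHERE file_id = ?", missing)
    conn.commit()
    return len(missing)
```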

close()

Close database connection.

Called automatically by orchestrator.stop(). Safe to call multiple times.

get_file_status(file_id)

Get complete processing status for a file.

Parameters:

file_id (str) – File identifier

Returns:

Dict with file metadata and stage completion status:

  • file_id, radar, scan_time

  • status: ‘pending’, ‘processing’, ‘completed’, or ‘failed’

  • error_message: Error details if failed

  • file_size_mb: Original NEXRAD file size

  • num_cells: Number of cells, if analyzed

  • Timestamps: downloaded_at, regridded_at, analyzed_at, plotted_at

  • File paths: nexrad_path, gridnc_path, analysis_path, plot_path

None if file_id not found in database.

Return type:

dict | None
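Because the returned dict carries one *_at timestamp per stage, a caller can derive which stages are done. A small sketch, assuming an incomplete stage has a None (or empty) timestamp; `completed_stages` is a hypothetical helper, not part of the tracker, and the example values are illustrative.

```python
from typing import Optional

STAGES = ("downloaded", "regridded", "analyzed", "plotted")


def completed_stages(status: Optional[dict]) -> list[str]:
    """Return the stages whose completion timestamp is set, in pipeline order."""
    if status is None:  # file_id not found in the database
        return []
    return [stage for stage in STAGES if status.get(f"{stage}_at")]


# Illustrative record shaped like a get_file_status() result.
example = {
    "file_id": "KDIX20250305_000310_V06",
    "status": "processing",
    "downloaded_at": "2025-03-05T00:05:12",
    "regridded_at": "2025-03-05T00:05:40",
    "analyzed_at": None,
    "plotted_at": None,
}
print(completed_stages(example))  # ['downloaded', 'regridded']
```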

get_pending_files(stage=None, radar=None, limit=None)

Get files awaiting processing at a specific stage.

Used by the downloader, processor, and plotter to find files that need work.

Parameters:
  • stage (str | None) –

    Filter by processing stage:

      ◦ ‘regridded’: files downloaded but not regridded

      ◦ ‘analyzed’: files regridded but not analyzed

      ◦ ‘plotted’: files analyzed but not plotted

    If None, returns files with any pending stage.

  • radar (str | None) – Filter by radar ID (e.g., “KDIX”).

  • limit (int | None) – Max files to return (for batching). Default: None (all pending).

Returns:

List of file records matching criteria, ordered by scan_time (oldest first).

Return type:

list[dict]
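The stage filter maps naturally onto the per-stage timestamps: a file is pending for a stage when the previous stage's timestamp is set and its own is not. A sketch of that query, assuming NULL timestamps for incomplete stages; `pending_for_stage` is hypothetical, handles only a single non-None stage, and does not treat failed files specially.

```python
import sqlite3
from typing import Optional

# Each stage depends on the one before it.
PREVIOUS = {"regridded": "downloaded", "analyzed": "regridded", "plotted": "analyzed"}


def pending_for_stage(conn: sqlite3.Connection, stage: str,
                      radar: Optional[str] = None,
                      limit: Optional[int] = None) -> list[dict]:
    if stage not in PREVIOUS:
        raise ValueError(f"unknown stage: {stage!r}")
    # Column names come from the fixed PREVIOUS mapping, never from caller input.
    sql = (f"SELECT * FROM radar_file_processing "
           f"WHERE {PREVIOUS[stage]}_at IS NOT NULL AND {stage}_at IS NULL")
    params: list = []
    if radar is not None:
        sql += " AND radar = ?"
        params.append(radar)
    sql += " ORDER BY scan_time ASC"  # oldest first
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    cur = conn.cursor()
    cur.row_factory = sqlite3.Row  # dict-like rows without changing the connection
    cur.execute(sql, params)
    return [dict(row) for row in cur.fetchall()]
```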

get_statistics(radar=None)

Get summary statistics for processing progress.

Parameters:

radar (str | None) – Filter to specific radar. If None, returns all radars.

Returns:

Summary statistics:

  • total: Total files registered

  • completed: Files fully processed (through plotting)

  • pending: Files awaiting any stage

  • failed: Files with errors

  • total_cells: Sum of cells across all analyzed files

  • radar: Filtered radar (if specified)

Return type:

dict

Notes

Used by the orchestrator to log progress every 30 seconds.
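A periodic reporter like this can be built on `threading.Event.wait(timeout)`, which also acts as an interruptible sleep during shutdown. A minimal sketch, assuming a `get_stats` callable that returns the dict described above; `ProgressLogger` is hypothetical and is not the orchestrator's implementation.

```python
import logging
import threading
from typing import Callable

log = logging.getLogger("progress")


class ProgressLogger:
    """Hypothetical background thread that logs tracker statistics periodically."""

    def __init__(self, get_stats: Callable[[], dict], interval: float = 30.0) -> None:
        self._get_stats = get_stats
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.reports = 0  # number of progress lines emitted

    def _run(self) -> None:
        # wait() returns True as soon as stop() sets the event, so shutdown
        # does not have to wait out the remainder of the interval.
        while not self._stop.wait(self._interval):
            s = self._get_stats()
            log.info("progress: %d/%d completed, %d pending, %d failed",
                     s["completed"], s["total"], s["pending"], s["failed"])
            self.reports += 1

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
```

With a tracker instance, usage would look like `ProgressLogger(tracker.get_statistics).start()`.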

mark_stage_complete(file_id, stage, path=None, num_cells=None, error=None, timings=None)

Mark a pipeline stage as complete or failed for a file.

Called by the downloader, processor, and plotter threads to record progress. This enables resumable processing: the next run skips stages marked complete.

Parameters:
  • file_id (str) – File identifier (must be pre-registered via register_file).

  • stage (str) – Pipeline stage: ‘downloaded’, ‘regridded’, ‘analyzed’, ‘plotted’.

  • path (Path | None) – Path to the output file created by this stage (for logging/debugging):

      ◦ ‘downloaded’: NEXRAD Level-II path

      ◦ ‘regridded’: Gridded NetCDF path

      ◦ ‘analyzed’: Analysis NetCDF path

      ◦ ‘plotted’: PNG plot path

  • num_cells (int | None) – Number of cells detected (for ‘analyzed’ stage).

  • error (str | None) – Error message if the stage failed. If provided, the status is set to ‘failed’ and future runs will retry this stage.

  • timings (dict[str, float] | None)

Raises:

ValueError – If stage is not one of the valid pipeline stages.

Notes

Thread-safe. If called multiple times for the same stage, the timestamp from the most recent call is kept. Failed stages can be reset with reset_failed().
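The documented ValueError and the per-stage timestamp and path fields suggest a fixed stage-to-column mapping. A hypothetical sketch of that validation step: `STAGE_COLUMNS` and `stage_update` are illustrative names, and two behaviors are assumptions, namely that a failure writes no completion timestamp and that status becomes ‘completed’ only after the plotting stage (matching the get_statistics() definition of completed).

```python
from datetime import datetime, timezone
from typing import Optional

# Stage -> (timestamp column, path column), following the documented fields.
STAGE_COLUMNS = {
    "downloaded": ("downloaded_at", "nexrad_path"),
    "regridded": ("regridded_at", "gridnc_path"),
    "analyzed": ("analyzed_at", "analysis_path"),
    "plotted": ("plotted_at", "plot_path"),
}


def stage_update(stage: str, path: Optional[str] = None,
                 error: Optional[str] = None) -> dict:
    """Build the column values a stage update might write (hypothetical)."""
    if stage not in STAGE_COLUMNS:
        raise ValueError(f"Invalid stage {stage!r}; expected one of {list(STAGE_COLUMNS)}")
    ts_col, path_col = STAGE_COLUMNS[stage]
    if error is not None:
        # Assumption: a failure records the message but no completion timestamp.
        return {"status": "failed", "error_message": error}
    values = {
        ts_col: datetime.now(timezone.utc).isoformat(),
        # Assumption: only the final stage marks the whole file completed.
        "status": "completed" if stage == "plotted" else "processing",
    }
    if path is not None:
        values[path_col] = str(path)
    return values
```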

register_file(file_id, radar, scan_time, nexrad_path=None)

Register a new file for tracking.

Creates the initial database record for a newly discovered NEXRAD file. Called by the downloader on discovery.

Parameters:
  • file_id (str) – Unique file identifier (e.g., KDIX20250305_000310_V06). Typically the Level-II filename without extension.

  • radar (str) – Radar identifier (e.g., KDIX, KHTX).

  • scan_time (datetime) – Scan timestamp (typically UTC). Used to filter historical ranges.

  • nexrad_path (Path | None) – Path to original NEXRAD Level-II file on disk. Used to compute file size for logging.
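Since the file identifier encodes both the radar and the scan time, the radar and scan_time arguments can be derived from it. A sketch, assuming the pattern of the example KDIX20250305_000310_V06 (four-letter radar ID, then YYYYMMDD_HHMMSS) and UTC times; `parse_file_id` is hypothetical and not part of the module.

```python
from datetime import datetime, timezone


def parse_file_id(file_id: str) -> tuple[str, datetime]:
    """Split e.g. 'KDIX20250305_000310_V06' into ('KDIX', 2025-03-05 00:03:10 UTC)."""
    radar = file_id[:4]
    # Characters 4..18 hold 'YYYYMMDD_HHMMSS'; the trailing '_V06' suffix is ignored.
    stamp = datetime.strptime(file_id[4:19], "%Y%m%d_%H%M%S")
    return radar, stamp.replace(tzinfo=timezone.utc)
```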

Returns:

True if file newly registered, False if already in database. Returning False does not indicate an error (file might be mid-processing or already complete).

Return type:

bool

Notes

Safe to call multiple times with same file_id (returns False on duplicates).
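Duplicate-safe registration maps onto SQLite's INSERT OR IGNORE against a primary key: `cursor.rowcount` is 1 when a row was inserted and 0 when the key already existed. A sketch under that assumption; `register` and the minimal table it writes to are hypothetical.

```python
import sqlite3
from datetime import datetime


def register(conn: sqlite3.Connection, file_id: str, radar: str,
             scan_time: datetime) -> bool:
    """Insert a pending record; return False if file_id is already present."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO radar_file_processing (file_id, radar, scan_time, status) "
        "VALUES (?, ?, ?, 'pending')",
        (file_id, radar, scan_time.isoformat()),
    )
    conn.commit()
    # An ignored duplicate changes no rows, so rowcount is 0.
    return cur.rowcount == 1
```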

reset_failed(radar=None)

Reset all failed files to pending for retry.

Useful for recovery after fixing the cause of failures (e.g., config changes, bug fixes). Clears error_message and sets status to ‘pending’ so the files are reprocessed.

Parameters:

radar (str | None) – Filter to specific radar. If None, resets all radars.

Notes

Does not delete output files. On the next run, stages are skipped based on existing output files rather than on status. Use cleanup_deleted_files() if files were deleted from disk and should be fully reprocessed.
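In SQL terms the reset can be a single conditional UPDATE. A sketch, assuming the status and error_message columns listed in the schema; `reset_failed_rows` is hypothetical, and returning the number of changed rows is an assumption.

```python
import sqlite3
from typing import Optional


def reset_failed_rows(conn: sqlite3.Connection, radar: Optional[str] = None) -> int:
    """Set failed records back to pending and clear their error messages."""
    sql = ("UPDATE radar_file_processing SET status = 'pending', error_message = NULL "
           "WHERE status = 'failed'")
    params: tuple = ()
    if radar is not None:
        sql += " AND radar = ?"
        params = (radar,)
    cur = conn.execute(sql, params)
    conn.commit()
    return cur.rowcount  # rows actually changed
```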

should_process(file_id, stage)

Check if a file needs processing at a given stage.

Used by the processor and plotter threads to determine whether a stage should be skipped.

Parameters:
  • file_id (str) – File identifier

  • stage (str) – Stage to check: ‘regridded’, ‘analyzed’, ‘plotted’

Returns:

True if file should be processed (stage incomplete), False if already done.

Return type:

bool
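A sketch of the check, assuming a stage counts as done exactly when its *_at timestamp is set and that an unregistered file should be processed; both are assumptions, and `needs_stage` is hypothetical.

```python
import sqlite3

CHECKABLE = ("regridded", "analyzed", "plotted")


def needs_stage(conn: sqlite3.Connection, file_id: str, stage: str) -> bool:
    """Return True if the stage has no completion timestamp yet."""
    if stage not in CHECKABLE:
        raise ValueError(f"unknown stage: {stage!r}")
    # The column name is taken from the fixed CHECKABLE tuple, not from raw input.
    row = conn.execute(
        f"SELECT {stage}_at FROM radar_file_processing WHERE file_id = ?", (file_id,)
    ).fetchone()
    return row is None or row[0] is None
```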