Skip to content

Core Module

The core module provides the foundational classes for cascade orchestration, including the execution engine, configuration schema, and execution context management.

CascadeEngine

Main orchestration engine for cascade execution.

The engine manages stage registration, execution planning, and dynamic routing based on configuration and runtime conditions.

Uses slots for memory efficiency.

Attributes:

Name Type Description
config

Cascade configuration.

_handlers Dict[str, Callable]

Registered stage handlers.

_result_cache Dict[str, Tuple[Any, float]]

Cache for stage results.

_plan_cache Dict[str, List[str]]

Cache for execution plans.

_statistics

Execution statistics.

__init__(config: CascadeConfig)

Initialize cascade engine.

Parameters:

Name Type Description Default
config CascadeConfig

Cascade configuration.

required

register_stage(name: str, handler: Callable) -> None

Register a stage handler.

Wraps handler with monitoring and error handling.

Parameters:

Name Type Description Default
name str

Stage name matching configuration.

required
handler Callable

Async callable that processes stage execution.

required

Raises:

Type Description
ValueError

If stage name not in configuration.

execute(data: Dict[str, Any]) -> Dict[str, Any] async

Execute cascade with input data.

Main entry point for cascade execution. Creates execution context, generates or retrieves cached execution plan, and executes stages with dynamic routing.

Parameters:

Name Type Description Default
data Dict[str, Any]

Input data dictionary.

required

Returns:

Type Description
Dict[str, Any]

Execution result dictionary.

Raises:

Type Description
TimeoutError

If global timeout exceeded.

RuntimeError

If execution fails critically.

get_statistics() -> Dict[str, Any]

Get execution statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary of statistics per stage.

clear_cache() -> None

Clear all caches (result and plan caches).

update_config(config: CascadeConfig) -> None

Hot-reload configuration.

Updates engine configuration and recompiles routing rules without losing registered handlers or statistics.

Parameters:

Name Type Description Default
config CascadeConfig

New cascade configuration.

required

CascadeConfig

Complete cascade orchestration configuration.

Attributes:

Name Type Description
name str

Cascade configuration name.

version str

Configuration version.

stages Dict[str, StageConfig]

Dictionary of stage configurations keyed by name.

execution_order List[str]

Initial execution order (stage names).

global_timeout_ms int

Global execution timeout in milliseconds.

max_parallel_stages int

Maximum stages to execute in parallel.

global_termination_conditions List[Condition]

Conditions that terminate entire cascade.

enable_caching bool

Whether to enable result caching.

cache_key_fields List[str]

Fields to include in cache key.

domain_config Dict[str, Any]

Domain-specific configuration.

from_dict(data: Dict[str, Any]) -> CascadeConfig classmethod

Create cascade config from dictionary.

from_file(path: Union[str, Path]) -> CascadeConfig classmethod

Load cascade config from JSON or YAML file.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to configuration file (.json or .yaml/.yml).

required

Returns:

Type Description
CascadeConfig

Loaded cascade configuration.

Raises:

Type Description
ValueError

If file format is unsupported.

ImportError

If YAML file provided but PyYAML not installed.

to_dict() -> Dict[str, Any]

Convert cascade config to dictionary.

to_json(indent: int = 2) -> str

Convert cascade config to JSON string.

Parameters:

Name Type Description Default
indent int

JSON indentation level.

2

Returns:

Type Description
str

JSON representation of config.

to_yaml() -> str

Convert cascade config to YAML string.

Returns:

Type Description
str

YAML representation of config.

Raises:

Type Description
ImportError

If PyYAML is not installed.

StageConfig

Configuration for a single execution stage.

Attributes:

Name Type Description
name str

Unique stage identifier.

enabled bool

Whether stage is initially enabled.

handler_type Optional[str]

Type of handler (determines handler resolution).

timeout_ms int

Execution timeout in milliseconds.

max_retries int

Maximum retry attempts on failure.

retry_delay_ms int

Delay between retries in milliseconds.

can_run_parallel bool

Whether stage can execute in parallel with others.

parallel_group Optional[str]

Group identifier for parallel execution.

depends_on List[str]

List of stage names this stage depends on.

routing_rules List[RoutingRule]

Stage-specific routing rules.

cache_enabled bool

Whether to cache stage results.

cache_ttl_seconds int

Cache TTL in seconds.

custom_properties Dict[str, Any]

Domain-specific configuration properties.

from_dict(data: Dict[str, Any]) -> StageConfig classmethod

Create stage config from dictionary.

to_dict() -> Dict[str, Any]

Convert stage config to dictionary.

RoutingRule

Defines a routing rule for stage execution control.

Attributes:

Name Type Description
name str

Unique rule identifier.

type str

Rule type (precondition, routing, postcondition).

condition Condition

Condition to evaluate.

action RoutingAction

Action to take when condition matches.

priority int

Rule priority (higher values execute first).

from_dict(data: Dict[str, Any]) -> RoutingRule classmethod

Create rule from dictionary.

to_dict() -> Dict[str, Any]

Convert rule to dictionary.

Condition

Represents a condition for routing decisions.

Conditions can be simple (field-based) or composite (nested conditions).

Attributes:

Name Type Description
field Optional[str]

Field path to evaluate (dot notation supported).

operator Optional[Union[str, ConditionOperator]]

Condition operator to apply.

value Optional[Any]

Expected value for comparison.

conditions Optional[List[Condition]]

Nested conditions for logical operators (AND, OR, NOT).

from_dict(data: Dict[str, Any]) -> Condition classmethod

Create condition from dictionary.

to_dict() -> Dict[str, Any]

Convert condition to dictionary.

ConditionOperator

Bases: str, Enum

Operators for condition evaluation.

ExecutionContext

Execution context for cascade orchestration.

Manages state, results, and metadata throughout cascade execution. Uses slots for memory efficiency and zero-copy reference to input data.

Attributes:

Name Type Description
_data

Reference to input data (zero-copy).

_stage_results List[StageResult]

List of stage execution results.

_errors List[str]

List of error messages.

_metadata Dict[str, Any]

Additional metadata dictionary.

_start_time

Execution start timestamp.

_termination_flag

Whether execution should terminate.

_next_stage Optional[str]

Next stage to execute (for skip_to actions).

_enabled_stages set

Set of dynamically enabled stage names.

_disabled_stages set

Set of dynamically disabled stage names.

_cache Dict[str, Any]

Cache for computed values (dot notation lookups).

_execution_order List[str]

Planned execution order.

_timeline List[Dict[str, Any]]

Timeline of execution events.

_routing_decisions List[Dict[str, Any]]

List of routing decisions made.

data: Dict[str, Any] property

Get reference to input data.

elapsed_ms: float property

Get elapsed execution time in milliseconds.

should_terminate: bool property

Check if execution should terminate.

__init__(data: Union[Dict[str, Any], EventWithContext])

Initialize execution context.

Parameters:

Name Type Description Default
data Union[Dict[str, Any], EventWithContext]

Input data - either a dictionary or EventWithContext object. If EventWithContext, it will be converted to flat dict for access.

required

Examples:

Flat dictionary (backward compatible)

ctx = ExecutionContext({"user_id": "123", "amount": 100})

EventWithContext (domain-agnostic)

ctx = ExecutionContext(EventWithContext(event=..., context=...))

get(path: str, default: Any = None) -> Any

Get value from data or stage results using dot notation with caching.

Supports paths like: - "user.profile.age" - looks up in input data - "stages.FAST_CHECK.confidence" - looks up in stage results

Parameters:

Name Type Description Default
path str

Dot-separated path (e.g., "user.profile.age").

required
default Any

Default value if path not found.

None

Returns:

Type Description
Any

Value at path or default.

Examples:

>>> ctx.get("user.name")
'John'
>>> ctx.get("stages.FAST_CHECK.confidence")
0.85

set(path: str, value: Any) -> None

Set value in data using dot notation.

Parameters:

Name Type Description Default
path str

Dot-separated path (e.g., "user.profile.age").

required
value Any

Value to set.

required

Examples:

>>> ctx.set("user.name", "Jane")
>>> ctx.set("user.settings.theme", "dark")

add_stage_result(result: StageResult) -> None

Add stage execution result.

Parameters:

Name Type Description Default
result StageResult

Stage result to add.

required

add_stage_error(stage_name: str, error: str) -> None

Add stage execution error.

Parameters:

Name Type Description Default
stage_name str

Name of the stage that failed.

required
error str

Error message.

required

get_stage_result(stage_name: str) -> Optional[StageResult]

Get result for a specific stage.

Parameters:

Name Type Description Default
stage_name str

Name of the stage.

required

Returns:

Type Description
Optional[StageResult]

Stage result if found, None otherwise.

set_termination_flag(reason: Optional[str] = None) -> None

Set termination flag to stop execution.

Parameters:

Name Type Description Default
reason Optional[str]

Optional reason for termination.

None

set_next_stage(stage_name: str) -> None

Set next stage to execute (skip_to action).

Parameters:

Name Type Description Default
stage_name str

Name of the stage to skip to.

required

get_next_stage() -> Optional[str]

Get and clear next stage override.

Returns:

Type Description
Optional[str]

Next stage name if set, None otherwise.

enable_stage(stage_name: str) -> None

Dynamically enable a stage.

Parameters:

Name Type Description Default
stage_name str

Name of the stage to enable.

required

disable_stage(stage_name: str) -> None

Dynamically disable a stage.

Parameters:

Name Type Description Default
stage_name str

Name of the stage to disable.

required

is_stage_enabled(stage_name: str, default: bool = True) -> bool

Check if a stage is enabled.

Parameters:

Name Type Description Default
stage_name str

Name of the stage.

required
default bool

Default value if not explicitly set.

True

Returns:

Type Description
bool

True if stage is enabled, False otherwise.

set_metadata(key: str, value: Any) -> None

Set metadata value.

Parameters:

Name Type Description Default
key str

Metadata key.

required
value Any

Metadata value.

required

get_metadata(key: str, default: Any = None) -> Any

Get metadata value.

Parameters:

Name Type Description Default
key str

Metadata key.

required
default Any

Default value if key not found.

None

Returns:

Type Description
Any

Metadata value or default.

get_result() -> Dict[str, Any]

Get final execution result.

Returns a generic result dictionary containing execution summary.

Returns:

Type Description
Dict[str, Any]

Dictionary with execution results and metadata.

StageResult

Represents the result of a stage execution.

Uses slots for memory efficiency.

Attributes:

Name Type Description
stage_name

Name of the executed stage.

result

Result value from the stage handler.

confidence

Confidence score (0-1) if applicable.

data

Additional data returned by the stage.

error

Error message if stage failed.

time_ms

Execution time in milliseconds.

__init__(stage_name: str, result: Any = None, confidence: Optional[float] = None, data: Optional[Dict[str, Any]] = None, error: Optional[str] = None, time_ms: float = 0.0)

to_dict() -> Dict[str, Any]

Convert stage result to dictionary.