From fd0568a7050cba02bb9662b796db6ce31ac7babb Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Wed, 8 Apr 2026 15:28:16 -0700 Subject: [PATCH] refactor: Move ContainerExecutionStatus to enums module Breaks the circular import that prevented event_listeners.py from typing StatusTransitionEvent fields as ContainerExecutionStatus. All consumers updated to import from the new enums module. --- cloud_pipelines_backend/api_router.py | 3 +- cloud_pipelines_backend/api_server_sql.py | 37 +++++----- cloud_pipelines_backend/backend_types_sql.py | 41 ++--------- cloud_pipelines_backend/container_statuses.py | 33 +++++++++ cloud_pipelines_backend/orchestrator_sql.py | 73 ++++++++++--------- tests/test_api_server_sql.py | 9 ++- 6 files changed, 101 insertions(+), 95 deletions(-) create mode 100644 cloud_pipelines_backend/container_statuses.py diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index 44356da..bd1cbf1 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -12,6 +12,7 @@ from . import api_server_sql from . import backend_types_sql +from . import container_statuses from . import component_library_api_server as components_api from . import database_ops from . import errors @@ -595,7 +596,7 @@ def admin_set_read_only_model(read_only: bool): ) def admin_set_execution_node_status( id: backend_types_sql.IdType, - status: backend_types_sql.ContainerExecutionStatus, + status: container_statuses.ContainerExecutionStatus, session: typing.Annotated[orm.Session, fastapi.Depends(get_session)], ): with session.begin(): diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 413d806..4d29517 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -9,6 +9,7 @@ from . import backend_types_sql as bts from . import component_structures as structures +from . import container_statuses from . import errors from . import filter_query_sql @@ -183,10 +184,10 @@ def terminate( for execution_node in pipeline_run.root_execution.descendants if execution_node.container_execution_status in ( - bts.ContainerExecutionStatus.QUEUED, - bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, - bts.ContainerExecutionStatus.PENDING, - bts.ContainerExecutionStatus.RUNNING, + container_statuses.ContainerExecutionStatus.QUEUED, + container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, + container_statuses.ContainerExecutionStatus.PENDING, + container_statuses.ContainerExecutionStatus.RUNNING, ) ] for execution_node in running_execution_nodes: @@ -276,7 +277,7 @@ def _create_pipeline_run_response( def _calculate_execution_status_stats( self, session: orm.Session, root_execution_id: bts.IdType - ) -> dict[bts.ContainerExecutionStatus, int]: + ) -> dict[container_statuses.ContainerExecutionStatus, int]: query = ( sql.select( bts.ExecutionNode.container_execution_status, @@ -476,10 +477,10 @@ class ExecutionStatusSummary: has_ended: bool = False def count_execution_status( - self, *, status: bts.ContainerExecutionStatus, count: int + self, *, status: container_statuses.ContainerExecutionStatus, count: int ) -> None: self.total_executions += count - if status in bts.CONTAINER_STATUSES_ENDED: + if status in container_statuses.CONTAINER_STATUSES_ENDED: self.ended_executions += count self.has_ended = self.ended_executions == self.total_executions @@ -505,7 +506,7 @@ class ExecutionNodeReference: @dataclasses.dataclass class GetContainerExecutionStateResponse: - status: bts.ContainerExecutionStatus + status: container_statuses.ContainerExecutionStatus exit_code: int | None = None started_at: datetime.datetime | None = None ended_at: datetime.datetime | None = None @@ -810,7 +811,7 @@ def get_container_execution_log( if not container_execution: if ( execution.container_execution_status - == bts.ContainerExecutionStatus.SYSTEM_ERROR + == container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ): return GetContainerExecutionLogResponse( system_error_exception_full=system_error_exception_full, @@ -821,10 +822,10 @@ def get_container_execution_log( ) log_text: str | None = None if container_execution.status in ( - bts.ContainerExecutionStatus.SUCCEEDED, - bts.ContainerExecutionStatus.FAILED, - bts.ContainerExecutionStatus.SYSTEM_ERROR, - bts.ContainerExecutionStatus.CANCELLED, + container_statuses.ContainerExecutionStatus.SUCCEEDED, + container_statuses.ContainerExecutionStatus.FAILED, + container_statuses.ContainerExecutionStatus.SYSTEM_ERROR, + container_statuses.ContainerExecutionStatus.CANCELLED, ): try: # Returning completed log @@ -848,10 +849,10 @@ def get_container_execution_log( # We want to return the system error exception. if ( container_execution.status - != bts.ContainerExecutionStatus.SYSTEM_ERROR + != container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ): raise - elif container_execution.status == bts.ContainerExecutionStatus.RUNNING: + elif container_execution.status == container_statuses.ContainerExecutionStatus.RUNNING: if not container_launcher: raise ApiServiceError( "Reading log of an unfinished container requires `container_launcher`." @@ -894,7 +895,7 @@ def stream_container_execution_log( raise ApiServiceError( "Execution does not have container launcher information." ) - if container_execution.status == bts.ContainerExecutionStatus.RUNNING: + if container_execution.status == container_statuses.ContainerExecutionStatus.RUNNING: launched_container = ( container_launcher.deserialize_launched_container_from_dict( container_execution.launcher_data @@ -1567,13 +1568,13 @@ def _recursively_create_all_executions_and_artifacts( # root_execution_node.container_execution_id = container_execution_node.id # Done: Maybe set WAITING_FOR_UPSTREAM ourselves. root_execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.QUEUED + container_statuses.ContainerExecutionStatus.QUEUED if all( not isinstance(artifact_node, bts.ArtifactNode) or artifact_node.artifact_data for artifact_node in input_artifact_nodes.values() ) - else bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM + else container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ) elif isinstance(implementation, structures.GraphImplementation): # Processing child tasks diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index b984c6c..631626f 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -1,6 +1,5 @@ import dataclasses import datetime -import enum import typing from typing import Any, Final @@ -8,39 +7,9 @@ from sqlalchemy import orm from sqlalchemy.ext import mutable -IdType: typing.TypeAlias = str - +from . import container_statuses -class ContainerExecutionStatus(str, enum.Enum): - INVALID = "INVALID" # Compatibility with Vertex AI CustomJob - UNINITIALIZED = "UNINITIALIZED" # Remove - QUEUED = "QUEUED" # Before WAITING_FOR_UPSTREAM or STARTING - # READY_TO_START = "READY_TO_START" # Input artifacts ready, but no job ID - WAITING_FOR_UPSTREAM = "WAITING_FOR_UPSTREAM" - # STARTING = "STARTING" - PENDING = "PENDING" # == Starting - RUNNING = "RUNNING" - SUCCEEDED = "SUCCEEDED" - FAILED = "FAILED" - # UPSTREAM_FAILED = "UPSTREAM_FAILED" - # CONDITIONALLY_SKIPPED = "CONDITIONALLY_SKIPPED" - SYSTEM_ERROR = "SYSTEM_ERROR" - - # new - CANCELLING = "CANCELLING" - CANCELLED = "CANCELLED" - # UPSTREAM_FAILED_OR_SKIPPED = "UPSTREAM_FAILED_OR_SKIPPED" - SKIPPED = "SKIPPED" - - -CONTAINER_STATUSES_ENDED = { - ContainerExecutionStatus.INVALID, - ContainerExecutionStatus.SUCCEEDED, - ContainerExecutionStatus.FAILED, - ContainerExecutionStatus.SYSTEM_ERROR, - ContainerExecutionStatus.CANCELLED, - ContainerExecutionStatus.SKIPPED, -} +IdType: typing.TypeAlias = str def generate_unique_id() -> str: @@ -64,7 +33,7 @@ def generate_unique_id() -> str: # # Needed to put a union type into DB # class SqlIOTypeStruct(_BaseModel): -# type: structures.TypeSpecType +# type: structures.TypeSpecType # No. We'll represent TypeSpecType as name:str + properties:dict # Supported cases: # * type: "name" @@ -385,7 +354,7 @@ class ExecutionNode(_TableBase): default=None, repr=False, back_populates="execution_nodes" ) # TODO: Do we need this? It's denormalized. - container_execution_status: orm.Mapped[ContainerExecutionStatus | None] = ( + container_execution_status: orm.Mapped[container_statuses.ContainerExecutionStatus | None] = ( orm.mapped_column(default=None, index=True) ) container_execution_cache_key: orm.Mapped[str | None] = orm.mapped_column( @@ -459,7 +428,7 @@ class ContainerExecution(_TableBase): primary_key=True, init=False, insert_default=generate_unique_id ) # task_spec: orm.Mapped[dict[str, Any]] - status: orm.Mapped[ContainerExecutionStatus] = orm.mapped_column(index=True) + status: orm.Mapped[container_statuses.ContainerExecutionStatus] = orm.mapped_column(index=True) last_processed_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column( default=None ) diff --git a/cloud_pipelines_backend/container_statuses.py b/cloud_pipelines_backend/container_statuses.py new file mode 100644 index 0000000..5f28bad --- /dev/null +++ b/cloud_pipelines_backend/container_statuses.py @@ -0,0 +1,33 @@ +import enum + + +class ContainerExecutionStatus(str, enum.Enum): + INVALID = "INVALID" # Compatibility with Vertex AI CustomJob + UNINITIALIZED = "UNINITIALIZED" # Remove + QUEUED = "QUEUED" # Before WAITING_FOR_UPSTREAM or STARTING + # READY_TO_START = "READY_TO_START" # Input artifacts ready, but no job ID + WAITING_FOR_UPSTREAM = "WAITING_FOR_UPSTREAM" + # STARTING = "STARTING" + PENDING = "PENDING" # == Starting + RUNNING = "RUNNING" + SUCCEEDED = "SUCCEEDED" + FAILED = "FAILED" + # UPSTREAM_FAILED = "UPSTREAM_FAILED" + # CONDITIONALLY_SKIPPED = "CONDITIONALLY_SKIPPED" + SYSTEM_ERROR = "SYSTEM_ERROR" + + # new + CANCELLING = "CANCELLING" + CANCELLED = "CANCELLED" + # UPSTREAM_FAILED_OR_SKIPPED = "UPSTREAM_FAILED_OR_SKIPPED" + SKIPPED = "SKIPPED" + + +CONTAINER_STATUSES_ENDED = { + ContainerExecutionStatus.INVALID, + ContainerExecutionStatus.SUCCEEDED, + ContainerExecutionStatus.FAILED, + ContainerExecutionStatus.SYSTEM_ERROR, + ContainerExecutionStatus.CANCELLED, + ContainerExecutionStatus.SKIPPED, +} diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 4ac30a9..461bbad 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -18,6 +18,7 @@ from . import backend_types_sql as bts from . import component_structures as structures +from . import container_statuses from .launchers import common_annotations from .launchers import interfaces as launcher_interfaces from .instrumentation import contextual_logging @@ -85,8 +86,8 @@ def internal_process_queued_executions_queue(self, session: orm.Session): sql.select(bts.ExecutionNode).where( bts.ExecutionNode.container_execution_status.in_( ( - bts.ContainerExecutionStatus.UNINITIALIZED, - bts.ContainerExecutionStatus.QUEUED, + container_statuses.ContainerExecutionStatus.UNINITIALIZED, + container_statuses.ContainerExecutionStatus.QUEUED, ) ) ) @@ -110,7 +111,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session): _logger.exception("Error processing queued execution") session.rollback() queued_execution.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ) record_system_error_exception( execution=queued_execution, exception=ex @@ -135,8 +136,8 @@ def internal_process_running_executions_queue(self, session: orm.Session): .where( bts.ContainerExecution.status.in_( ( - bts.ContainerExecutionStatus.PENDING, - bts.ContainerExecutionStatus.RUNNING, + container_statuses.ContainerExecutionStatus.PENDING, + container_statuses.ContainerExecutionStatus.RUNNING, ) ) ) @@ -171,7 +172,7 @@ def internal_process_running_executions_queue(self, session: orm.Session): _logger.exception("Error processing running container execution") session.rollback() running_container_execution.status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ) # Doing an intermediate commit here because it's most important to mark the problematic execution as SYSTEM_ERROR. session.commit() @@ -180,7 +181,7 @@ def internal_process_running_executions_queue(self, session: orm.Session): execution_nodes = running_container_execution.execution_nodes for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ) record_system_error_exception( execution=execution_node, exception=ex @@ -240,7 +241,7 @@ def internal_process_one_queued_execution( f"Execution did not have all input artifact data present. Waiting for upstream. {execution.id=}" ) execution.container_execution_status = ( - bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM + container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ) session.commit() return @@ -280,10 +281,10 @@ def internal_process_one_queued_execution( # We can reuse both succeeded executions and also non yet finished ones. # Reusing still running executions is important since it allows cache reuse # when multiple versions of a pipeline are submitted in parallel. - # bts.ContainerExecutionStatus.STARTING, # Doesn't exist yet - bts.ContainerExecutionStatus.PENDING, - bts.ContainerExecutionStatus.RUNNING, - bts.ContainerExecutionStatus.SUCCEEDED, + # container_statuses.ContainerExecutionStatus.STARTING, # Doesn't exist yet + container_statuses.ContainerExecutionStatus.PENDING, + container_statuses.ContainerExecutionStatus.RUNNING, + container_statuses.ContainerExecutionStatus.SUCCEEDED, ] ) ) @@ -339,7 +340,7 @@ def internal_process_one_queued_execution( # However if the container execution has already ended, we need to copy the outputs ourselves. if ( old_execution.container_execution_status - == bts.ContainerExecutionStatus.SUCCEEDED + == container_statuses.ContainerExecutionStatus.SUCCEEDED ): # Copying the output artifact data (if the execution already succeeded). reused_execution_output_artifact_data_ids = { @@ -371,10 +372,10 @@ def internal_process_one_queued_execution( ): if ( downstream_execution.container_execution_status - == bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM + == container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ): downstream_execution.container_execution_status = ( - bts.ContainerExecutionStatus.QUEUED + container_statuses.ContainerExecutionStatus.QUEUED ) session.commit() return @@ -411,7 +412,7 @@ def internal_process_one_queued_execution( f"Cancelling execution {execution.id} and skipping all downstream executions." ) execution.container_execution_status = ( - bts.ContainerExecutionStatus.CANCELLED + container_statuses.ContainerExecutionStatus.CANCELLED ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution @@ -584,7 +585,7 @@ def generate_execution_log_uri( # Logs whole exception _logger.exception(f"Error launching container for {execution.id=}") execution.container_execution_status = ( - bts.ContainerExecutionStatus.SYSTEM_ERROR + container_statuses.ContainerExecutionStatus.SYSTEM_ERROR ) record_system_error_exception(execution=execution, exception=ex) _mark_all_downstream_executions_as_skipped( @@ -595,7 +596,7 @@ def generate_execution_log_uri( current_time = _get_current_time() container_execution = bts.ContainerExecution( - status=bts.ContainerExecutionStatus(launched_container.status), + status=container_statuses.ContainerExecutionStatus(launched_container.status), last_processed_at=current_time, created_at=current_time, launcher_data=launched_container.to_dict(), @@ -646,8 +647,8 @@ def internal_process_one_running_execution( ) previous_status = launched_container.status if previous_status not in ( - bts.ContainerExecutionStatus.PENDING, - bts.ContainerExecutionStatus.RUNNING, + container_statuses.ContainerExecutionStatus.PENDING, + container_statuses.ContainerExecutionStatus.RUNNING, ): raise OrchestratorError( f"Unexpected running container status: {previous_status=}, {launched_container=}" @@ -686,7 +687,7 @@ def internal_process_one_running_execution( launched_container.terminate() container_execution.ended_at = _get_current_time() # We need to mark the execution as CANCELLED otherwise orchestrator will continue polling it. - container_execution.status = bts.ContainerExecutionStatus.CANCELLED + container_execution.status = container_statuses.ContainerExecutionStatus.CANCELLED terminated = True # Mark the execution nodes as cancelled only after the launched container is successfully terminated (if needed) @@ -695,7 +696,7 @@ def internal_process_one_running_execution( f"Cancelling execution {execution_node.id} and skipping all downstream executions." ) execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.CANCELLED + container_statuses.ContainerExecutionStatus.CANCELLED ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node @@ -739,14 +740,14 @@ def internal_process_one_running_execution( ) if new_status == launcher_interfaces.ContainerStatus.RUNNING: - container_execution.status = bts.ContainerExecutionStatus.RUNNING + container_execution.status = container_statuses.ContainerExecutionStatus.RUNNING container_execution.started_at = reloaded_launched_container.started_at for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.RUNNING + container_statuses.ContainerExecutionStatus.RUNNING ) elif new_status == launcher_interfaces.ContainerStatus.SUCCEEDED: - container_execution.status = bts.ContainerExecutionStatus.SUCCEEDED + container_execution.status = container_statuses.ContainerExecutionStatus.SUCCEEDED container_execution.exit_code = reloaded_launched_container.exit_code container_execution.started_at = reloaded_launched_container.started_at container_execution.ended_at = reloaded_launched_container.ended_at @@ -803,7 +804,7 @@ def _maybe_preload_value( if missing_output_names: # Marking the container execution as FAILED (even though the program itself has completed successfully) - container_execution.status = bts.ContainerExecutionStatus.FAILED + container_execution.status = container_statuses.ContainerExecutionStatus.FAILED orchestration_error_message = f"Container execution is marked as FAILED due to missing outputs: {missing_output_names}." _logger.error(orchestration_error_message) _record_orchestration_error_message( @@ -814,7 +815,7 @@ def _maybe_preload_value( # Skip downstream executions for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.FAILED + container_statuses.ContainerExecutionStatus.FAILED ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node @@ -856,7 +857,7 @@ def _maybe_preload_value( session.add_all(new_output_artifact_data_map.values()) for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.SUCCEEDED + container_statuses.ContainerExecutionStatus.SUCCEEDED ) # TODO: Optimize for output_name, artifact_node in session.execute( @@ -875,13 +876,13 @@ def _maybe_preload_value( ): if ( downstream_execution.container_execution_status - == bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM + == container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM ): downstream_execution.container_execution_status = ( - bts.ContainerExecutionStatus.QUEUED + container_statuses.ContainerExecutionStatus.QUEUED ) elif new_status == launcher_interfaces.ContainerStatus.FAILED: - container_execution.status = bts.ContainerExecutionStatus.FAILED + container_execution.status = container_statuses.ContainerExecutionStatus.FAILED container_execution.exit_code = reloaded_launched_container.exit_code container_execution.started_at = reloaded_launched_container.started_at container_execution.ended_at = reloaded_launched_container.ended_at @@ -898,7 +899,7 @@ def _maybe_preload_value( # Skip downstream executions for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.FAILED + container_statuses.ContainerExecutionStatus.FAILED ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node @@ -906,7 +907,7 @@ def _maybe_preload_value( elif new_status == launcher_interfaces.ContainerStatus.PENDING: for execution_node in execution_nodes: execution_node.container_execution_status = ( - bts.ContainerExecutionStatus.PENDING + container_statuses.ContainerExecutionStatus.PENDING ) # ? Should we reset `started_at` or keep it? container_execution.started_at = None @@ -949,11 +950,11 @@ def _mark_all_downstream_executions_as_skipped( return seen_execution_ids.add(execution.id) if execution.container_execution_status in { - bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, + container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM, # A downstream ExecutionNode can be in "Queued" state when it's been "woken up" by one of its upstreams. - bts.ContainerExecutionStatus.QUEUED, + container_statuses.ContainerExecutionStatus.QUEUED, }: - execution.container_execution_status = bts.ContainerExecutionStatus.SKIPPED + execution.container_execution_status = container_statuses.ContainerExecutionStatus.SKIPPED # for artifact_node in execution.output_artifact_nodes: # for downstream_execution in artifact_node.downstream_executions: diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index bf30c53..8dc067e 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -7,6 +7,7 @@ from cloud_pipelines_backend import api_server_sql from cloud_pipelines_backend import backend_types_sql as bts +from cloud_pipelines_backend import container_statuses from cloud_pipelines_backend import component_structures as structures from cloud_pipelines_backend import database_ops from cloud_pipelines_backend import errors @@ -23,7 +24,7 @@ def test_initial_state(self): def test_accumulate_all_ended_statuses(self): """Add each ended status with 2^i count for robust uniqueness.""" summary = api_server_sql.ExecutionStatusSummary() - ended_statuses = sorted(bts.CONTAINER_STATUSES_ENDED, key=lambda s: s.value) + ended_statuses = sorted(container_statuses.CONTAINER_STATUSES_ENDED, key=lambda s: s.value) expected_total = 0 expected_ended = 0 for i, status in enumerate(ended_statuses): @@ -39,7 +40,7 @@ def test_accumulate_all_in_progress_statuses(self): """Add each in-progress status with 2^i count for robust uniqueness.""" summary = api_server_sql.ExecutionStatusSummary() in_progress_statuses = sorted( - set(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED, + set(container_statuses.ContainerExecutionStatus) - container_statuses.CONTAINER_STATUSES_ENDED, key=lambda s: s.value, ) expected_total = 0 @@ -54,13 +55,13 @@ def test_accumulate_all_in_progress_statuses(self): def test_accumulate_all_statuses(self): """Add every status with 2^i count. Summary math must be exact.""" summary = api_server_sql.ExecutionStatusSummary() - all_statuses = sorted(bts.ContainerExecutionStatus, key=lambda s: s.value) + all_statuses = sorted(container_statuses.ContainerExecutionStatus, key=lambda s: s.value) expected_total = 0 expected_ended = 0 for i, status in enumerate(all_statuses): count = 2**i expected_total += count - if status in bts.CONTAINER_STATUSES_ENDED: + if status in container_statuses.CONTAINER_STATUSES_ENDED: expected_ended += count summary.count_execution_status(status=status, count=count) assert summary.total_executions == expected_total