Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloud_pipelines_backend/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from . import api_server_sql
from . import backend_types_sql
from . import container_statuses
from . import component_library_api_server as components_api
from . import database_ops
from . import errors
Expand Down Expand Up @@ -595,7 +596,7 @@ def admin_set_read_only_model(read_only: bool):
)
def admin_set_execution_node_status(
id: backend_types_sql.IdType,
status: backend_types_sql.ContainerExecutionStatus,
status: container_statuses.ContainerExecutionStatus,
session: typing.Annotated[orm.Session, fastapi.Depends(get_session)],
):
with session.begin():
Expand Down
37 changes: 19 additions & 18 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import backend_types_sql as bts
from . import component_structures as structures
from . import container_statuses
from . import errors
from . import filter_query_sql

Expand Down Expand Up @@ -183,10 +184,10 @@ def terminate(
for execution_node in pipeline_run.root_execution.descendants
if execution_node.container_execution_status
in (
bts.ContainerExecutionStatus.QUEUED,
bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM,
bts.ContainerExecutionStatus.PENDING,
bts.ContainerExecutionStatus.RUNNING,
container_statuses.ContainerExecutionStatus.QUEUED,
container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM,
container_statuses.ContainerExecutionStatus.PENDING,
container_statuses.ContainerExecutionStatus.RUNNING,
)
]
for execution_node in running_execution_nodes:
Expand Down Expand Up @@ -276,7 +277,7 @@ def _create_pipeline_run_response(

def _calculate_execution_status_stats(
self, session: orm.Session, root_execution_id: bts.IdType
) -> dict[bts.ContainerExecutionStatus, int]:
) -> dict[container_statuses.ContainerExecutionStatus, int]:
query = (
sql.select(
bts.ExecutionNode.container_execution_status,
Expand Down Expand Up @@ -476,10 +477,10 @@ class ExecutionStatusSummary:
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
self, *, status: container_statuses.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
if status in container_statuses.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions
Expand All @@ -505,7 +506,7 @@ class ExecutionNodeReference:

@dataclasses.dataclass
class GetContainerExecutionStateResponse:
status: bts.ContainerExecutionStatus
status: container_statuses.ContainerExecutionStatus
exit_code: int | None = None
started_at: datetime.datetime | None = None
ended_at: datetime.datetime | None = None
Expand Down Expand Up @@ -810,7 +811,7 @@ def get_container_execution_log(
if not container_execution:
if (
execution.container_execution_status
== bts.ContainerExecutionStatus.SYSTEM_ERROR
== container_statuses.ContainerExecutionStatus.SYSTEM_ERROR
):
return GetContainerExecutionLogResponse(
system_error_exception_full=system_error_exception_full,
Expand All @@ -821,10 +822,10 @@ def get_container_execution_log(
)
log_text: str | None = None
if container_execution.status in (
bts.ContainerExecutionStatus.SUCCEEDED,
bts.ContainerExecutionStatus.FAILED,
bts.ContainerExecutionStatus.SYSTEM_ERROR,
bts.ContainerExecutionStatus.CANCELLED,
container_statuses.ContainerExecutionStatus.SUCCEEDED,
container_statuses.ContainerExecutionStatus.FAILED,
container_statuses.ContainerExecutionStatus.SYSTEM_ERROR,
container_statuses.ContainerExecutionStatus.CANCELLED,
):
try:
# Returning completed log
Expand All @@ -848,10 +849,10 @@ def get_container_execution_log(
# We want to return the system error exception.
if (
container_execution.status
!= bts.ContainerExecutionStatus.SYSTEM_ERROR
!= container_statuses.ContainerExecutionStatus.SYSTEM_ERROR
):
raise
elif container_execution.status == bts.ContainerExecutionStatus.RUNNING:
elif container_execution.status == container_statuses.ContainerExecutionStatus.RUNNING:
if not container_launcher:
raise ApiServiceError(
"Reading log of an unfinished container requires `container_launcher`."
Expand Down Expand Up @@ -894,7 +895,7 @@ def stream_container_execution_log(
raise ApiServiceError(
"Execution does not have container launcher information."
)
if container_execution.status == bts.ContainerExecutionStatus.RUNNING:
if container_execution.status == container_statuses.ContainerExecutionStatus.RUNNING:
launched_container = (
container_launcher.deserialize_launched_container_from_dict(
container_execution.launcher_data
Expand Down Expand Up @@ -1567,13 +1568,13 @@ def _recursively_create_all_executions_and_artifacts(
# root_execution_node.container_execution_id = container_execution_node.id
# Done: Maybe set WAITING_FOR_UPSTREAM ourselves.
root_execution_node.container_execution_status = (
bts.ContainerExecutionStatus.QUEUED
container_statuses.ContainerExecutionStatus.QUEUED
if all(
not isinstance(artifact_node, bts.ArtifactNode)
or artifact_node.artifact_data
for artifact_node in input_artifact_nodes.values()
)
else bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM
else container_statuses.ContainerExecutionStatus.WAITING_FOR_UPSTREAM
)
elif isinstance(implementation, structures.GraphImplementation):
# Processing child tasks
Expand Down
41 changes: 5 additions & 36 deletions cloud_pipelines_backend/backend_types_sql.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,15 @@
import dataclasses
import datetime
import enum
import typing
from typing import Any, Final

import sqlalchemy as sql
from sqlalchemy import orm
from sqlalchemy.ext import mutable

IdType: typing.TypeAlias = str

from . import container_statuses

class ContainerExecutionStatus(str, enum.Enum):
INVALID = "INVALID" # Compatibility with Vertex AI CustomJob
UNINITIALIZED = "UNINITIALIZED" # Remove
QUEUED = "QUEUED" # Before WAITING_FOR_UPSTREAM or STARTING
# READY_TO_START = "READY_TO_START" # Input artifacts ready, but no job ID
WAITING_FOR_UPSTREAM = "WAITING_FOR_UPSTREAM"
# STARTING = "STARTING"
PENDING = "PENDING" # == Starting
RUNNING = "RUNNING"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
# UPSTREAM_FAILED = "UPSTREAM_FAILED"
# CONDITIONALLY_SKIPPED = "CONDITIONALLY_SKIPPED"
SYSTEM_ERROR = "SYSTEM_ERROR"

# new
CANCELLING = "CANCELLING"
CANCELLED = "CANCELLED"
# UPSTREAM_FAILED_OR_SKIPPED = "UPSTREAM_FAILED_OR_SKIPPED"
SKIPPED = "SKIPPED"


CONTAINER_STATUSES_ENDED = {
ContainerExecutionStatus.INVALID,
ContainerExecutionStatus.SUCCEEDED,
ContainerExecutionStatus.FAILED,
ContainerExecutionStatus.SYSTEM_ERROR,
ContainerExecutionStatus.CANCELLED,
ContainerExecutionStatus.SKIPPED,
}
IdType: typing.TypeAlias = str


def generate_unique_id() -> str:
Expand All @@ -64,7 +33,7 @@ def generate_unique_id() -> str:

# # Needed to put a union type into DB
# class SqlIOTypeStruct(_BaseModel):
# type: structures.TypeSpecType
# type: structures.TypeSpecType
# No. We'll represent TypeSpecType as name:str + properties:dict
# Supported cases:
# * type: "name"
Expand Down Expand Up @@ -385,7 +354,7 @@ class ExecutionNode(_TableBase):
default=None, repr=False, back_populates="execution_nodes"
)
# TODO: Do we need this? It's denormalized.
container_execution_status: orm.Mapped[ContainerExecutionStatus | None] = (
container_execution_status: orm.Mapped[container_statuses.ContainerExecutionStatus | None] = (
orm.mapped_column(default=None, index=True)
)
container_execution_cache_key: orm.Mapped[str | None] = orm.mapped_column(
Expand Down Expand Up @@ -459,7 +428,7 @@ class ContainerExecution(_TableBase):
primary_key=True, init=False, insert_default=generate_unique_id
)
# task_spec: orm.Mapped[dict[str, Any]]
status: orm.Mapped[ContainerExecutionStatus] = orm.mapped_column(index=True)
status: orm.Mapped[container_statuses.ContainerExecutionStatus] = orm.mapped_column(index=True)
last_processed_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(
default=None
)
Expand Down
33 changes: 33 additions & 0 deletions cloud_pipelines_backend/container_statuses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import enum


class ContainerExecutionStatus(str, enum.Enum):
    """Lifecycle status of a container execution.

    Mixes in ``str`` so that members compare equal to (and serialize as)
    their plain string values — presumably for DB/JSON round-tripping;
    verify against the ORM columns that store this enum.
    """

    INVALID = "INVALID"  # Compatibility with Vertex AI CustomJob
    UNINITIALIZED = "UNINITIALIZED"  # Remove — TODO(review): confirm no longer persisted
    QUEUED = "QUEUED"  # Before WAITING_FOR_UPSTREAM or STARTING
    # READY_TO_START = "READY_TO_START"  # Input artifacts ready, but no job ID
    WAITING_FOR_UPSTREAM = "WAITING_FOR_UPSTREAM"
    # STARTING = "STARTING"
    PENDING = "PENDING"  # == Starting
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    # UPSTREAM_FAILED = "UPSTREAM_FAILED"
    # CONDITIONALLY_SKIPPED = "CONDITIONALLY_SKIPPED"
    SYSTEM_ERROR = "SYSTEM_ERROR"

    # new
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    # UPSTREAM_FAILED_OR_SKIPPED = "UPSTREAM_FAILED_OR_SKIPPED"
    SKIPPED = "SKIPPED"


# Statuses that mark a container execution as finished; no further
# status transitions are expected once one of these is reached.
_ENDED_STATUS_NAMES = (
    "INVALID",
    "SUCCEEDED",
    "FAILED",
    "SYSTEM_ERROR",
    "CANCELLED",
    "SKIPPED",
)
CONTAINER_STATUSES_ENDED = {
    ContainerExecutionStatus[name] for name in _ENDED_STATUS_NAMES
}
Loading
Loading