Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud_pipelines_backend/backend_types_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,10 @@ class ExecutionNode(_TableBase):
repr=False,
)

# Transient bookkeeping flag: set by the attribute "set" listener when
# container_execution_status changes, consumed and reset by the
# before_commit listener that records transition metrics.
# NOTE(review): presumably not mapped to a DB column (plain dataclasses.field,
# init=False, repr=False) — confirm the mapper treats it as non-persisted.
_status_changed: bool = dataclasses.field(default=False, init=False, repr=False)


# extra_data key holding the status history: a list of
# {"status": <status value>, "first_observed_at": <ISO-8601 UTC string>} entries.
EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY = "container_execution_status_history"
# extra_data key under which a system-error exception message is stored.
EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY = (
    "system_error_exception_message"
)
Expand Down
19 changes: 18 additions & 1 deletion cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,18 @@
- Instrument: orchestrator_execution_system_errors
"""

import enum

from opentelemetry import metrics as otel_metrics


class MetricUnit(str, enum.Enum):
    """Well-known unit strings for OTel instruments, in UCUM style.

    Members are plain ``str`` instances, so they can be passed anywhere a
    unit string is expected while still being a closed, named set.
    """

    # Base time unit for durations.
    SECONDS = "s"
    # UCUM "annotation" form for a dimensionless count of errors.
    ERRORS = "{error}"


# ---------------------------------------------------------------------------
# tangle.orchestrator
# ---------------------------------------------------------------------------
Expand All @@ -32,5 +42,12 @@
# Counts execution nodes that terminated in SYSTEM_ERROR.
execution_system_errors = orchestrator_meter.create_counter(
    name="execution.system_errors",
    description="Number of execution nodes that ended in SYSTEM_ERROR status",
    # Pass the plain string, not the enum member: on Python 3.11+,
    # str()/format() of a (str, Enum) member renders "MetricUnit.ERRORS",
    # which would corrupt the unit emitted by exporters that stringify it.
    unit=MetricUnit.ERRORS.value,
)

# Histogram of time (seconds) spent in a status before moving to the next one.
execution_status_transition_duration = orchestrator_meter.create_histogram(
    name="execution.status_transition.duration",
    description="Duration an execution spent in a status before transitioning to the next status",
    unit=MetricUnit.SECONDS.value,
)

76 changes: 76 additions & 0 deletions cloud_pipelines_backend/sql_event_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""SQLAlchemy event listeners for cloud_pipelines_backend models.

This module registers global SQLAlchemy event hooks. It must be imported at
application startup (start_local.py, orchestrator_main_oasis.py, etc.) for the
listeners to take effect.
"""

import datetime
import logging
import typing

from sqlalchemy import event as sql_event
from sqlalchemy import orm

from . import backend_types_sql
from .instrumentation import metrics

_logger = logging.getLogger(__name__)


@sql_event.listens_for(backend_types_sql.ExecutionNode.container_execution_status, "set")
def _handle_container_execution_status_set(
    execution: backend_types_sql.ExecutionNode,
    value: typing.Optional["backend_types_sql.ContainerExecutionStatus"],
    old_value: typing.Any,
    _initiator: typing.Any,
) -> None:
    """Append a status-history entry to ``extra_data`` when the status changes.

    Also sets the transient ``_status_changed`` flag, which the
    ``before_commit`` listener consumes to record transition-duration metrics.

    Args:
        execution: The node whose ``container_execution_status`` was assigned.
        value: The newly assigned status (``None`` assignments are ignored).
        old_value: The previous attribute value; on the very first assignment
            SQLAlchemy passes a NO_VALUE/NEVER_SET sentinel instead of a status.
        _initiator: SQLAlchemy event token (unused).
    """
    if value is None:
        return
    # SQLAlchemy fires "set" even when the assigned value equals the current
    # one; skip no-op assignments so the history gets no duplicate entries.
    # The first real assignment still passes: the NO_VALUE sentinel never
    # compares equal to a status.
    if value == old_value:
        return
    existing = execution.extra_data or {}
    history: list = existing.get(
        backend_types_sql.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY, []
    )
    entry = {
        "status": value.value,
        "first_observed_at": datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
    }
    # Reassign (rather than mutate in place) so SQLAlchemy's change detection
    # on the JSON column sees the update even without mutation tracking.
    execution.extra_data = {
        **existing,
        backend_types_sql.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history + [entry],
    }
    execution._status_changed = True


@sql_event.listens_for(orm.Session, "before_commit")
def _handle_before_commit(session: orm.Session) -> None:
    """Record status-transition duration metrics for pending ExecutionNodes.

    For every new/dirty ExecutionNode whose status changed in this unit of
    work, emit the duration between the last two status-history entries.
    Instrumentation must never abort the commit, so ALL metric work —
    including timestamp parsing — runs inside a broad try/except that only
    logs. (Previously ``fromisoformat`` ran outside the try; a parse failure
    would have rolled back the commit.)
    """
    for obj in list(session.new) + list(session.dirty):
        if not isinstance(obj, backend_types_sql.ExecutionNode):
            continue
        if not obj._status_changed:
            continue
        try:
            history: list = (obj.extra_data or {}).get(
                backend_types_sql.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY, []
            )
            # Need both the previous and the current entry to compute a span.
            if len(history) >= 2:
                prev, curr = history[-2], history[-1]
                prev_time = _parse_utc_timestamp(prev["first_observed_at"])
                curr_time = _parse_utc_timestamp(curr["first_observed_at"])
                metrics.execution_status_transition_duration.record(
                    (curr_time - prev_time).total_seconds(),
                    attributes={
                        "execution.status.from": prev["status"],
                        "execution.status.to": curr["status"],
                    },
                )
        except Exception:
            _logger.warning(
                f"Failed to record status transition metric for execution {obj.id!r}",
                exc_info=True,
            )
        obj._status_changed = False


def _parse_utc_timestamp(text: str) -> datetime.datetime:
    """Parse an ISO-8601 UTC timestamp, tolerating the trailing ``Z``.

    The history entries are written with a ``Z`` suffix, which
    ``datetime.fromisoformat`` only accepts on Python >= 3.11; normalize it
    to an explicit ``+00:00`` offset first.
    """
    return datetime.datetime.fromisoformat(text.replace("Z", "+00:00"))
1 change: 1 addition & 0 deletions start_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def get_user_details(request: fastapi.Request):
interfaces as storage_interfaces,
)
from cloud_pipelines_backend import orchestrator_sql
from cloud_pipelines_backend import sql_event_listeners


def run_orchestrator(
Expand Down
49 changes: 49 additions & 0 deletions tests/test_sql_event_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Tests for cloud_pipelines_backend.sql_event_listeners."""

import unittest.mock

import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend import sql_event_listeners # noqa: F401 — registers listeners
from cloud_pipelines_backend.instrumentation import metrics


@pytest.fixture()
def session() -> orm.Session:
    """Yield an ORM session bound to a fresh in-memory SQLite database."""
    engine = database_ops.create_db_engine(database_uri="sqlite://")
    bts._TableBase.metadata.create_all(engine)
    with orm.Session(engine) as db_session:
        yield db_session


class TestStatusHistoryListeners:
    """Integration tests for the status-history SQLAlchemy event listeners."""

    def test_status_change_appends_history_to_extra_data(self, session: orm.Session) -> None:
        execution = bts.ExecutionNode(task_spec={})
        session.add(execution)
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        history = execution.extra_data[bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY]
        assert len(history) == 1
        assert history[0]["status"] == bts.ContainerExecutionStatus.QUEUED

    def test_second_status_change_records_duration_metric(self, session: orm.Session) -> None:
        execution = bts.ExecutionNode(task_spec={})
        session.add(execution)
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        execution.container_execution_status = bts.ContainerExecutionStatus.RUNNING
        histogram = metrics.execution_status_transition_duration
        with unittest.mock.patch.object(histogram, "record") as record_mock:
            session.commit()

        record_mock.assert_called_once()
        expected_attributes = {
            "execution.status.from": bts.ContainerExecutionStatus.QUEUED,
            "execution.status.to": bts.ContainerExecutionStatus.RUNNING,
        }
        assert record_mock.call_args.kwargs["attributes"] == expected_attributes
Loading