diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/run-tests.yaml similarity index 98% rename from .github/workflows/validate_examples.yaml rename to .github/workflows/run-tests.yaml index ae784965e..cd69c953c 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/run-tests.yaml @@ -104,5 +104,8 @@ jobs: sleep 10 ollama pull llama3.2:latest - name: Check examples + run: | + tox -e examples + - name: Run integration tests run: | tox -e integration diff --git a/AGENTS.md b/AGENTS.md index 27eed72a5..d1c67c21e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -33,7 +33,9 @@ ext/ # Extension packages (each is a separate PyPI packa └── flask_dapr/ # Flask integration ← see ext/flask_dapr/AGENTS.md tests/ # Unit tests (mirrors dapr/ package structure) -examples/ # Integration test suite ← see examples/AGENTS.md +├── examples/ # Output-based tests that run examples and check stdout +├── integration/ # Programmatic SDK tests using DaprClient directly +examples/ # User-facing example applications ← see examples/AGENTS.md docs/ # Sphinx documentation source tools/ # Build and release scripts ``` @@ -59,16 +61,19 @@ Each extension is a **separate PyPI package** with its own `setup.cfg`, `setup.p | `dapr-ext-langgraph` | `dapr.ext.langgraph` | LangGraph checkpoint persistence to Dapr state store | Moderate | | `dapr-ext-strands` | `dapr.ext.strands` | Strands agent session management via Dapr state store | New | -## Examples (integration test suite) +## Examples and testing -The `examples/` directory serves as both user-facing documentation and the project's integration test suite. Examples are validated by pytest-based integration tests in `tests/integration/`. +The `examples/` directory contains user-facing example applications. These are validated by two test suites: -**See `examples/AGENTS.md`** for the full guide on example structure and how to add new examples. 
+- **`tests/examples/`** — Output-based tests that run examples via `dapr run` and check stdout for expected strings. Uses a `DaprRunner` helper to manage process lifecycle. See `examples/AGENTS.md`. +- **`tests/integration/`** — Programmatic SDK tests that call `DaprClient` methods directly and assert on return values, gRPC status codes, and SDK types. More reliable than output-based tests since they don't depend on print statement formatting. See `tests/integration/AGENTS.md`. Quick reference: ```bash -tox -e integration # Run all examples (needs Dapr runtime) -tox -e integration -- test_state_store.py # Run a single example +tox -e examples # Run output-based example tests +tox -e examples -- test_state_store.py # Run a single example test +tox -e integration # Run programmatic SDK tests +tox -e integration -- test_state_store.py # Run a single integration test ``` ## Python version support @@ -106,7 +111,10 @@ tox -e ruff # Run type checking tox -e type -# Run integration tests / validate examples (requires Dapr runtime) +# Run output-based example tests (requires Dapr runtime) +tox -e examples + +# Run programmatic integration tests (requires Dapr runtime) tox -e integration ``` @@ -189,8 +197,8 @@ When completing any task on this project, work through this checklist. 
Not every ### Examples (integration tests) - [ ] If you added a new user-facing feature or building block, add or update an example in `examples/` -- [ ] Add a corresponding pytest integration test in `tests/integration/` -- [ ] If you changed output format of existing functionality, update expected output in the affected integration tests +- [ ] Add a corresponding pytest test in `tests/examples/` (output-based) and/or `tests/integration/` (programmatic) +- [ ] If you changed output format of existing functionality, update expected output in `tests/examples/` - [ ] See `examples/AGENTS.md` for full details on writing examples ### Documentation @@ -202,7 +210,7 @@ When completing any task on this project, work through this checklist. Not every - [ ] Run `tox -e ruff` — linting must be clean - [ ] Run `tox -e py311` (or your Python version) — all unit tests must pass -- [ ] If you touched examples: `tox -e integration -- test_.py` to validate locally +- [ ] If you touched examples: `tox -e examples -- test_.py` to validate locally - [ ] Commits must be signed off for DCO: `git commit -s` ## Important files @@ -217,7 +225,8 @@ When completing any task on this project, work through this checklist. Not every | `dev-requirements.txt` | Development/test dependencies | | `dapr/version/__init__.py` | SDK version string | | `ext/*/setup.cfg` | Extension package metadata and dependencies | -| `tests/integration/` | Pytest-based integration tests that validate examples | +| `tests/examples/` | Output-based tests that validate examples by checking stdout | +| `tests/integration/` | Programmatic SDK tests using DaprClient directly | ## Gotchas @@ -226,6 +235,6 @@ When completing any task on this project, work through this checklist. Not every - **Extension independence**: Each extension is a separate PyPI package. Core SDK changes should not break extensions; extension changes should not require core SDK changes unless intentional. 
- **DCO signoff**: PRs will be blocked by the DCO bot if commits lack `Signed-off-by`. Always use `git commit -s`. - **Ruff version pinned**: Dev requirements pin `ruff === 0.14.1`. Use this exact version to match CI. -- **Examples are integration tests**: Changing output format (log messages, print statements) can break integration tests. Always check expected output in `tests/integration/` when modifying user-visible output. +- **Examples are tested by output matching**: Changing output format (log messages, print statements) can break `tests/examples/`. Always check expected output there when modifying user-visible output. - **Background processes in examples**: Examples that start background services (servers, subscribers) must include a cleanup step to stop them, or CI will hang. - **Workflow is the most active area**: See `ext/dapr-ext-workflow/AGENTS.md` for workflow-specific architecture and constraints. diff --git a/README.md b/README.md index 333be6933..f47160556 100644 --- a/README.md +++ b/README.md @@ -121,17 +121,23 @@ tox -e py311 tox -e type ``` -8. Run integration tests (validates the examples) +8. Run integration tests ```bash tox -e integration ``` -If you need to run the examples against a pre-released version of the runtime, you can use the following command: +9. Validate the examples + +```bash +tox -e examples +``` + +If you need to run the examples or integration tests against a pre-released version of the runtime, you can use the following command: - Get your daprd runtime binary from [here](https://github.com/dapr/dapr/releases) for your platform. - Copy the binary to your dapr home folder at $HOME/.dapr/bin/daprd. Or using dapr cli directly: `dapr init --runtime-version ` -- Now you can run the examples with `tox -e integration`. +- Now you can run the examples with `tox -e examples` or the integration tests with `tox -e integration`. 
## Documentation diff --git a/examples/AGENTS.md index 5dcbdd4ec..36bd171ef 100644 --- a/examples/AGENTS.md +++ b/examples/AGENTS.md @@ -1,11 +1,11 @@ # AGENTS.md — Dapr Python SDK Examples -The `examples/` directory serves as both **user-facing documentation** and the project's **integration test suite**. Each example is a self-contained application validated by pytest-based integration tests in `tests/integration/`. +The `examples/` directory serves as the **user-facing documentation**. Each example is a self-contained application validated by pytest-based tests in `tests/examples/`. ## How validation works -1. Each example has a corresponding test file in `tests/integration/` (e.g., `test_state_store.py`) -2. Tests use a `DaprRunner` helper (defined in `conftest.py`) that wraps `dapr run` commands +1. Each example has a corresponding test file in `tests/examples/` (e.g., `test_state_store.py`) +2. Tests use a `DaprRunner` helper (defined in `tests/examples/conftest.py`) that wraps `dapr run` commands 3. `DaprRunner.run()` executes a command and captures stdout; `DaprRunner.start()`/`stop()` manage background services 4. Tests assert that expected output lines appear in the captured output Run examples locally (requires a running Dapr runtime via `dapr init`): ```bash # All examples -tox -e integration +tox -e examples # Single example -tox -e integration -- test_state_store.py +tox -e examples -- test_state_store.py ``` In CI (`run-tests.yaml`), examples run on all supported Python versions (3.10-3.14) on Ubuntu with a full Dapr runtime including Docker, Redis, and (for LLM examples) Ollama. @@ -132,17 +132,17 @@ The `workflow` example includes: `simple.py`, `task_chaining.py`, `fan_out_fan_i 2. Add Python source files and a `requirements.txt` referencing the needed SDK packages 3. Add Dapr component YAMLs in a `components/` subdirectory if the example uses state, pubsub, etc. 4. 
Write a `README.md` with introduction, pre-requisites, install instructions, and running instructions -5. Add a corresponding test in `tests/integration/test_.py`: +5. Add a corresponding test in `tests/examples/test_.py`: - Use the `@pytest.mark.example_dir('')` marker to set the working directory - Use `dapr.run()` for scripts that exit on their own, `dapr.start()`/`dapr.stop()` for long-running services - Assert expected output lines appear in the captured output -6. Test locally: `tox -e integration -- test_.py` +6. Test locally: `tox -e examples -- test_.py` ## Gotchas -- **Output format changes break tests**: If you modify print statements or log output in SDK code, check whether any integration test's expected lines depend on that output. +- **Output format changes break tests**: If you modify print statements or log output in SDK code, check whether any test's expected lines in `tests/examples/` depend on that output. - **Background processes must be cleaned up**: The `DaprRunner` fixture handles cleanup on teardown, but tests should still call `dapr.stop()` to capture output. - **Dapr prefixes output**: Application stdout appears as `== APP == ` when run via `dapr run`. - **Redis is available in CI**: The CI environment has Redis running on `localhost:6379` — most component YAMLs use this. - **Some examples need special setup**: `crypto` generates keys, `configuration` seeds Redis, `conversation` needs LLM config — check individual READMEs. -- **Infinite-loop example scripts**: Some example scripts (e.g., `invoke-caller.py`) have `while True` loops for demo purposes. Integration tests must either bypass these with HTTP API calls or use `dapr.run(until=...)` for early termination. \ No newline at end of file +- **Infinite-loop example scripts**: Some example scripts (e.g., `invoke-caller.py`) have `while True` loops for demo purposes. Tests must either bypass these with HTTP API calls or use `dapr.run(until=...)` for early termination. 
\ No newline at end of file diff --git a/tests/examples/conftest.py b/tests/examples/conftest.py new file mode 100644 index 000000000..33be85c66 --- /dev/null +++ b/tests/examples/conftest.py @@ -0,0 +1,140 @@ +import shlex +import subprocess +import tempfile +import threading +import time +from pathlib import Path +from typing import IO, Any, Generator + +import pytest + +from tests._process_utils import get_kwargs_for_process_group, terminate_process_group + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +EXAMPLES_DIR = REPO_ROOT / 'examples' + + +def pytest_configure(config: pytest.Config) -> None: + config.addinivalue_line('markers', 'example_dir(name): set the example directory for a test') + + +class DaprRunner: + """Helper to run `dapr run` commands and capture output.""" + + def __init__(self, cwd: Path) -> None: + self._cwd = cwd + self._bg_process: subprocess.Popen[str] | None = None + self._bg_output_file: IO[str] | None = None + + @staticmethod + def _terminate(proc: subprocess.Popen[str]) -> None: + if proc.poll() is not None: + return + + terminate_process_group(proc) + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + terminate_process_group(proc, force=True) + proc.wait() + + def run(self, args: str, *, timeout: int = 30, until: list[str] | None = None) -> str: + """Run a foreground command, block until it finishes, and return output. + + Use this for short-lived processes (e.g. a publisher that exits on its + own). For long-lived background services, use ``start()``/``stop()``. + + Args: + args: Arguments passed to ``dapr run``. + timeout: Maximum seconds to wait before killing the process. + until: If provided, the process is terminated as soon as every + string in this list has appeared in the accumulated output. 
+ """ + proc = subprocess.Popen( + args=('dapr', 'run', *shlex.split(args)), + cwd=self._cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + **get_kwargs_for_process_group(), + ) + lines: list[str] = [] + assert proc.stdout is not None + + # Kill the process if it exceeds the timeout. A background timer is + # needed because `for line in proc.stdout` blocks indefinitely when + # the child never exits. + timer = threading.Timer( + interval=timeout, function=lambda: terminate_process_group(proc, force=True) + ) + timer.start() + + try: + for line in proc.stdout: + print(line, end='', flush=True) + lines.append(line) + if until and all(exp in ''.join(lines) for exp in until): + break + finally: + timer.cancel() + self._terminate(proc) + + return ''.join(lines) + + def start(self, args: str, *, wait: int = 5) -> None: + """Start a long-lived background service. + + Use this for servers/subscribers that must stay alive while a second + process runs via ``run()``. Call ``stop()`` to terminate and collect + output. Stdout is written to a temp file to avoid pipe-buffer deadlocks. 
+ """ + output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log') + proc = subprocess.Popen( + args=('dapr', 'run', *shlex.split(args)), + cwd=self._cwd, + stdout=output_file, + stderr=subprocess.STDOUT, + text=True, + **get_kwargs_for_process_group(), + ) + self._bg_process = proc + self._bg_output_file = output_file + time.sleep(wait) + + def stop(self) -> str: + """Stop the background service and return its captured output.""" + if self._bg_process is None: + return '' + self._terminate(self._bg_process) + self._bg_process = None + return self._read_and_close_output() + + def _read_and_close_output(self) -> str: + if self._bg_output_file is None: + return '' + self._bg_output_file.seek(0) + output = self._bg_output_file.read() + self._bg_output_file.close() + self._bg_output_file = None + print(output, end='', flush=True) + return output + + +@pytest.fixture +def dapr(request: pytest.FixtureRequest) -> Generator[DaprRunner, Any, None]: + """Provides a DaprRunner scoped to an example directory. + + Use the ``example_dir`` marker to select which example: + + @pytest.mark.example_dir('state_store') + def test_something(dapr): + ... + + Defaults to the examples root if no marker is set. + """ + marker = request.node.get_closest_marker('example_dir') + cwd = EXAMPLES_DIR / marker.args[0] if marker else EXAMPLES_DIR + + runner = DaprRunner(cwd) + yield runner + runner.stop() diff --git a/tests/examples/test_configuration.py b/tests/examples/test_configuration.py new file mode 100644 index 000000000..45f76624a --- /dev/null +++ b/tests/examples/test_configuration.py @@ -0,0 +1,49 @@ +import subprocess +import time + +import pytest + +REDIS_CONTAINER = 'dapr_redis' + +EXPECTED_LINES = [ + 'Got key=orderId1 value=100 version=1 metadata={}', + 'Got key=orderId2 value=200 version=1 metadata={}', + 'Subscribe key=orderId2 value=210 version=2 metadata={}', + 'Unsubscribed successfully? 
True', +] + + +@pytest.fixture() +def redis_config(): + """Seed configuration values in Redis before the test.""" + subprocess.run( + ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId1', '100||1'), + check=True, + capture_output=True, + ) + subprocess.run( + ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '200||1'), + check=True, + capture_output=True, + ) + + +@pytest.mark.example_dir('configuration') +def test_configuration(dapr, redis_config): + dapr.start( + '--app-id configexample --resources-path components/ -- python3 configuration.py', + wait=5, + ) + # Update Redis to trigger the subscription notification + subprocess.run( + ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '210||2'), + check=True, + capture_output=True, + ) + # configuration.py sleeps 10s after subscribing before it unsubscribes. + # Wait long enough for the full script to finish. + time.sleep(10) + + output = dapr.stop() + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/integration/test_conversation.py b/tests/examples/test_conversation.py similarity index 100% rename from tests/integration/test_conversation.py rename to tests/examples/test_conversation.py diff --git a/tests/integration/test_crypto.py b/tests/examples/test_crypto.py similarity index 100% rename from tests/integration/test_crypto.py rename to tests/examples/test_crypto.py diff --git a/tests/integration/test_demo_actor.py b/tests/examples/test_demo_actor.py similarity index 100% rename from tests/integration/test_demo_actor.py rename to tests/examples/test_demo_actor.py diff --git a/tests/examples/test_distributed_lock.py b/tests/examples/test_distributed_lock.py new file mode 100644 index 000000000..47e243c90 --- /dev/null +++ b/tests/examples/test_distributed_lock.py @@ -0,0 +1,21 @@ +import pytest + +EXPECTED_LINES = [ + 'Will try to acquire a lock from lock store named [lockstore]', + 'The lock is for a resource named 
[example-lock-resource]', + 'The client identifier is [example-client-id]', + 'The lock will expire in 60 seconds.', + 'Lock acquired successfully!!!', + 'We already released the lock so unlocking will not work.', + 'We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist]', +] + + +@pytest.mark.example_dir('distributed_lock') +def test_distributed_lock(dapr): + output = dapr.run( + '--app-id=locksapp --app-protocol grpc --resources-path components/ -- python3 lock.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/integration/test_error_handling.py b/tests/examples/test_error_handling.py similarity index 100% rename from tests/integration/test_error_handling.py rename to tests/examples/test_error_handling.py diff --git a/tests/integration/test_grpc_proxying.py b/tests/examples/test_grpc_proxying.py similarity index 100% rename from tests/integration/test_grpc_proxying.py rename to tests/examples/test_grpc_proxying.py diff --git a/tests/integration/test_invoke_binding.py b/tests/examples/test_invoke_binding.py similarity index 100% rename from tests/integration/test_invoke_binding.py rename to tests/examples/test_invoke_binding.py diff --git a/tests/integration/test_invoke_custom_data.py b/tests/examples/test_invoke_custom_data.py similarity index 100% rename from tests/integration/test_invoke_custom_data.py rename to tests/examples/test_invoke_custom_data.py diff --git a/tests/integration/test_invoke_http.py b/tests/examples/test_invoke_http.py similarity index 100% rename from tests/integration/test_invoke_http.py rename to tests/examples/test_invoke_http.py diff --git a/tests/integration/test_invoke_simple.py b/tests/examples/test_invoke_simple.py similarity index 100% rename from tests/integration/test_invoke_simple.py rename to tests/examples/test_invoke_simple.py diff --git a/tests/integration/test_jobs.py b/tests/examples/test_jobs.py similarity index 100% 
rename from tests/integration/test_jobs.py rename to tests/examples/test_jobs.py diff --git a/tests/integration/test_langgraph_checkpointer.py b/tests/examples/test_langgraph_checkpointer.py similarity index 100% rename from tests/integration/test_langgraph_checkpointer.py rename to tests/examples/test_langgraph_checkpointer.py diff --git a/tests/examples/test_metadata.py b/tests/examples/test_metadata.py new file mode 100644 index 000000000..fb9641342 --- /dev/null +++ b/tests/examples/test_metadata.py @@ -0,0 +1,23 @@ +import pytest + +EXPECTED_LINES = [ + 'First, we will assign a new custom label to Dapr sidecar', + "Now, we will fetch the sidecar's metadata", + 'And this is what we got:', + 'application_id: my-metadata-app', + 'active_actors_count: {}', + 'registered_components:', + 'We will update our custom label value and check it was persisted', + 'We added a custom label named [is-this-our-metadata-example]', +] + + +@pytest.mark.example_dir('metadata') +def test_metadata(dapr): + output = dapr.run( + '--app-id=my-metadata-app --app-protocol grpc --resources-path components/ ' + '-- python3 app.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/integration/test_pubsub_simple.py b/tests/examples/test_pubsub_simple.py similarity index 100% rename from tests/integration/test_pubsub_simple.py rename to tests/examples/test_pubsub_simple.py diff --git a/tests/integration/test_pubsub_streaming.py b/tests/examples/test_pubsub_streaming.py similarity index 100% rename from tests/integration/test_pubsub_streaming.py rename to tests/examples/test_pubsub_streaming.py diff --git a/tests/integration/test_pubsub_streaming_async.py b/tests/examples/test_pubsub_streaming_async.py similarity index 100% rename from tests/integration/test_pubsub_streaming_async.py rename to tests/examples/test_pubsub_streaming_async.py diff --git a/tests/examples/test_secret_store.py 
b/tests/examples/test_secret_store.py new file mode 100644 index 000000000..f14baf0eb --- /dev/null +++ b/tests/examples/test_secret_store.py @@ -0,0 +1,33 @@ +import pytest + +EXPECTED_LINES = [ + "{'secretKey': 'secretValue'}", + "[('random', {'random': 'randomValue'}), ('secretKey', {'secretKey': 'secretValue'})]", + "{'random': 'randomValue'}", +] + +EXPECTED_LINES_WITH_ACL = [ + "{'secretKey': 'secretValue'}", + "[('secretKey', {'secretKey': 'secretValue'})]", + 'Got expected error for accessing random key', +] + + +@pytest.mark.example_dir('secret_store') +def test_secret_store(dapr): + output = dapr.run( + '--app-id=secretsapp --app-protocol grpc --resources-path components/ -- python3 example.py', + timeout=30, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('secret_store') +def test_secret_store_with_access_control(dapr): + output = dapr.run( + '--app-id=secretsapp --app-protocol grpc --config config.yaml --resources-path components/ -- python3 example.py', + timeout=30, + ) + for line in EXPECTED_LINES_WITH_ACL: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_state_store.py b/tests/examples/test_state_store.py new file mode 100644 index 000000000..05d67d032 --- /dev/null +++ b/tests/examples/test_state_store.py @@ -0,0 +1,25 @@ +import pytest + +EXPECTED_LINES = [ + 'State store has successfully saved value_1 with key_1 as key', + 'Cannot save due to bad etag. ErrorCode=StatusCode.ABORTED', + 'State store has successfully saved value_2 with key_2 as key', + 'State store has successfully saved value_3 with key_3 as key', + 'Cannot save bulk due to bad etags. 
ErrorCode=StatusCode.ABORTED', + "Got value=b'value_1' eTag=1", + "Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')]", + 'Transaction with outbox pattern executed successfully!', + "Got value after outbox pattern: b'val1'", + "Got values after transaction delete: [b'', b'']", + "Got value after delete: b''", +] + + +@pytest.mark.example_dir('state_store') +def test_state_store(dapr): + output = dapr.run( + '--resources-path components/ -- python3 state_store.py', + timeout=30, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/integration/test_state_store_query.py b/tests/examples/test_state_store_query.py similarity index 100% rename from tests/integration/test_state_store_query.py rename to tests/examples/test_state_store_query.py diff --git a/tests/integration/test_w3c_tracing.py b/tests/examples/test_w3c_tracing.py similarity index 100% rename from tests/integration/test_w3c_tracing.py rename to tests/examples/test_w3c_tracing.py diff --git a/tests/integration/test_workflow.py b/tests/examples/test_workflow.py similarity index 100% rename from tests/integration/test_workflow.py rename to tests/examples/test_workflow.py diff --git a/tests/integration/AGENTS.md b/tests/integration/AGENTS.md new file mode 100644 index 000000000..2f40750f8 --- /dev/null +++ b/tests/integration/AGENTS.md @@ -0,0 +1,95 @@ +# AGENTS.md — Programmatic Integration Tests + +This directory contains **programmatic SDK tests** that call `DaprClient` methods directly and assert on return values, gRPC status codes, and SDK types. Unlike the output-based tests in `tests/examples/` (which run example scripts and check stdout), these tests don't depend on print statement formatting. + +## How it works + +1. `DaprTestEnvironment` (defined in `conftest.py`) manages Dapr sidecar processes +2. `start_sidecar()` launches `dapr run` with explicit ports, waits for the health check, and returns a connected `DaprClient` +3. 
Tests call SDK methods on that client and assert on the response objects +4. Sidecar stdout is written to temp files (not pipes) to avoid buffer deadlocks +5. Cleanup terminates sidecars, closes clients, and removes log files + +Run locally (requires a running Dapr runtime via `dapr init`): + +```bash +# All integration tests +tox -e integration + +# Single test file +tox -e integration -- test_state_store.py + +# Single test +tox -e integration -- test_state_store.py -k test_save_and_get +``` + +## Directory structure + +``` +tests/integration/ +├── conftest.py # DaprTestEnvironment + fixtures (dapr_env, apps_dir, components_dir) +├── test_*.py # Test files (one per building block) +├── apps/ # Helper apps started alongside sidecars +│ ├── invoke_receiver.py # gRPC method handler for invoke tests +│ └── pubsub_subscriber.py # Subscriber that persists messages to state store +├── components/ # Dapr component YAMLs loaded by all sidecars +│ ├── statestore.yaml # state.redis +│ ├── pubsub.yaml # pubsub.redis +│ ├── lockstore.yaml # lock.redis +│ ├── configurationstore.yaml # configuration.redis +│ └── localsecretstore.yaml # secretstores.local.file +└── secrets.json # Secrets file for localsecretstore component +``` + +## Fixtures + +Sidecar and client fixtures are **module-scoped** — one sidecar per test file. Helper fixtures may use a different scope; see the table below. + +| Fixture | Scope | Type | Description | +|---------|-------|------|-------------| +| `dapr_env` | module | `DaprTestEnvironment` | Manages sidecar lifecycle; call `start_sidecar()` to get a client | +| `apps_dir` | module | `Path` | Path to `tests/integration/apps/` | +| `components_dir` | module | `Path` | Path to `tests/integration/components/` | +| `wait_until` | function | `Callable` | Polling helper `(predicate, timeout=10, interval=0.1)` for eventual-consistency assertions | + +Each test file defines its own module-scoped `client` fixture that calls `dapr_env.start_sidecar(...)`. 
+ +## Building blocks covered + +| Test file | Building block | SDK methods tested | +|-----------|---------------|-------------------| +| `test_state_store.py` | State management | `save_state`, `get_state`, `save_bulk_state`, `get_bulk_state`, `execute_state_transaction`, `delete_state` | +| `test_invoke.py` | Service invocation | `invoke_method` | +| `test_pubsub.py` | Pub/sub | `publish_event`, `get_state` (to verify delivery) | +| `test_secret_store.py` | Secrets | `get_secret`, `get_bulk_secret` | +| `test_metadata.py` | Metadata | `get_metadata`, `set_metadata` | +| `test_distributed_lock.py` | Distributed lock | `try_lock`, `unlock`, context manager | +| `test_configuration.py` | Configuration | `get_configuration`, `subscribe_configuration`, `unsubscribe_configuration` | + +## Port allocation + +All sidecars default to gRPC port 50001 and HTTP port 3500. Since fixtures are module-scoped and tests run sequentially, only one sidecar is active at a time. If parallel execution is needed in the future, sidecars will need dynamic port allocation. + +## Helper apps + +Some building blocks (invoke, pubsub) require an app process running alongside the sidecar: + +- **`invoke_receiver.py`** — A `dapr.ext.grpc.App` that handles `my-method` and returns `INVOKE_RECEIVED`. +- **`pubsub_subscriber.py`** — Subscribes to `TOPIC_A` and persists received messages to the state store. This lets tests verify message delivery by reading state rather than parsing stdout. + +## Adding a new test + +1. Create `test_.py` +2. Add a module-scoped `client` fixture that calls `dapr_env.start_sidecar(app_id='test-')` +3. If the building block needs a new Dapr component, add a YAML to `components/` +4. If the building block needs a running app, add it to `apps/` and pass `app_cmd` / `app_port` to `start_sidecar()` +5. Use unique keys/resource IDs per test to avoid interference (the sidecar is shared within a module) +6. 
Assert on SDK return types and gRPC status codes, not on string output + +## Gotchas + +- **Requires `dapr init`** — the tests assume a local Dapr runtime with Redis (`dapr_redis` container on `localhost:6379`), which `dapr init` sets up automatically. +- **Configuration tests seed Redis directly** via `docker exec dapr_redis redis-cli`. +- **Lock and configuration APIs are alpha** and emit `UserWarning` on every call. Tests suppress these with `pytestmark = pytest.mark.filterwarnings('ignore::UserWarning')`. +- **`localsecretstore.yaml` uses a relative path** (`secrets.json`) resolved against `cwd=INTEGRATION_DIR`. +- **Dapr may normalize response fields** — e.g., `content_type` may lose charset parameters when proxied through gRPC. Assert on the media type prefix, not the full string. diff --git a/tests/integration/apps/invoke_receiver.py b/tests/integration/apps/invoke_receiver.py new file mode 100644 index 000000000..41592eb0e --- /dev/null +++ b/tests/integration/apps/invoke_receiver.py @@ -0,0 +1,13 @@ +"""gRPC method handler for invoke integration tests.""" + +from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse + +app = App() + + +@app.method(name='my-method') +def my_method(request: InvokeMethodRequest) -> InvokeMethodResponse: + return InvokeMethodResponse(b'INVOKE_RECEIVED', 'text/plain; charset=UTF-8') + + +app.run(50051) diff --git a/tests/integration/apps/pubsub_subscriber.py b/tests/integration/apps/pubsub_subscriber.py new file mode 100644 index 000000000..110fa14c8 --- /dev/null +++ b/tests/integration/apps/pubsub_subscriber.py @@ -0,0 +1,26 @@ +"""Pub/sub subscriber that persists received messages to state store. + +Used by integration tests to verify message delivery without relying on stdout. 
+""" + +import json + +from cloudevents.sdk.event import v1 +from dapr.ext.grpc import App + +from dapr.clients import DaprClient +from dapr.clients.grpc._response import TopicEventResponse + +app = App() + + +@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A') +def handle_topic_a(event: v1.Event) -> TopicEventResponse: + data = json.loads(event.Data()) + key = f'received-{data["run_id"]}-{data["id"]}' + with DaprClient() as d: + d.save_state('statestore', key, event.Data()) + return TopicEventResponse('success') + + +app.run(50051) diff --git a/tests/integration/components/configurationstore.yaml b/tests/integration/components/configurationstore.yaml new file mode 100644 index 000000000..fcf6569d0 --- /dev/null +++ b/tests/integration/components/configurationstore.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: configurationstore +spec: + type: configuration.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/localsecretstore.yaml b/tests/integration/components/localsecretstore.yaml new file mode 100644 index 000000000..fd574a077 --- /dev/null +++ b/tests/integration/components/localsecretstore.yaml @@ -0,0 +1,13 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: localsecretstore +spec: + type: secretstores.local.file + metadata: + - name: secretsFile + # Relative to the Dapr process CWD (tests/integration/), set by + # DaprTestEnvironment via cwd=INTEGRATION_DIR. 
+ value: secrets.json + - name: nestedSeparator + value: ":" diff --git a/tests/integration/components/lockstore.yaml b/tests/integration/components/lockstore.yaml new file mode 100644 index 000000000..424caceeb --- /dev/null +++ b/tests/integration/components/lockstore.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: lockstore +spec: + type: lock.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/pubsub.yaml b/tests/integration/components/pubsub.yaml new file mode 100644 index 000000000..18764d8ce --- /dev/null +++ b/tests/integration/components/pubsub.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/statestore.yaml b/tests/integration/components/statestore.yaml new file mode 100644 index 000000000..a0c53bc40 --- /dev/null +++ b/tests/integration/components/statestore.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 33be85c66..554ef918d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,140 +1,214 @@ import shlex import subprocess import tempfile -import threading import time +from contextlib import contextmanager from pathlib import Path -from typing import IO, Any, Generator +from typing import Any, Callable, Generator, Iterator, TypeVar +import httpx import pytest +from dapr.clients import DaprClient +from dapr.conf import settings from tests._process_utils import get_kwargs_for_process_group, terminate_process_group 
-REPO_ROOT = Path(__file__).resolve().parent.parent.parent -EXAMPLES_DIR = REPO_ROOT / 'examples' +T = TypeVar('T') +INTEGRATION_DIR = Path(__file__).resolve().parent +COMPONENTS_DIR = INTEGRATION_DIR / 'components' +APPS_DIR = INTEGRATION_DIR / 'apps' -def pytest_configure(config: pytest.Config) -> None: - config.addinivalue_line('markers', 'example_dir(name): set the example directory for a test') +class DaprTestEnvironment: + """Manages Dapr sidecars and returns SDK clients for programmatic testing. -class DaprRunner: - """Helper to run `dapr run` commands and capture output.""" + Unlike tests.examples.DaprRunner (which captures stdout for output-based assertions), this + class returns real DaprClient instances so tests can make assertions against SDK return values. + """ - def __init__(self, cwd: Path) -> None: - self._cwd = cwd - self._bg_process: subprocess.Popen[str] | None = None - self._bg_output_file: IO[str] | None = None + def __init__(self, default_components: Path = COMPONENTS_DIR) -> None: + self._default_components = default_components + self._processes: list[subprocess.Popen[str]] = [] + self._clients: list[DaprClient] = [] + + def start_sidecar( + self, + app_id: str, + *, + grpc_port: int = 50001, + http_port: int = 3500, + app_port: int | None = None, + app_cmd: str | None = None, + components: Path | None = None, + ) -> DaprClient: + """Start a Dapr sidecar and return a connected DaprClient. - @staticmethod - def _terminate(proc: subprocess.Popen[str]) -> None: - if proc.poll() is not None: - return + Args: + app_id: Dapr application ID. + grpc_port: Sidecar gRPC port. + http_port: Sidecar HTTP port (also used for the SDK health check). + app_port: Port the app listens on (implies ``--app-protocol grpc``). + app_cmd: Shell command to start alongside the sidecar. + components: Path to component YAML directory. Defaults to + ``tests/integration/components/``. 
+ """ + resources = components or self._default_components + + cmd = [ + 'dapr', + 'run', + '--app-id', + app_id, + '--resources-path', + str(resources), + '--dapr-grpc-port', + str(grpc_port), + '--dapr-http-port', + str(http_port), + ] + if app_port is not None: + cmd.extend(['--app-port', str(app_port), '--app-protocol', 'grpc']) + if app_cmd is not None: + cmd.extend(['--', *shlex.split(app_cmd)]) + + with tempfile.NamedTemporaryFile(mode='w', suffix=f'-{app_id}.log') as log: + proc = subprocess.Popen( + cmd, + cwd=INTEGRATION_DIR, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + **get_kwargs_for_process_group(), + ) + self._processes.append(proc) + + # Point the SDK health check at the actual sidecar HTTP port. + # DaprHealth.wait_for_sidecar() reads settings.DAPR_HTTP_PORT, which + # is initialized once at import time and won't reflect a non-default + # http_port unless we update it here. The DaprClient constructor + # polls /healthz/outbound on this port, so we don't need to sleep first. + settings.DAPR_HTTP_PORT = http_port + + client = DaprClient(address=f'127.0.0.1:{grpc_port}') + self._clients.append(client) + + # /healthz/outbound (polled by DaprClient) only checks sidecar-side + # readiness. When we launched an app alongside the sidecar, also wait + # for /v1.0/healthz so invoke_method et al. don't race the app's server. + if app_cmd is not None: + _wait_for_app_health(http_port) + + return client + + def cleanup(self) -> None: + for client in self._clients: + client.close() + self._clients.clear() + + for proc in self._processes: + if proc.poll() is None: + terminate_process_group(proc) + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + terminate_process_group(proc, force=True) + proc.wait() + self._processes.clear() + + +def _wait_until( + predicate: Callable[[], T | None], + timeout: float = 10.0, + interval: float = 0.1, +) -> T: + """Poll `predicate` until it returns a truthy value. 
+ Raises `TimeoutError` if it never returns.""" + deadline = time.monotonic() + timeout + while True: + result = predicate() + if result: + return result + if time.monotonic() >= deadline: + raise TimeoutError(f'wait_until timed out after {timeout}s') + time.sleep(interval) + + +def _wait_for_app_health(http_port: int, timeout: float = 30.0) -> None: + """Poll Dapr's app-facing /v1.0/healthz endpoint until it returns 2xx. + + ``/v1.0/healthz`` requires the app behind the sidecar to be reachable, + unlike ``/v1.0/healthz/outbound`` which only checks sidecar readiness. + """ + url = f'http://127.0.0.1:{http_port}/v1.0/healthz' - terminate_process_group(proc) + def _check() -> bool: try: - proc.wait(timeout=10) - except subprocess.TimeoutExpired: - terminate_process_group(proc, force=True) - proc.wait() + response = httpx.get(url, timeout=2.0) + except httpx.HTTPError: + return False + return response.is_success - def run(self, args: str, *, timeout: int = 30, until: list[str] | None = None) -> str: - """Run a foreground command, block until it finishes, and return output. + _wait_until(_check, timeout=timeout, interval=0.2) - Use this for short-lived processes (e.g. a publisher that exits on its - own). For long-lived background services, use ``start()``/``stop()``. - Args: - args: Arguments passed to ``dapr run``. - timeout: Maximum seconds to wait before killing the process. - until: If provided, the process is terminated as soon as every - string in this list has appeared in the accumulated output. - """ - proc = subprocess.Popen( - args=('dapr', 'run', *shlex.split(args)), - cwd=self._cwd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - **get_kwargs_for_process_group(), - ) - lines: list[str] = [] - assert proc.stdout is not None - - # Kill the process if it exceeds the timeout. A background timer is - # needed because `for line in proc.stdout` blocks indefinitely when - # the child never exits. 
- timer = threading.Timer( - interval=timeout, function=lambda: terminate_process_group(proc, force=True) - ) - timer.start() +@contextmanager +def _isolate_dapr_settings() -> Iterator[None]: + """Pin SDK HTTP settings to the local test sidecar for the duration. - try: - for line in proc.stdout: - print(line, end='', flush=True) - lines.append(line) - if until and all(exp in ''.join(lines) for exp in until): - break - finally: - timer.cancel() - self._terminate(proc) + ``DaprHealth.get_api_url()`` consults three settings (see + ``dapr/clients/http/helpers.py``): - return ''.join(lines) + - ``DAPR_HTTP_ENDPOINT``, if set, wins and bypasses host/port entirely. + - ``DAPR_RUNTIME_HOST`` is the host component of the fallback URL. + - ``DAPR_HTTP_PORT`` is the port component of the fallback URL. - def start(self, args: str, *, wait: int = 5) -> None: - """Start a long-lived background service. - - Use this for servers/subscribers that must stay alive while a second - process runs via ``run()``. Call ``stop()`` to terminate and collect - output. Stdout is written to a temp file to avoid pipe-buffer deadlocks. 
- """ - output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log') - proc = subprocess.Popen( - args=('dapr', 'run', *shlex.split(args)), - cwd=self._cwd, - stdout=output_file, - stderr=subprocess.STDOUT, - text=True, - **get_kwargs_for_process_group(), - ) - self._bg_process = proc - self._bg_output_file = output_file - time.sleep(wait) - - def stop(self) -> str: - """Stop the background service and return its captured output.""" - if self._bg_process is None: - return '' - self._terminate(self._bg_process) - self._bg_process = None - return self._read_and_close_output() - - def _read_and_close_output(self) -> str: - if self._bg_output_file is None: - return '' - self._bg_output_file.seek(0) - output = self._bg_output_file.read() - self._bg_output_file.close() - self._bg_output_file = None - print(output, end='', flush=True) - return output + Any of these may be populated from the developer's environment (the Dapr + CLI sets them); without an override the SDK health check could target the + wrong sidecar. All three are snapshotted and restored so the test's + mutations don't leak across modules either. + """ + originals = { + 'DAPR_HTTP_ENDPOINT': settings.DAPR_HTTP_ENDPOINT, + 'DAPR_RUNTIME_HOST': settings.DAPR_RUNTIME_HOST, + 'DAPR_HTTP_PORT': settings.DAPR_HTTP_PORT, + } + settings.DAPR_HTTP_ENDPOINT = None + settings.DAPR_RUNTIME_HOST = '127.0.0.1' + try: + yield + finally: + for name, value in originals.items(): + setattr(settings, name, value) + + +@pytest.fixture(scope='module') +def dapr_env() -> Generator[DaprTestEnvironment, Any, None]: + """Provides a DaprTestEnvironment for programmatic SDK testing. + + Module-scoped so that all tests in a file share a single Dapr sidecar, + avoiding port conflicts from rapid start/stop cycles and cutting total + test time significantly. 
+ """ + with _isolate_dapr_settings(): + env = DaprTestEnvironment() + try: + yield env + finally: + env.cleanup() @pytest.fixture -def dapr(request: pytest.FixtureRequest) -> Generator[DaprRunner, Any, None]: - """Provides a DaprRunner scoped to an example directory. +def wait_until() -> Callable[..., Any]: + """Returns the ``_wait_until(predicate, timeout=10, interval=0.1)`` helper.""" + return _wait_until - Use the ``example_dir`` marker to select which example: - @pytest.mark.example_dir('state_store') - def test_something(dapr): - ... +@pytest.fixture(scope='module') +def apps_dir() -> Path: + return APPS_DIR - Defaults to the examples root if no marker is set. - """ - marker = request.node.get_closest_marker('example_dir') - cwd = EXAMPLES_DIR / marker.args[0] if marker else EXAMPLES_DIR - runner = DaprRunner(cwd) - yield runner - runner.stop() +@pytest.fixture(scope='module') +def components_dir() -> Path: + return COMPONENTS_DIR diff --git a/tests/integration/secrets.json b/tests/integration/secrets.json new file mode 100644 index 000000000..e8db35141 --- /dev/null +++ b/tests/integration/secrets.json @@ -0,0 +1,4 @@ +{ + "secretKey": "secretValue", + "random": "randomValue" +} diff --git a/tests/integration/test_configuration.py b/tests/integration/test_configuration.py index 45f76624a..e73f1a16a 100644 --- a/tests/integration/test_configuration.py +++ b/tests/integration/test_configuration.py @@ -1,49 +1,90 @@ import subprocess +import threading import time import pytest +from dapr.clients.grpc._response import ConfigurationResponse + +STORE = 'configurationstore' REDIS_CONTAINER = 'dapr_redis' -EXPECTED_LINES = [ - 'Got key=orderId1 value=100 version=1 metadata={}', - 'Got key=orderId2 value=200 version=1 metadata={}', - 'Subscribe key=orderId2 value=210 version=2 metadata={}', - 'Unsubscribed successfully? True', -] +def _redis_set(key: str, value: str, version: int = 1) -> None: + """Seed a configuration value directly in Redis. 
-@pytest.fixture() -def redis_config(): - """Seed configuration values in Redis before the test.""" - subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId1', '100||1'), - check=True, - capture_output=True, - ) + Dapr's Redis configuration store encodes values as ``value||version``. + """ subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '200||1'), + args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'SET', key, f'{value}||{version}'), check=True, capture_output=True, + timeout=10, ) -@pytest.mark.example_dir('configuration') -def test_configuration(dapr, redis_config): - dapr.start( - '--app-id configexample --resources-path components/ -- python3 configuration.py', - wait=5, - ) - # Update Redis to trigger the subscription notification - subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '210||2'), - check=True, - capture_output=True, - ) - # configuration.py sleeps 10s after subscribing before it unsubscribes. - # Wait long enough for the full script to finish. - time.sleep(10) +@pytest.fixture(scope='module') +def client(dapr_env): + _redis_set('cfg-key-1', 'val-1') + _redis_set('cfg-key-2', 'val-2') + return dapr_env.start_sidecar(app_id='test-config') + + +class TestGetConfiguration: + def test_get_single_key(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + assert 'cfg-key-1' in resp.items + assert resp.items['cfg-key-1'].value == 'val-1' + + def test_get_multiple_keys(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1', 'cfg-key-2']) + assert resp.items['cfg-key-1'].value == 'val-1' + assert resp.items['cfg-key-2'].value == 'val-2' + + def test_get_missing_key_returns_empty_items(self, client): + resp = client.get_configuration(store_name=STORE, keys=['nonexistent-cfg-key']) + # Dapr omits keys that don't exist from the response. 
+ assert 'nonexistent-cfg-key' not in resp.items + + def test_items_have_version(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + item = resp.items['cfg-key-1'] + assert item.version + + +class TestSubscribeConfiguration: + def test_subscribe_receives_update(self, client): + received: list[ConfigurationResponse] = [] + event = threading.Event() + + def handler(_id: str, resp: ConfigurationResponse) -> None: + received.append(resp) + event.set() + + sub_id = client.subscribe_configuration( + store_name=STORE, keys=['cfg-sub-key'], handler=handler + ) + assert sub_id + + # Give the subscription watcher thread time to establish its gRPC + # stream before pushing the update, otherwise the notification is missed. + time.sleep(1) + _redis_set('cfg-sub-key', 'updated-val', version=2) + event.wait(timeout=10) + + assert len(received) >= 1 + last = received[-1] + assert 'cfg-sub-key' in last.items + assert last.items['cfg-sub-key'].value == 'updated-val' + + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok - output = dapr.stop() - for line in EXPECTED_LINES: - assert line in output, f'Missing in output: {line}' + def test_unsubscribe_returns_true(self, client): + sub_id = client.subscribe_configuration( + store_name=STORE, + keys=['cfg-unsub-key'], + handler=lambda _id, _resp: None, + ) + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok diff --git a/tests/integration/test_distributed_lock.py b/tests/integration/test_distributed_lock.py index 47e243c90..68362c296 100644 --- a/tests/integration/test_distributed_lock.py +++ b/tests/integration/test_distributed_lock.py @@ -1,21 +1,66 @@ import pytest -EXPECTED_LINES = [ - 'Will try to acquire a lock from lock store named [lockstore]', - 'The lock is for a resource named [example-lock-resource]', - 'The client identifier is [example-client-id]', - 'The lock will expire in 60 seconds.', - 'Lock acquired successfully!!!', - 'We 
already released the lock so unlocking will not work.', - 'We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist]', -] - - -@pytest.mark.example_dir('distributed_lock') -def test_distributed_lock(dapr): - output = dapr.run( - '--app-id=locksapp --app-protocol grpc --resources-path components/ -- python3 lock.py', - timeout=10, - ) - for line in EXPECTED_LINES: - assert line in output, f'Missing in output: {line}' +from dapr.clients.grpc._response import UnlockResponseStatus + +STORE = 'lockstore' + +# The distributed lock API emits alpha warnings on every call. +pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-lock') + + +class TestTryLock: + def test_acquire_lock(self, client): + lock = client.try_lock(STORE, 'res-acquire', 'owner-a', expiry_in_seconds=10) + assert lock.success + + def test_second_owner_is_rejected(self, client): + first = client.try_lock(STORE, 'res-contention', 'owner-a', expiry_in_seconds=10) + second = client.try_lock(STORE, 'res-contention', 'owner-b', expiry_in_seconds=10) + assert first.success + assert not second.success + + def test_lock_is_truthy_on_success(self, client): + lock = client.try_lock(STORE, 'res-truthy', 'owner-a', expiry_in_seconds=10) + assert bool(lock) is True + + def test_failed_lock_is_falsy(self, client): + client.try_lock(STORE, 'res-falsy', 'owner-a', expiry_in_seconds=10) + contested = client.try_lock(STORE, 'res-falsy', 'owner-b', expiry_in_seconds=10) + assert bool(contested) is False + + +class TestUnlock: + def test_unlock_own_lock(self, client): + client.try_lock(STORE, 'res-unlock', 'owner-a', expiry_in_seconds=10) + resp = client.unlock(STORE, 'res-unlock', 'owner-a') + assert resp.status == UnlockResponseStatus.success + + def test_unlock_wrong_owner(self, client): + client.try_lock(STORE, 'res-wrong-owner', 'owner-a', expiry_in_seconds=10) + resp = 
client.unlock(STORE, 'res-wrong-owner', 'owner-b') + assert resp.status == UnlockResponseStatus.lock_belongs_to_others + + def test_unlock_nonexistent(self, client): + resp = client.unlock(STORE, 'res-does-not-exist', 'owner-a') + assert resp.status == UnlockResponseStatus.lock_does_not_exist + + def test_unlock_frees_resource_for_others(self, client): + client.try_lock(STORE, 'res-release', 'owner-a', expiry_in_seconds=10) + client.unlock(STORE, 'res-release', 'owner-a') + second = client.try_lock(STORE, 'res-release', 'owner-b', expiry_in_seconds=10) + assert second.success + + +class TestLockContextManager: + def test_context_manager_auto_unlocks(self, client): + with client.try_lock(STORE, 'res-ctx', 'owner-a', expiry_in_seconds=10) as lock: + assert lock + + # After the context manager exits, another owner should be able to acquire. + second = client.try_lock(STORE, 'res-ctx', 'owner-b', expiry_in_seconds=10) + assert second.success diff --git a/tests/integration/test_invoke.py b/tests/integration/test_invoke.py new file mode 100644 index 000000000..45abdcdcb --- /dev/null +++ b/tests/integration/test_invoke.py @@ -0,0 +1,34 @@ +import pytest + + +@pytest.fixture(scope='module') +def client(dapr_env, apps_dir): + return dapr_env.start_sidecar( + app_id='invoke-receiver', + grpc_port=50001, + app_port=50051, + app_cmd=f'python3 {apps_dir / "invoke_receiver.py"}', + ) + + +def test_invoke_method_returns_expected_response(client): + resp = client.invoke_method( + app_id='invoke-receiver', + method_name='my-method', + data=b'{"id": 1, "message": "hello world"}', + content_type='application/json', + ) + # The app returns 'text/plain; charset=UTF-8', but Dapr may strip + # parameters when proxying through gRPC, so only check the media type. 
+ assert resp.content_type.startswith('text/plain') + assert resp.data == b'INVOKE_RECEIVED' + + +def test_invoke_method_with_text_data(client): + resp = client.invoke_method( + app_id='invoke-receiver', + method_name='my-method', + data=b'plain text', + content_type='text/plain', + ) + assert resp.data == b'INVOKE_RECEIVED' diff --git a/tests/integration/test_metadata.py b/tests/integration/test_metadata.py index fb9641342..88430ebbb 100644 --- a/tests/integration/test_metadata.py +++ b/tests/integration/test_metadata.py @@ -1,23 +1,42 @@ import pytest -EXPECTED_LINES = [ - 'First, we will assign a new custom label to Dapr sidecar', - "Now, we will fetch the sidecar's metadata", - 'And this is what we got:', - 'application_id: my-metadata-app', - 'active_actors_count: {}', - 'registered_components:', - 'We will update our custom label value and check it was persisted', - 'We added a custom label named [is-this-our-metadata-example]', -] - - -@pytest.mark.example_dir('metadata') -def test_metadata(dapr): - output = dapr.run( - '--app-id=my-metadata-app --app-protocol grpc --resources-path components/ ' - '-- python3 app.py', - timeout=10, - ) - for line in EXPECTED_LINES: - assert line in output, f'Missing in output: {line}' + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-metadata') + + +class TestGetMetadata: + def test_application_id_matches(self, client): + meta = client.get_metadata() + assert meta.application_id == 'test-metadata' + + def test_registered_components_present(self, client): + meta = client.get_metadata() + component_types = {c.type for c in meta.registered_components} + assert any(t.startswith('state.') for t in component_types) + + def test_registered_components_have_names(self, client): + meta = client.get_metadata() + for comp in meta.registered_components: + assert comp.name + assert comp.type + + +class TestSetMetadata: + def test_set_and_get_roundtrip(self, client): + 
client.set_metadata('test-key', 'test-value') + meta = client.get_metadata() + assert meta.extended_metadata.get('test-key') == 'test-value' + + def test_overwrite_existing_key(self, client): + client.set_metadata('overwrite-key', 'first') + client.set_metadata('overwrite-key', 'second') + meta = client.get_metadata() + assert meta.extended_metadata['overwrite-key'] == 'second' + + def test_empty_value_is_allowed(self, client): + client.set_metadata('empty-key', '') + meta = client.get_metadata() + assert 'empty-key' in meta.extended_metadata + assert meta.extended_metadata['empty-key'] == '' diff --git a/tests/integration/test_pubsub.py b/tests/integration/test_pubsub.py new file mode 100644 index 000000000..612405b89 --- /dev/null +++ b/tests/integration/test_pubsub.py @@ -0,0 +1,68 @@ +import json +import subprocess +import uuid + +import pytest + +STORE = 'statestore' +PUBSUB = 'pubsub' +TOPIC = 'TOPIC_A' +REDIS_CONTAINER = 'dapr_redis' + + +def _flush_redis() -> None: + """Flush the Dapr Redis instance to prevent state leaking between runs. + + Both the state store and the pubsub component point at the same + ``dapr_redis`` container (see ``tests/integration/components/``), so a + previous run's ``received-*`` keys could otherwise satisfy this test's + assertions even if no new message was delivered. 
+ """ + subprocess.run( + args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'FLUSHDB'), + check=True, + capture_output=True, + timeout=10, + ) + + +@pytest.fixture(scope='module') +def client(dapr_env, apps_dir): + _flush_redis() + return dapr_env.start_sidecar( + app_id='test-subscriber', + grpc_port=50001, + app_port=50051, + app_cmd=f'python3 {apps_dir / "pubsub_subscriber.py"}', + ) + + +def test_published_messages_are_received_by_subscriber(client, wait_until): + run_id = uuid.uuid4().hex + for n in range(1, 4): + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=json.dumps({'run_id': run_id, 'id': n, 'message': 'hello world'}), + data_content_type='application/json', + ) + + for n in range(1, 4): + key = f'received-{run_id}-{n}' + data = wait_until( + lambda k=key: client.get_state(store_name=STORE, key=k).data or None, + timeout=10, + ) + msg = json.loads(data) + assert msg['id'] == n + assert msg['message'] == 'hello world' + + +def test_publish_event_succeeds(client): + """Verify publish_event does not raise on a valid topic.""" + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=json.dumps({'run_id': uuid.uuid4().hex, 'id': 99, 'message': 'smoke test'}), + data_content_type='application/json', + ) diff --git a/tests/integration/test_secret_store.py b/tests/integration/test_secret_store.py index f14baf0eb..b4e8e8679 100644 --- a/tests/integration/test_secret_store.py +++ b/tests/integration/test_secret_store.py @@ -1,33 +1,19 @@ import pytest -EXPECTED_LINES = [ - "{'secretKey': 'secretValue'}", - "[('random', {'random': 'randomValue'}), ('secretKey', {'secretKey': 'secretValue'})]", - "{'random': 'randomValue'}", -] +STORE = 'localsecretstore' -EXPECTED_LINES_WITH_ACL = [ - "{'secretKey': 'secretValue'}", - "[('secretKey', {'secretKey': 'secretValue'})]", - 'Got expected error for accessing random key', -] +@pytest.fixture(scope='module') +def client(dapr_env, components_dir): + return 
dapr_env.start_sidecar(app_id='test-secret', components=components_dir) -@pytest.mark.example_dir('secret_store') -def test_secret_store(dapr): - output = dapr.run( - '--app-id=secretsapp --app-protocol grpc --resources-path components/ -- python3 example.py', - timeout=30, - ) - for line in EXPECTED_LINES: - assert line in output, f'Missing in output: {line}' +def test_get_secret(client): + resp = client.get_secret(store_name=STORE, key='secretKey') + assert resp.secret == {'secretKey': 'secretValue'} -@pytest.mark.example_dir('secret_store') -def test_secret_store_with_access_control(dapr): - output = dapr.run( - '--app-id=secretsapp --app-protocol grpc --config config.yaml --resources-path components/ -- python3 example.py', - timeout=30, - ) - for line in EXPECTED_LINES_WITH_ACL: - assert line in output, f'Missing in output: {line}' + +def test_get_bulk_secret(client): + resp = client.get_bulk_secret(store_name=STORE) + assert 'secretKey' in resp.secrets + assert resp.secrets['secretKey'] == {'secretKey': 'secretValue'} diff --git a/tests/integration/test_state_store.py b/tests/integration/test_state_store.py index 05d67d032..26ef51cad 100644 --- a/tests/integration/test_state_store.py +++ b/tests/integration/test_state_store.py @@ -1,25 +1,102 @@ +import grpc import pytest -EXPECTED_LINES = [ - 'State store has successfully saved value_1 with key_1 as key', - 'Cannot save due to bad etag. ErrorCode=StatusCode.ABORTED', - 'State store has successfully saved value_2 with key_2 as key', - 'State store has successfully saved value_3 with key_3 as key', - 'Cannot save bulk due to bad etags. 
ErrorCode=StatusCode.ABORTED', - "Got value=b'value_1' eTag=1", - "Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')]", - 'Transaction with outbox pattern executed successfully!', - "Got value after outbox pattern: b'val1'", - "Got values after transaction delete: [b'', b'']", - "Got value after delete: b''", -] - - -@pytest.mark.example_dir('state_store') -def test_state_store(dapr): - output = dapr.run( - '--resources-path components/ -- python3 state_store.py', - timeout=30, - ) - for line in EXPECTED_LINES: - assert line in output, f'Missing in output: {line}' +from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType +from dapr.clients.grpc._state import StateItem + +STORE = 'statestore' + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-state') + + +class TestSaveAndGetState: + def test_save_and_get(self, client): + client.save_state(store_name=STORE, key='k1', value='v1') + state = client.get_state(store_name=STORE, key='k1') + assert state.data == b'v1' + assert state.etag + + def test_save_with_wrong_etag_fails(self, client): + client.save_state(store_name=STORE, key='etag-test', value='original') + with pytest.raises(grpc.RpcError) as exc_info: + client.save_state(store_name=STORE, key='etag-test', value='bad', etag='9999') + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + def test_get_missing_key_returns_empty(self, client): + state = client.get_state(store_name=STORE, key='nonexistent-key') + assert state.data == b'' + + +class TestBulkState: + def test_save_and_get_bulk(self, client): + client.save_bulk_state( + store_name=STORE, + states=[ + StateItem(key='bulk-1', value='v1'), + StateItem(key='bulk-2', value='v2'), + ], + ) + items = client.get_bulk_state(store_name=STORE, keys=['bulk-1', 'bulk-2']).items + by_key = {i.key: i.data for i in items} + assert by_key['bulk-1'] == b'v1' + assert by_key['bulk-2'] == b'v2' + + def 
test_save_bulk_with_wrong_etag_fails(self, client): + client.save_state(store_name=STORE, key='bulk-etag-1', value='original') + with pytest.raises(grpc.RpcError) as exc_info: + client.save_bulk_state( + store_name=STORE, + states=[StateItem(key='bulk-etag-1', value='updated', etag='9999')], + ) + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + +class TestStateTransactions: + def test_transaction_upsert(self, client): + client.save_state(store_name=STORE, key='tx-1', value='original') + etag = client.get_state(store_name=STORE, key='tx-1').etag + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.upsert, + key='tx-1', + data='updated', + etag=etag, + ), + TransactionalStateOperation(key='tx-2', data='new'), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-1').data == b'updated' + assert client.get_state(store_name=STORE, key='tx-2').data == b'new' + + def test_transaction_delete(self, client): + client.save_state(store_name=STORE, key='tx-del-1', value='v1') + client.save_state(store_name=STORE, key='tx-del-2', value='v2') + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-1' + ), + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-2' + ), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-del-1').data == b'' + assert client.get_state(store_name=STORE, key='tx-del-2').data == b'' + + +class TestDeleteState: + def test_delete_single(self, client): + client.save_state(store_name=STORE, key='del-1', value='v1') + client.delete_state(store_name=STORE, key='del-1') + assert client.get_state(store_name=STORE, key='del-1').data == b'' diff --git a/tox.ini b/tox.ini index de0b30a2d..4d448210e 100644 --- a/tox.ini +++ b/tox.ini @@ -38,8 +38,33 @@ commands = ruff check --fix ruff format 
+[testenv:examples] +; Stdout-based smoke tests that run examples/ and check expected output. +; Usage: tox -e examples # run all +; tox -e examples -- test_state_store.py # run one +passenv = HOME +basepython = python3 +changedir = ./tests/examples/ +commands = + pytest {posargs} -v --tb=short + +allowlist_externals=* + +commands_pre = + pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands flask-dapr + pip install -r {toxinidir}/dev-requirements.txt \ + -e {toxinidir}/ \ + -e {toxinidir}/ext/dapr-ext-workflow/ \ + -e {toxinidir}/ext/dapr-ext-grpc/ \ + -e {toxinidir}/ext/dapr-ext-fastapi/ \ + -e {toxinidir}/ext/dapr-ext-langgraph/ \ + -e {toxinidir}/ext/dapr-ext-strands/ \ + -e {toxinidir}/ext/flask_dapr/ \ + opentelemetry-exporter-zipkin \ + langchain-ollama + [testenv:integration] -; Pytest-based integration tests that validate the examples/ directory. +; SDK-based integration tests using DaprClient directly. ; Usage: tox -e integration # run all ; tox -e integration -- test_state_store.py # run one passenv = HOME @@ -59,9 +84,7 @@ commands_pre = -e {toxinidir}/ext/dapr-ext-fastapi/ \ -e {toxinidir}/ext/dapr-ext-langgraph/ \ -e {toxinidir}/ext/dapr-ext-strands/ \ - -e {toxinidir}/ext/flask_dapr/ \ - opentelemetry-exporter-zipkin \ - langchain-ollama + -e {toxinidir}/ext/flask_dapr/ [testenv:type] basepython = python3