From b8c37dbf22a2f009162008eef8ab19172562520a Mon Sep 17 00:00:00 2001 From: Abdul Ateya Date: Thu, 9 Apr 2026 10:37:50 -0500 Subject: [PATCH 1/2] Add initial Admin API CLI support --- README.md | 42 +++- duo_cli/admin_client.py | 129 +++++++++++ duo_cli/commands/admin.py | 421 ++++++++++++++++++++++++++++++++++ duo_cli/commands/configure.py | 14 +- duo_cli/main.py | 2 + duo_cli/output.py | 16 ++ pyproject.toml | 2 +- tests/test_admin.py | 138 +++++++++++ tests/test_admin_client.py | 33 +++ tests/test_cli.py | 16 ++ tests/test_config.py | 14 +- tests/test_output.py | 18 +- 12 files changed, 837 insertions(+), 8 deletions(-) create mode 100644 duo_cli/admin_client.py create mode 100644 duo_cli/commands/admin.py create mode 100644 tests/test_admin.py create mode 100644 tests/test_admin_client.py diff --git a/README.md b/README.md index b96881e..06cd5d9 100644 --- a/README.md +++ b/README.md @@ -30,9 +30,10 @@ pip install -e . ## Configuration -Duo has separate **Auth API** and **Universal Prompt (Web SDK)** integrations — configure each one you need: +Duo has separate **Admin API**, **Auth API**, and **Universal Prompt (Web SDK)** integrations — configure each one you need: ```bash +duo-cli configure --api admin duo-cli configure --api auth duo-cli configure --api universal ``` @@ -42,6 +43,11 @@ The interactive setup walks you through where to find credentials in the Duo Adm Credentials are stored in `~/.duo-cli/config.json`. You can also use environment variables (useful for CI/agents): ```bash +# Admin API +export DUO_ADMIN_IKEY=DIXXXXXXXXXXXXXXXXXX +export DUO_ADMIN_SKEY=your-secret-key +export DUO_ADMIN_HOST=api-XXXXXXXX.duosecurity.com + # Auth API export DUO_AUTH_IKEY=DIXXXXXXXXXXXXXXXXXX export DUO_AUTH_SKEY=your-secret-key @@ -56,6 +62,17 @@ export DUO_UNIVERSAL_HOST=api-XXXXXXXX.duosecurity.com ## Quick Start ```bash +# Verify Admin API credentials +duo-cli admin check + +# List users, groups, phones, integrations, and policies +duo-cli admin users --limit 25 +duo-cli admin groups --limit 25 +duo-cli admin integrations --limit 25 + +# Inspect recent logs +duo-cli admin logs auth --days 7 --limit 20 + # Verify credentials duo-cli auth check @@ -77,6 +94,28 @@ duo-cli -o json universal login jsmith ## Commands +### Admin API + +These commands provide a first milestone of high-value Admin API coverage for operators, shell scripts, and MCP servers. JSON output returns the full underlying API response, while table output shows a compact summary. + +| Command | Description | +|---------|-------------| +| `duo-cli admin check` | Verify Admin API credentials and return account summary data | +| `duo-cli admin users [--username ]` | List users or filter to a single username | +| `duo-cli admin user ` | Fetch a single user | +| `duo-cli admin groups` | List groups | +| `duo-cli admin group ` | Fetch a single group | +| `duo-cli admin phones` | List phones | +| `duo-cli admin phone ` | Fetch a single phone | +| `duo-cli admin integrations` | List integrations | +| `duo-cli admin integration ` | Fetch a single integration | +| `duo-cli admin policies` | List policies | +| `duo-cli admin policy ` | Fetch a single policy | +| `duo-cli admin logs auth` | Fetch recent authentication log events | +| `duo-cli admin logs administrator` | Fetch recent administrator log events | +| `duo-cli admin logs activity` | Fetch recent activity log events | +| `duo-cli admin raw ` | Call an Admin API path directly for long-tail coverage | + ### Auth API | Command | Description | @@ -179,6 +218,7 @@ if "allow" in result.lower(): All commands support `--output json` for machine-readable output: ```bash +duo-cli -o json admin users --limit 10 duo-cli -o json auth preauth jsmith duo-cli -o json universal login jsmith ``` diff --git a/duo_cli/admin_client.py b/duo_cli/admin_client.py new file mode 100644 index 0000000..3d2e1b1 --- /dev/null +++ b/duo_cli/admin_client.py @@ -0,0 +1,129 @@ +"""Admin API wrapper used by the CLI and future MCP-facing integrations.""" + +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Optional, Sequence, Union + +import duo_client + +from duo_cli.config import get_client_kwargs + +DEFAULT_LIST_LIMIT = 100 +DEFAULT_LOG_LIMIT = 20 + + +def _time_window(days: int, *, milliseconds: bool) -> tuple[int, int]: + now = datetime.now(tz=timezone.utc) + start = now - timedelta(days=days) + + if milliseconds: + return int(start.timestamp() * 1000), int(now.timestamp() * 1000) + return int(start.timestamp()), int(now.timestamp()) + + +class AdminClient: + """Small adapter around ``duo_client.Admin`` for CLI-oriented defaults.""" + + def __init__(self, client: Optional[duo_client.Admin] = None): + self._client = client or duo_client.Admin(**get_client_kwargs("admin")) + + def check(self) -> dict: + return self._client.get_info_summary() + + def list_users( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + offset: int = 0, + username: Optional[str] = None, + ) -> list[dict]: + if username: + return self._client.get_users_by_name(username) + return self._client.get_users(limit=limit, offset=offset) + + def get_user(self, user_id: str) -> dict: + return self._client.get_user_by_id(user_id) + + def list_groups(self, *, limit: int = DEFAULT_LIST_LIMIT, offset: int = 0) -> list[dict]: + return self._client.get_groups(limit=limit, offset=offset) + + def get_group(self, group_id: str) -> dict: + return self._client.get_group(group_id, api_version=2) + + def list_phones(self, *, limit: int = DEFAULT_LIST_LIMIT, offset: int = 0) -> list[dict]: + return self._client.get_phones(limit=limit, offset=offset) + + def get_phone(self, phone_id: str) -> dict: + return self._client.get_phone_by_id(phone_id) + + def list_integrations( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + offset: int = 0, + ) -> list[dict]: + return self._client.get_integrations(limit=limit, offset=offset) + + def get_integration(self, integration_key: str) -> dict: + return self._client.get_integration(integration_key) + + def list_policies( + self, + *, + limit: int = DEFAULT_LIST_LIMIT, + offset: int = 0, + ) -> list[dict]: + return self._client.get_policies_v2(limit=limit, offset=offset) + + def get_policy(self, policy_key: str) -> dict: + return self._client.get_policy_v2(policy_key) + + def authentication_logs( + self, + *, + days: int = 7, + limit: int = DEFAULT_LOG_LIMIT, + ) -> dict: + mintime, maxtime = _time_window(days, milliseconds=True) + return self._client.get_authentication_log( + api_version=2, + mintime=mintime, + maxtime=maxtime, + limit=str(limit), + ) + + def administrator_logs( + self, + *, + days: int = 7, + limit: int = DEFAULT_LOG_LIMIT, + ) -> list[dict]: + mintime, _ = _time_window(days, milliseconds=False) + return self._client.get_administrator_log(mintime=mintime)[:limit] + + def activity_logs( + self, + *, + days: int = 7, + limit: int = DEFAULT_LOG_LIMIT, + ) -> dict: + mintime, maxtime = _time_window(days, milliseconds=True) + return self._client.get_activity_logs( + mintime=mintime, + maxtime=maxtime, + limit=limit, + sort="DESC", + ) + + def raw(self, method: str, path: str, params: Dict[str, str]) -> Union[dict, List[dict]]: + return self._client.json_api_call(method.upper(), path, params) + + +def parse_params(items: Sequence[str]) -> Dict[str, str]: + """Convert repeated ``key=value`` CLI parameters into a dict.""" + params: Dict[str, str] = {} + for item in items: + if "=" not in item: + raise ValueError(f"parameter must be key=value, got: {item}") + key, value = item.split("=", 1) + params[key] = value + return params diff --git a/duo_cli/commands/admin.py b/duo_cli/commands/admin.py new file mode 100644 index 0000000..3ed68a9 --- /dev/null +++ b/duo_cli/commands/admin.py @@ -0,0 +1,421 @@ +"""Admin API commands for operator workflows and MCP-facing reuse.""" + +import json +from typing import Any, Iterable, Sequence + +import click + +from duo_cli.admin_client import AdminClient, parse_params +from duo_cli.output import render, render_object + +LIST_LIMIT = click.IntRange(min=1, max=500) +LOG_LIMIT = click.IntRange(min=1, max=1000) +DAYS_RANGE = click.IntRange(min=1, max=365) + + +def _output_format(ctx: click.Context) -> str: + return ctx.obj.get("output", "table") + + +def _has_value(value: Any) -> bool: + return value not in (None, "", [], {}) + + +def _pick_columns(rows: Sequence[dict], preferred: Sequence[str]) -> list[str]: + if not rows: + return list(preferred) + + selected = [ + column + for column in preferred + if any(_has_value(row.get(column)) for row in rows) + ] + if selected: + return selected + + first_row = rows[0] + return list(first_row.keys())[: min(len(first_row), 8)] + + +def _emit_list( + ctx: click.Context, + raw_data: Any, + rows: Sequence[dict], + *, + preferred_columns: Sequence[str], + title: str, +) -> None: + if _output_format(ctx) == "json": + click.echo(json.dumps(raw_data, default=str)) + return + + if not rows: + if raw_data: + click.echo(json.dumps(raw_data, default=str)) + return + click.echo("No results.") + return + + columns = _pick_columns(rows, preferred_columns) + if not columns: + click.echo(json.dumps(raw_data, default=str)) + return + + render(list(rows), columns, title=title) + + +def _emit_object(ctx: click.Context, raw_data: dict, *, title: str) -> None: + render_object(raw_data, output_format=_output_format(ctx), title=title) + + +def _normalize_path(path: str) -> str: + return path if path.startswith("/") else f"/{path}" + + +def _user_rows(users: Iterable[dict]) -> list[dict]: + return [ + { + "user_id": user.get("user_id"), + "username": user.get("username"), + "realname": user.get("realname"), + "status": user.get("status"), + "email": user.get("email"), + } + for user in users + ] + + +def _group_rows(groups: Iterable[dict]) -> list[dict]: + return [ + { + "group_id": group.get("group_id"), + "name": group.get("name"), + "status": group.get("status"), + "desc": group.get("desc"), + } + for group in groups + ] + + +def _phone_rows(phones: Iterable[dict]) -> list[dict]: + return [ + { + "phone_id": phone.get("phone_id"), + "number": phone.get("number"), + "name": phone.get("name"), + "platform": phone.get("platform"), + "type": phone.get("type"), + } + for phone in phones + ] + + +def _integration_rows(integrations: Iterable[dict]) -> list[dict]: + return [ + { + "integration_key": integration.get("integration_key"), + "name": integration.get("name"), + "type": integration.get("type"), + "active": integration.get("active"), + "version": integration.get("version"), + } + for integration in integrations + ] + + +def _policy_rows(policies: Iterable[dict]) -> list[dict]: + return [ + { + "policy_key": policy.get("policy_key"), + "name": policy.get("name"), + "type": policy.get("type") or policy.get("policy_type"), + "description": policy.get("description"), + } + for policy in policies + ] + + +def _authentication_log_rows(response: dict) -> list[dict]: + rows = [] + for item in response.get("authlogs", []): + user = item.get("user", {}) + application = item.get("application", {}) + access_device = item.get("access_device", {}) + ip = access_device.get("ip") + if isinstance(ip, dict): + ip = ip.get("address") + rows.append( + { + "timestamp": item.get("timestamp"), + "result": item.get("result"), + "factor": item.get("factor"), + "reason": item.get("reason"), + "user": user.get("name"), + "application": application.get("name"), + "ip": ip, + } + ) + return rows + + +def _administrator_log_rows(items: Iterable[dict]) -> list[dict]: + return [ + { + "timestamp": item.get("timestamp"), + "action": item.get("action"), + "username": item.get("username"), + "object": item.get("object"), + "description": item.get("description"), + } + for item in items + ] + + +def _activity_log_rows(response: dict) -> list[dict]: + rows = [] + for item in response.get("items", []): + actor = item.get("actor", {}) + target = item.get("target", {}) + rows.append( + { + "ts": item.get("ts"), + "action": item.get("action"), + "actor": actor.get("name"), + "actor_type": actor.get("type"), + "target": target.get("name"), + "target_type": target.get("type"), + "activity_id": item.get("activity_id"), + } + ) + return rows + + +@click.group() +def admin(): + """Read and manage Duo Admin API resources.""" + pass + + +@admin.command("check") +@click.pass_context +def admin_check(ctx: click.Context): + """Verify that the Duo Admin API credentials are valid.""" + summary = AdminClient().check() + _emit_object(ctx, summary, title="Admin API summary") + + +@admin.command("users") +@click.option("--limit", default=100, show_default=True, type=LIST_LIMIT, help="Maximum users to return.") +@click.option("--offset", default=0, show_default=True, type=click.IntRange(min=0), help="Result offset.") +@click.option("--username", default=None, help="Filter to an exact username match.") +@click.pass_context +def admin_users(ctx: click.Context, limit: int, offset: int, username: str): + """List Duo users.""" + users = AdminClient().list_users(limit=limit, offset=offset, username=username) + _emit_list( + ctx, + users, + _user_rows(users), + preferred_columns=["user_id", "username", "realname", "status", "email"], + title="Users", + ) + + +@admin.command("user") +@click.argument("user_id") +@click.pass_context +def admin_user(ctx: click.Context, user_id: str): + """Get a Duo user by ID.""" + user = AdminClient().get_user(user_id) + _emit_object(ctx, user, title="User") + + +@admin.command("groups") +@click.option("--limit", default=100, show_default=True, type=LIST_LIMIT, help="Maximum groups to return.") +@click.option("--offset", default=0, show_default=True, type=click.IntRange(min=0), help="Result offset.") +@click.pass_context +def admin_groups(ctx: click.Context, limit: int, offset: int): + """List Duo groups.""" + groups = AdminClient().list_groups(limit=limit, offset=offset) + _emit_list( + ctx, + groups, + _group_rows(groups), + preferred_columns=["group_id", "name", "status", "desc"], + title="Groups", + ) + + +@admin.command("group") +@click.argument("group_id") +@click.pass_context +def admin_group(ctx: click.Context, group_id: str): + """Get a Duo group by ID.""" + group = AdminClient().get_group(group_id) + _emit_object(ctx, group, title="Group") + + +@admin.command("phones") +@click.option("--limit", default=100, show_default=True, type=LIST_LIMIT, help="Maximum phones to return.") +@click.option("--offset", default=0, show_default=True, type=click.IntRange(min=0), help="Result offset.") +@click.pass_context +def admin_phones(ctx: click.Context, limit: int, offset: int): + """List Duo phones.""" + phones = AdminClient().list_phones(limit=limit, offset=offset) + _emit_list( + ctx, + phones, + _phone_rows(phones), + preferred_columns=["phone_id", "number", "name", "platform", "type"], + title="Phones", + ) + + +@admin.command("phone") +@click.argument("phone_id") +@click.pass_context +def admin_phone(ctx: click.Context, phone_id: str): + """Get a Duo phone by ID.""" + phone = AdminClient().get_phone(phone_id) + _emit_object(ctx, phone, title="Phone") + + +@admin.command("integrations") +@click.option( + "--limit", + default=100, + show_default=True, + type=LIST_LIMIT, + help="Maximum integrations to return.", +) +@click.option("--offset", default=0, show_default=True, type=click.IntRange(min=0), help="Result offset.") +@click.pass_context +def admin_integrations(ctx: click.Context, limit: int, offset: int): + """List Duo integrations.""" + integrations = AdminClient().list_integrations(limit=limit, offset=offset) + _emit_list( + ctx, + integrations, + _integration_rows(integrations), + preferred_columns=["integration_key", "name", "type", "active", "version"], + title="Integrations", + ) + + +@admin.command("integration") +@click.argument("integration_key") +@click.pass_context +def admin_integration(ctx: click.Context, integration_key: str): + """Get a Duo integration by key.""" + integration = AdminClient().get_integration(integration_key) + _emit_object(ctx, integration, title="Integration") + + +@admin.command("policies") +@click.option("--limit", default=100, show_default=True, type=LIST_LIMIT, help="Maximum policies to return.") +@click.option("--offset", default=0, show_default=True, type=click.IntRange(min=0), help="Result offset.") +@click.pass_context +def admin_policies(ctx: click.Context, limit: int, offset: int): + """List Duo policies.""" + policies = AdminClient().list_policies(limit=limit, offset=offset) + _emit_list( + ctx, + policies, + _policy_rows(policies), + preferred_columns=["policy_key", "name", "type", "description"], + title="Policies", + ) + + +@admin.command("policy") +@click.argument("policy_key") +@click.pass_context +def admin_policy(ctx: click.Context, policy_key: str): + """Get a Duo policy by key.""" + policy = AdminClient().get_policy(policy_key) + _emit_object(ctx, policy, title="Policy") + + +@admin.group("logs") +def admin_logs(): + """Inspect recent Duo Admin API log data.""" + pass + + +@admin_logs.command("auth") +@click.option("--days", default=7, show_default=True, type=DAYS_RANGE, help="Look back this many days.") +@click.option("--limit", default=20, show_default=True, type=LOG_LIMIT, help="Maximum events to return.") +@click.pass_context +def admin_logs_auth(ctx: click.Context, days: int, limit: int): + """List recent authentication log events.""" + response = AdminClient().authentication_logs(days=days, limit=limit) + _emit_list( + ctx, + response, + _authentication_log_rows(response), + preferred_columns=["timestamp", "result", "factor", "user", "application", "ip", "reason"], + title="Authentication Log", + ) + + +@admin_logs.command("administrator") +@click.option("--days", default=7, show_default=True, type=DAYS_RANGE, help="Look back this many days.") +@click.option("--limit", default=20, show_default=True, type=LOG_LIMIT, help="Maximum events to return.") +@click.pass_context +def admin_logs_administrator(ctx: click.Context, days: int, limit: int): + """List recent administrator log events.""" + response = AdminClient().administrator_logs(days=days, limit=limit) + _emit_list( + ctx, + response, + _administrator_log_rows(response), + preferred_columns=["timestamp", "action", "username", "object", "description"], + title="Administrator Log", + ) + + +@admin_logs.command("activity") +@click.option("--days", default=7, show_default=True, type=DAYS_RANGE, help="Look back this many days.") +@click.option("--limit", default=20, show_default=True, type=LOG_LIMIT, help="Maximum events to return.") +@click.pass_context +def admin_logs_activity(ctx: click.Context, days: int, limit: int): + """List recent activity log events.""" + response = AdminClient().activity_logs(days=days, limit=limit) + _emit_list( + ctx, + response, + _activity_log_rows(response), + preferred_columns=["ts", "action", "actor", "actor_type", "target", "target_type", "activity_id"], + title="Activity Log", + ) + + +@admin.command("raw") +@click.argument("method", type=click.Choice(["GET", "POST", "PUT", "PATCH", "DELETE"], case_sensitive=False)) +@click.argument("path") +@click.option("--param", "-p", multiple=True, help="Request parameter as key=value. Repeatable.") +@click.pass_context +def admin_raw(ctx: click.Context, method: str, path: str, param: Sequence[str]): + """Call an Admin API path directly for long-tail endpoint coverage.""" + try: + params = parse_params(param) + except ValueError as exc: + raise click.BadParameter(str(exc), param_hint="--param") from exc + + response = AdminClient().raw(method, _normalize_path(path), params) + if _output_format(ctx) != "json" and isinstance(response, list): + click.echo(json.dumps(response, indent=2, default=str)) + return + + if isinstance(response, dict): + _emit_object(ctx, response, title=f"{method.upper()} {_normalize_path(path)}") + return + + rows = response if all(isinstance(item, dict) for item in response) else [] + _emit_list( + ctx, + response, + rows, + preferred_columns=list(rows[0].keys()) if rows else [], + title=f"{method.upper()} {_normalize_path(path)}", + ) diff --git a/duo_cli/commands/configure.py b/duo_cli/commands/configure.py index d91e78f..a528d70 100644 --- a/duo_cli/commands/configure.py +++ b/duo_cli/commands/configure.py @@ -5,6 +5,14 @@ from duo_cli.config import load_config, save_config SETUP_HELP = { + "admin": ( + "\n To get Admin API credentials:\n" + " 1. Log into the Duo Admin Panel (admin.duosecurity.com)\n" + " 2. Go to Applications > Protect an Application\n" + " 3. Search for \"Admin API\" in the application catalog\n" + " 4. Click Protect to create the integration\n" + " 5. Copy the Integration key, Secret key, and API hostname below\n" + ), "auth": ( "\n To get Auth API credentials:\n" " 1. Log into the Duo Admin Panel (admin.duosecurity.com)\n" @@ -26,14 +34,14 @@ ), } -API_CHOICES = click.Choice(["auth", "universal"]) +API_CHOICES = click.Choice(["auth", "admin", "universal"]) @click.command() @click.option("--api", type=API_CHOICES, default=None, help="Which Duo API to configure.") -@click.option("--ikey", default=None, help="Integration key (auth API).") -@click.option("--skey", default=None, help="Secret key (auth API).") +@click.option("--ikey", default=None, help="Integration key (auth/admin API).") +@click.option("--skey", default=None, help="Secret key (auth/admin API).") @click.option("--client-id", default=None, help="Client ID (universal API).") @click.option("--client-secret", default=None, help="Client secret (universal API).") @click.option("--host", default=None, help="Duo API hostname.") diff --git a/duo_cli/main.py b/duo_cli/main.py index 1ec5533..a116821 100644 --- a/duo_cli/main.py +++ b/duo_cli/main.py @@ -3,6 +3,7 @@ import click from duo_cli import __version__ +from duo_cli.commands.admin import admin from duo_cli.commands.configure import configure from duo_cli.commands.auth import auth from duo_cli.commands.universal import universal @@ -20,6 +21,7 @@ def cli(ctx, output): cli.add_command(configure) +cli.add_command(admin) cli.add_command(auth) cli.add_command(universal) diff --git a/duo_cli/output.py b/duo_cli/output.py index ef5c10d..3f5d2e4 100644 --- a/duo_cli/output.py +++ b/duo_cli/output.py @@ -27,3 +27,19 @@ def render(data, columns: list[str], *, output_format: str = "table", title: str for row in data: table.add_row(*(str(row.get(c, "")) for c in columns)) console.print(table) + + +def render_object(data: dict, *, output_format: str = "table", title: str = ""): + """Render a single dict as JSON or a two-column key/value table.""" + if output_format == "json": + console.print_json(json.dumps(data, default=str)) + return + + table = Table(title=title, show_header=False, show_lines=False) + table.add_column("field", style="cyan") + table.add_column("value") + for key, value in data.items(): + if isinstance(value, (dict, list)): + value = json.dumps(value, default=str) + table.add_row(str(key), str(value)) + console.print(table) diff --git a/pyproject.toml b/pyproject.toml index 0c90a8b..e33959a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ authors = [ {name = "Colin Medfisch"}, ] dependencies = [ - "duo-client>=5.0.0", + "duo-client>=5.4.0", "duo-universal>=2.0.0", "click>=8.1", "rich>=13.0", diff --git a/tests/test_admin.py b/tests/test_admin.py new file mode 100644 index 0000000..98495f1 --- /dev/null +++ b/tests/test_admin.py @@ -0,0 +1,138 @@ +"""Tests for Admin API CLI commands.""" + +import json +import os +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +from duo_cli.main import cli + + +@pytest.fixture +def runner(): + return CliRunner() + + +@pytest.fixture(autouse=True) +def _isolate_config(tmp_path, monkeypatch): + monkeypatch.setenv("DUO_CLI_CONFIG", str(tmp_path / "config.json")) + for key in list(os.environ): + if key.startswith("DUO_") and key != "DUO_CLI_CONFIG": + monkeypatch.delenv(key, raising=False) + + +class TestAdminConfigure: + def test_configure_admin_noninteractive(self, runner): + result = runner.invoke( + cli, + [ + "configure", + "--api", + "admin", + "--ikey", + "DIADMIN123456789012", + "--skey", + "testsecret", + "--host", + "api-test.duosecurity.com", + ], + ) + assert result.exit_code == 0 + assert "Admin API configuration saved" in result.output + + +class TestAdminCommands: + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_check(self, mock_client_cls, runner): + mock_client = MagicMock() + mock_client.check.return_value = {"users": 42, "phones": 9} + mock_client_cls.return_value = mock_client + + result = runner.invoke(cli, ["admin", "check"]) + assert result.exit_code == 0 + assert "users" in result.output + assert "42" in result.output + + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_users_json(self, mock_client_cls, runner): + mock_client = MagicMock() + mock_client.list_users.return_value = [ + { + "user_id": "DU123", + "username": "alice", + "realname": "Alice Example", + "status": "active", + "email": "alice@example.com", + } + ] + mock_client_cls.return_value = mock_client + + result = runner.invoke(cli, ["-o", "json", "admin", "users", "--username", "alice"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data[0]["username"] == "alice" + mock_client.list_users.assert_called_once_with(limit=100, offset=0, username="alice") + + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_integrations_table(self, mock_client_cls, runner): + mock_client = MagicMock() + mock_client.list_integrations.return_value = [ + { + "integration_key": "DIKEY123", + "name": "VPN", + "type": "websdk", + "active": True, + } + ] + mock_client_cls.return_value = mock_client + + result = runner.invoke(cli, ["admin", "integrations"]) + assert result.exit_code == 0 + assert "DIKEY123" in result.output + assert "VPN" in result.output + + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_logs_auth_json(self, mock_client_cls, runner): + mock_client = MagicMock() + mock_client.authentication_logs.return_value = { + "authlogs": [ + { + "timestamp": 1700000000000, + "result": "SUCCESS", + "factor": "duo_push", + "reason": "user approved", + "user": {"name": "alice"}, + "application": {"name": "VPN"}, + "access_device": {"ip": "10.0.0.1"}, + } + ], + "metadata": {"total_objects": 1}, + } + mock_client_cls.return_value = mock_client + + result = runner.invoke(cli, ["-o", "json", "admin", "logs", "auth", "--days", "3"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data["authlogs"][0]["user"]["name"] == "alice" + mock_client.authentication_logs.assert_called_once_with(days=3, limit=20) + + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_raw_normalizes_path(self, mock_client_cls, runner): + mock_client = MagicMock() + mock_client.raw.return_value = [{"user_id": "DU123", "username": "alice"}] + mock_client_cls.return_value = mock_client + + result = runner.invoke( + cli, + ["admin", "raw", "GET", "admin/v1/users", "-p", "limit=1"], + ) + assert result.exit_code == 0 + mock_client.raw.assert_called_once_with("GET", "/admin/v1/users", {"limit": "1"}) + + @patch("duo_cli.commands.admin.AdminClient") + def test_admin_raw_rejects_bad_param(self, mock_client_cls, runner): + result = runner.invoke(cli, ["admin", "raw", "GET", "/admin/v1/users", "-p", "badparam"]) + assert result.exit_code != 0 + assert "parameter must be key=value" in result.output diff --git a/tests/test_admin_client.py b/tests/test_admin_client.py new file mode 100644 index 0000000..1244e01 --- /dev/null +++ b/tests/test_admin_client.py @@ -0,0 +1,33 @@ +"""Tests for the AdminClient wrapper.""" + +from unittest.mock import MagicMock + +from duo_cli.admin_client import AdminClient + + +def test_authentication_logs_passes_string_limit(): + mock_client = MagicMock() + client = AdminClient(client=mock_client) + + client.authentication_logs(days=3, limit=2) + + kwargs = mock_client.get_authentication_log.call_args.kwargs + assert kwargs["api_version"] == 2 + assert kwargs["limit"] == "2" + assert isinstance(kwargs["mintime"], int) + assert isinstance(kwargs["maxtime"], int) + assert "sort" not in kwargs + + +def test_administrator_logs_applies_limit_after_fetch(): + mock_client = MagicMock() + mock_client.get_administrator_log.return_value = [ + {"action": "one"}, + {"action": "two"}, + {"action": "three"}, + ] + client = AdminClient(client=mock_client) + + result = client.administrator_logs(days=7, limit=2) + + assert result == [{"action": "one"}, {"action": "two"}] diff --git a/tests/test_cli.py b/tests/test_cli.py index 27098d8..f24c66f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -56,6 +56,16 @@ def test_configure_universal_noninteractive(self, runner): assert result.exit_code == 0 assert "Universal API configuration saved" in result.output + def test_configure_admin_noninteractive(self, runner): + result = runner.invoke(cli, [ + "configure", "--api", "admin", + "--ikey", "DITEST123456789012", + "--skey", "testsecret", + "--host", "api-test.duosecurity.com", + ]) + assert result.exit_code == 0 + assert "Admin API configuration saved" in result.output + def test_configure_auth_interactive(self, runner): result = runner.invoke(cli, ["configure"], input="auth\nDIKEY123\nsecret\napi-host.duo.com\n") assert result.exit_code == 0 @@ -72,6 +82,12 @@ def test_configure_shows_setup_help_auth(self, runner): assert "Auth API" in result.output assert "Protect an Application" in result.output + def test_configure_shows_setup_help_admin(self, runner): + result = runner.invoke(cli, ["configure", "--api", "admin"], + input="DIKEY123\nsecret\napi-host.duo.com\n") + assert "Admin API" in result.output + assert "Protect an Application" in result.output + def test_configure_shows_setup_help_universal(self, runner): result = runner.invoke(cli, ["configure", "--api", "universal"], input="CLIENT123\nsecret\napi-host.duo.com\n") diff --git a/tests/test_config.py b/tests/test_config.py index 0464ee1..4066029 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -13,7 +13,7 @@ def _isolate_config(tmp_path, monkeypatch): monkeypatch.setenv("DUO_CLI_CONFIG", str(tmp_path / "config.json")) # Clear any Duo env vars that might leak from the real environment for key in list(os.environ): - if key.startswith("DUO_"): + if key.startswith("DUO_") and key != "DUO_CLI_CONFIG": monkeypatch.delenv(key, raising=False) @@ -43,6 +43,7 @@ def test_save_overwrites_existing(self): def test_round_trip_all_api_sections(self): config = { + "admin": {"ikey": "DI_ADMIN", "skey": "sk_admin", "host": "api-admin.duo.com"}, "auth": {"ikey": "DI_AUTH", "skey": "sk_auth", "host": "api-auth.duo.com"}, "universal": { "client_id": "DI_UNI", @@ -63,6 +64,17 @@ def test_loads_from_config_file(self): result = get_client_kwargs("auth") assert result == {"ikey": "DI_A", "skey": "SK_A", "host": "api-a.duo.com"} + def test_loads_admin_from_config_file(self): + save_config({ + "admin": {"ikey": "DI_ADMIN", "skey": "SK_ADMIN", "host": "api-admin.duo.com"}, + }) + result = get_client_kwargs("admin") + assert result == { + "ikey": "DI_ADMIN", + "skey": "SK_ADMIN", + "host": "api-admin.duo.com", + } + def test_env_vars_override_config(self, monkeypatch): save_config({ "auth": {"ikey": "file_ikey", "skey": "file_skey", "host": "file_host"}, diff --git a/tests/test_output.py b/tests/test_output.py index 9ad2de9..e18aa32 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -2,8 +2,7 @@ import json -from duo_cli.output import render -from rich.console import Console +from duo_cli.output import render, render_object def test_render_json(capsys): @@ -35,3 +34,18 @@ def test_render_missing_keys(capsys): render(data, ["name", "role"], output_format="table") captured = capsys.readouterr() assert "alice" in captured.out + + +def test_render_object_json(capsys): + render_object({"name": "alice", "meta": {"role": "admin"}}, output_format="json") + captured = capsys.readouterr() + parsed = json.loads(captured.out) + assert parsed["name"] == "alice" + assert parsed["meta"]["role"] == "admin" + + +def test_render_object_table(capsys): + render_object({"name": "alice", "role": "admin"}, output_format="table", title="User") + captured = capsys.readouterr() + assert "alice" in captured.out + assert "role" in captured.out From 07d519478c54c5fc0b81344d257bc6a5193f0b20 Mon Sep 17 00:00:00 2001 From: Abdul Ateya Date: Thu, 9 Apr 2026 10:39:50 -0500 Subject: [PATCH 2/2] Refine admin CLI docs and examples --- README.md | 40 ++++++++++++++++++++++++++++++++++++++- duo_cli/admin_client.py | 3 ++- duo_cli/commands/admin.py | 2 +- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 06cd5d9..dc5d3df 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ duo-cli -o json universal login jsmith ### Admin API -These commands provide a first milestone of high-value Admin API coverage for operators, shell scripts, and MCP servers. JSON output returns the full underlying API response, while table output shows a compact summary. +These commands provide a first milestone of high-value Admin API coverage for operators and shell scripts. JSON output returns the full underlying API response, while table output shows a compact summary. | Command | Description | |---------|-------------| @@ -116,6 +116,44 @@ These commands provide a first milestone of high-value Admin API coverage for op | `duo-cli admin logs activity` | Fetch recent activity log events | | `duo-cli admin raw ` | Call an Admin API path directly for long-tail coverage | +#### Admin API Examples + +```bash +# Save Admin API credentials +duo-cli configure --api admin + +# Confirm the Admin API integration works +duo-cli admin check + +# Find a user by username +duo-cli admin users --username aateya + +# Inspect a small slice of the tenant +duo-cli admin users --limit 10 +duo-cli admin groups --limit 10 +duo-cli admin phones --limit 10 +duo-cli admin integrations --limit 10 +duo-cli admin policies --limit 10 + +# Fetch a single object by ID/key +duo-cli admin user DUXXXXXXXXXXXXXXXXXX +duo-cli admin group DGXXXXXXXXXXXXXXXXXX +duo-cli admin phone DPXXXXXXXXXXXXXXXXXX +duo-cli admin integration DIXXXXXXXXXXXXXXXXXX +duo-cli admin policy POXXXXXXXXXXXXXXXXXX + +# Look at recent logs +duo-cli admin logs auth --days 7 --limit 20 +duo-cli admin logs administrator --days 7 --limit 20 +duo-cli admin logs activity --days 7 --limit 20 + +# Use JSON for scripts +duo-cli -o json admin users --limit 5 + +# Call an endpoint directly when there is not a dedicated command yet +duo-cli admin raw GET /admin/v1/users -p limit=1 +``` + ### Auth API | Command | Description | diff --git a/duo_cli/admin_client.py b/duo_cli/admin_client.py index 3d2e1b1..11f6a66 100644 --- a/duo_cli/admin_client.py +++ b/duo_cli/admin_client.py @@ -1,4 +1,4 @@ -"""Admin API wrapper used by the CLI and future MCP-facing integrations.""" +"""Admin API wrapper used by the CLI.""" from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional, Sequence, Union @@ -7,6 +7,7 @@ from duo_cli.config import get_client_kwargs +# Interactive CLI defaults chosen to keep output readable. DEFAULT_LIST_LIMIT = 100 DEFAULT_LOG_LIMIT = 20 diff --git a/duo_cli/commands/admin.py b/duo_cli/commands/admin.py index 3ed68a9..248fdf8 100644 --- a/duo_cli/commands/admin.py +++ b/duo_cli/commands/admin.py @@ -1,4 +1,4 @@ -"""Admin API commands for operator workflows and MCP-facing reuse.""" +"""Admin API commands for operator workflows.""" import json from typing import Any, Iterable, Sequence