diff --git a/.gitignore b/.gitignore index cfaff7127..869bfcb42 100644 --- a/.gitignore +++ b/.gitignore @@ -100,6 +100,7 @@ !/3MF/ !/calibre/ !/rekordbox/ +!/cc-switch/ !/siyuan/ # Step 5: Inside each software dir, ignore everything (including dotfiles) /gimp/* @@ -208,6 +209,8 @@ /calibre/.* /rekordbox/* /rekordbox/.* +/cc-switch/* +/cc-switch/.* /siyuan/* /siyuan/.* @@ -275,6 +278,7 @@ !/quietshrink/agent-harness/ !/mailchimp/agent-harness/ !/rekordbox/agent-harness/ +!/cc-switch/agent-harness/ !/siyuan/agent-harness/ # Exclude non-gedit demo macros from macrocli (local only) diff --git a/cc-switch/agent-harness/CCSWITCH.md b/cc-switch/agent-harness/CCSWITCH.md new file mode 100644 index 000000000..332e21f61 --- /dev/null +++ b/cc-switch/agent-harness/CCSWITCH.md @@ -0,0 +1,57 @@ +# CC Switch SOP: CLI Harness Design + +## Software Identity +- **Name**: CC Switch v3.15.0 +- **Type**: Cross-platform desktop GUI (Tauri 2: Rust + React) +- **Purpose**: All-in-One configuration manager for AI-powered coding CLI tools (Claude Code, Codex, Gemini CLI, OpenCode, OpenClaw, Hermes Agent) +- **Core operations**: Provider switching management, local HTTP proxy, MCP server management, skills management, prompt management, session browsing, usage tracking, cloud sync + +## Architecture Analysis + +### Backend Engine +- **Rust/Tauri 2** (`src-tauri/src/`) — native system integration, SQLite database, embedded HTTP proxy (axum), config file I/O +- **React/TypeScript** (`src/`) — UI components, forms, charts + +### Data Model +- **Primary Store**: SQLite database at `~/.cc-switch/cc-switch.db` (12+ tables) +- **Config Files**: `~/.cc-switch/config.json` (MultiAppConfig), `~/.cc-switch/settings.json` +- **Managed Files**: `~/.claude/settings.json`, `~/.codex/*`, `~/.gemini/*`, `~/.opencode/*`, `~/.openclaw/*`, Hermes config dirs + +### Key Database Tables +| Table | Purpose | +|-------|---------| +| providers | AI provider configurations (API keys, endpoints, models) | +| mcp_servers | MCP server definitions with per-app enable flags | +| skills | Installed skills with per-app enable flags | +| skill_repos | Registered skill GitHub repos | +| prompts | Prompt presets with per-app association | +| proxy_config | Per-app proxy settings (claude/codex/gemini) | +| proxy_request_logs | API request logs for usage tracking | +| usage_daily_rollups | Daily aggregated usage stats | +| model_pricing | Model pricing data | +| provider_health | Provider health monitoring | +| session_log_sync | Session log file sync state | +| settings | Key-value settings store | + +### Existing CLI/Scripting Capabilities +- **None**. All operations are Tauri IPC commands invoked from React frontend via `invoke()`. +- **Deep Link Protocol** (`ccswitch://`) provides URL-based import for providers/MCP/prompts/skills. + +## Command Map + +| GUI Feature | CLI Command Group | Data Source | +|-------------|------------------|-------------| +| Provider Management | `providers` | SQLite providers table | +| Proxy Server | `proxy` | SQLite proxy_config, proxy_request_logs tables | +| MCP Management | `mcp` | SQLite mcp_servers table + app config files | +| Skills Management | `skills` | SQLite skills + skill_repos tables + filesystem | +| Prompt Management | `prompts` | SQLite prompts table + prompt files | +| Session Browser | `sessions` | Filesystem (JSON log files) + SQLite session_log_sync | +| Usage Dashboard | `usage` | SQLite proxy_request_logs + usage_daily_rollups | +| Settings | `settings` | SQLite settings table + settings.json | +| Cloud Sync | `sync` | WebDAV config + filesystem | + +## Rendering Gap Assessment +- **No rendering gap** — CC Switch manages text-based AI CLI configuration files. There is no visual/graphical rendering. The CLI directly reads/writes SQLite and JSON, which is functionally identical to what the GUI does. + +The current harness focuses on status, provider switching, proxy, MCP, skills, usage, settings, and sessions. Preview-style summaries can be added later on top of the existing `usage stats` data. diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/README.md b/cc-switch/agent-harness/cli_anything/ccswitch/README.md new file mode 100644 index 000000000..83874278b --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/README.md @@ -0,0 +1,119 @@ +# CC Switch CLI - Agent Harness + +A CLI interface for CC Switch, a desktop app that manages AI coding +tool configurations across Claude Code, Codex, Gemini CLI, OpenCode, OpenClaw, +and Hermes. Reads directly from the live CC Switch SQLite database. + +## Installation + +```bash +pip install git+https://github.com/HKUDS/CLI-Anything.git#subdirectory=cc-switch/agent-harness +``` + +Or for local development: + +```bash +cd cc-switch/agent-harness +pip install -e . +``` + +Requires CC Switch installed with an active database at `~/.cc-switch/cc-switch.db`. + +## Quick Start + +```bash +# Status overview +cli-anything-ccswitch +cli-anything-ccswitch status + +# List all providers +cli-anything-ccswitch providers list + +# Filter by app +cli-anything-ccswitch providers list --app claude + +# Switch active provider +cli-anything-ccswitch providers set-current --app claude + +# Check proxy status +cli-anything-ccswitch proxy status --app claude + +# View usage stats (7 days) +cli-anything-ccswitch usage stats --days 7 + +# Recent request logs +cli-anything-ccswitch usage logs --limit 10 + +# List skills +cli-anything-ccswitch skills list + +# List MCP servers +cli-anything-ccswitch mcp list + +# Manage settings +cli-anything-ccswitch settings list +cli-anything-ccswitch settings get + +# Session logs +cli-anything-ccswitch sessions list --app claude +``` + +## JSON Output Mode + +All commands support `--json` for machine-readable output. Place it before the subcommand: + +```bash +cli-anything-ccswitch --json providers list +cli-anything-ccswitch --json usage stats --days 30 +cli-anything-ccswitch --json providers get --app claude +``` + +## Command Groups + +| Group | Description | +|-------|-------------| +| `status` | Show a quick database overview | +| `providers` | Manage AI provider configurations (list, get, set-current) | +| `proxy` | Manage the local HTTP proxy server (status, config) | +| `mcp` | Manage MCP servers (list, enable) | +| `skills` | Manage installed skills (list, repos) | +| `usage` | View API usage and cost statistics (stats, logs) | +| `settings` | View and manage settings (list, get, set) | +| `sessions` | Browse AI conversation sessions (list) | + +## Running Tests + +```bash +# All tests +python3 -m pytest cli_anything/ccswitch/tests/ -v + +# Unit tests (no CC Switch needed) +python3 -m pytest cli_anything/ccswitch/tests/test_core.py -v + +# E2E tests (requires live CC Switch database) +python3 -m pytest cli_anything/ccswitch/tests/test_full_e2e.py -v +``` + +## Architecture + +``` +cli_anything/ccswitch/ +├── __init__.py +├── __main__.py # python3 -m cli_anything.ccswitch +├── ccswitch_cli.py # Main Click CLI (7 command groups) +├── utils/ +│ ├── __init__.py +│ └── db.py # SQLite database utilities +├── skills/ +│ └── SKILL.md # Packaged AI skill definition +└── tests/ + ├── TEST.md # Test plan and results + ├── test_core.py # 30 unit tests + └── test_full_e2e.py # 20 E2E tests +``` + +## Database + +Reads directly from `~/.cc-switch/cc-switch.db`. Read operations are safe; +write operations (`providers set-current`, `proxy config`, `mcp enable`, +`settings set`) modify the database and write live config to app settings files. diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/__init__.py b/cc-switch/agent-harness/cli_anything/ccswitch/__init__.py new file mode 100644 index 000000000..782e4cc84 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/__init__.py @@ -0,0 +1,3 @@ +"""CC Switch CLI - Command-line interface for CC Switch configuration manager.""" + +__version__ = "1.0.0" diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/__main__.py b/cc-switch/agent-harness/cli_anything/ccswitch/__main__.py new file mode 100644 index 000000000..7f0b7941a --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/__main__.py @@ -0,0 +1,5 @@ +"""Allow python -m cli_anything.ccswitch""" +from cli_anything.ccswitch.ccswitch_cli import main + +if __name__ == "__main__": + main() diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/ccswitch_cli.py b/cc-switch/agent-harness/cli_anything/ccswitch/ccswitch_cli.py new file mode 100644 index 000000000..2b9a0e3d5 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/ccswitch_cli.py @@ -0,0 +1,988 @@ +"""CC Switch CLI — command-line interface for CC Switch configuration manager. + +Usage: + ccswitch providers list [--app claude] [--json] + ccswitch providers set-current --app claude + ccswitch proxy status [--app claude] [--json] + ccswitch proxy config get [--app claude] + ccswitch mcp list [--json] + ccswitch skills list [--json] + ccswitch usage stats [--days 30] [--json] + ccswitch settings get +""" + +import json as _json +import os as _os +import sys +import tempfile as _tempfile +from collections.abc import Mapping +from copy import deepcopy as _deepcopy +import click +import tomlkit + +from .utils.db import connect_db, load_config, load_settings, VALID_APP_TYPES + +_MANAGED_APPS = VALID_APP_TYPES + +# ────────────────────────────────────────────── +# Shared helpers +# ────────────────────────────────────────────── + + +def _resolve_app(app: str | None) -> str | None: + if app is not None: + app = app.lower() + if app not in VALID_APP_TYPES: + raise click.BadParameter(f"Invalid app: {app}. Valid: {', '.join(VALID_APP_TYPES)}") + return app + + +def _mask_sensitive(key: str, value) -> str: + """Mask sensitive values like API tokens and keys.""" + if isinstance(value, str) and _is_sensitive_key(key): + if len(value) > 12: + return value[:8] + "..." + value[-4:] + return "***" + if isinstance(value, Mapping | list | tuple): + return _format_masked_value(_mask_value(value, key)) + return str(value) + + +def _is_sensitive_key(key: str) -> bool: + lowered = key.lower() + normalized = "".join(ch if ch.isalnum() else "_" for ch in lowered).strip("_") + parts = {p for p in normalized.split("_") if p} + if not normalized: + return False + if normalized in {"key", "apikey", "api_key"}: + return True + if normalized.endswith("_key") or normalized.startswith("key_"): + return True + sensitive_words = { + "token", "secret", "password", "auth", "credential", + "bearer", "cookie", "private", + } + if parts & sensitive_words: + return True + return any(word in normalized for word in ( + "apikey", "api_key", "authtoken", "accesskey", "secretkey", + "secretaccesskey", "authorization", + )) + + +def _mask_value(value, key: str = ""): + """Return a JSON-safe copy with sensitive nested values masked.""" + if _is_sensitive_key(key): + if isinstance(value, str): + return _mask_sensitive(key, value) + return "***" + if isinstance(value, Mapping): + return {k: _mask_value(v, str(k)) for k, v in value.items()} + if isinstance(value, list): + return [_mask_value(item, key) for item in value] + if isinstance(value, tuple): + return [_mask_value(item, key) for item in value] + return value + + +def _format_masked_value(value) -> str: + if isinstance(value, Mapping): + return "{" + ", ".join( + f"{k}: {_format_masked_value(v)}" for k, v in value.items() + ) + "}" + if isinstance(value, list): + return "[" + ", ".join(_format_masked_value(item) for item in value) + "]" + return str(value) + + +def _table(headers: list[str], rows: list[tuple]) -> str: + """Format data as a simple aligned table.""" + if not rows: + return "(empty)" + all_rows = [headers] + [list(map(str, r)) for r in rows] + col_widths = [max(len(r[i]) for r in all_rows) for i in range(len(headers))] + lines = [] + header = " ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) + lines.append(header) + lines.append("-" * len(header)) + for row in all_rows[1:]: + lines.append(" ".join(v.ljust(col_widths[i]) for i, v in enumerate(row))) + return "\n".join(lines) + + +def _enabled_apps_str(row: Mapping) -> str: + apps = [app for app in _MANAGED_APPS if row[f"enabled_{app}"]] + return ",".join(apps) if apps else "-" + + +def _normalise_usage_rows(rows) -> list[dict]: + normalised = [] + for row in rows: + data = dict(row) + data["input_tok"] = data["input_tok"] or 0 + data["output_tok"] = data["output_tok"] or 0 + data["cost"] = data["cost"] or 0 + normalised.append(data) + return normalised + + +def _enabled_app_columns(db, table: str) -> str: + columns = {row["name"] for row in db.execute(f"PRAGMA table_info({table})")} + parts = [] + for app in _MANAGED_APPS: + column = f"enabled_{app}" + parts.append(column if column in columns else f"0 AS {column}") + return ", ".join(parts) + + +_CODEX_PROVIDER_CONFIG_KEYS = ("model", "model_provider", "base_url") + + +def _merge_codex_provider_toml(target, provider_config: str) -> None: + """Merge provider-owned Codex TOML fields without deleting user settings.""" + try: + existing_text = target.read_text(encoding="utf-8") if target.exists() else "" + existing_doc = ( + tomlkit.parse(existing_text) if existing_text.strip() else tomlkit.document() + ) + provider_doc = ( + tomlkit.parse(provider_config) if provider_config.strip() else tomlkit.document() + ) + except Exception as exc: + raise click.ClickException(f"Invalid Codex config.toml: {exc}") from exc + + for key in _CODEX_PROVIDER_CONFIG_KEYS: + if key in provider_doc: + existing_doc[key] = _deepcopy(provider_doc[key]) + + if "model_providers" in provider_doc: + provider_table = provider_doc["model_providers"] + if not hasattr(provider_table, "items"): + existing_doc["model_providers"] = _deepcopy(provider_table) + else: + existing_table = existing_doc.get("model_providers") + if not hasattr(existing_table, "items"): + existing_doc["model_providers"] = tomlkit.table() + existing_table = existing_doc["model_providers"] + for provider_id, provider_settings in provider_table.items(): + existing_table[provider_id] = _deepcopy(provider_settings) + + _write_text_config(target, tomlkit.dumps(existing_doc)) + + +# ────────────────────────────────────────────── +# Main CLI +# ────────────────────────────────────────────── + +@click.group(invoke_without_command=True) +@click.option("--json", "json_mode", is_flag=True, help="Output in JSON format") +@click.option("--db", "db_path", type=click.Path(), help="Override database path") +@click.pass_context +def cli(ctx: click.Context, json_mode: bool, db_path: str | None) -> None: + """CC Switch CLI — Manage AI coding tool configurations from the terminal.""" + ctx.ensure_object(dict) + ctx.obj["json_mode"] = json_mode + ctx.obj["db_path"] = db_path + if ctx.invoked_subcommand is None: + # Show status overview + _show_status(ctx) + + +@cli.command("status") +@click.pass_context +def status(ctx: click.Context) -> None: + """Show a quick database overview.""" + _show_status(ctx) + + +def _show_status(ctx: click.Context) -> None: + """Show a quick status overview.""" + db = connect_db(ctx.obj.get("db_path")) + try: + # Count providers + prov_count = db.execute("SELECT COUNT(*) FROM providers").fetchone()[0] + # Current provider per app + cur = db.execute( + "SELECT app_type, name FROM providers WHERE is_current=1 ORDER BY app_type" + ).fetchall() + # Skill count + skill_count = db.execute("SELECT COUNT(*) FROM skills").fetchone()[0] + # MCP count + mcp_count = db.execute("SELECT COUNT(*) FROM mcp_servers").fetchone()[0] + + if ctx.obj.get("json_mode"): + _json.dump({ + "providers": prov_count, + "current": {r["app_type"]: r["name"] for r in cur}, + "skills": skill_count, + "mcp_servers": mcp_count, + }, sys.stdout, indent=2) + return + + click.echo("CC Switch Status") + click.echo("-" * 40) + click.echo(f" Providers: {prov_count}") + click.echo(f" Skills: {skill_count}") + click.echo(f" MCP Servers: {mcp_count}") + click.echo() + click.echo(" Current providers:") + for r in cur: + click.echo(f" {r['app_type']:>10s}: {r['name']}") + finally: + db.close() + + +# ────────────────────────────────────────────── +# Providers +# ────────────────────────────────────────────── + +@cli.group() +def providers() -> None: + """Manage AI provider configurations.""" + pass + + +@providers.command("list") +@click.option("--app", "-a", help="Filter by app type") +@click.pass_context +def providers_list(ctx: click.Context, app: str | None) -> None: + """List all configured providers.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + if app: + rows = db.execute( + "SELECT id, name, category, is_current, sort_index FROM providers " + "WHERE app_type=? ORDER BY sort_index", + (app,), + ).fetchall() + else: + rows = db.execute( + "SELECT app_type, id, name, category, is_current, sort_index " + "FROM providers ORDER BY app_type, sort_index" + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + if app: + click.echo(_table(["ID", "Name", "Category", "Current", "Sort"], [ + (r["id"], r["name"], r["category"] or "", "*" if r["is_current"] else "", r["sort_index"]) + for r in rows + ])) + else: + click.echo(_table(["App", "ID", "Name", "Category", "Current", "Sort"], [ + (r["app_type"], r["id"], r["name"], r["category"] or "", "*" if r["is_current"] else "", r["sort_index"]) + for r in rows + ])) + finally: + db.close() + + +@providers.command("get") +@click.argument("provider_id") +@click.option("--app", "-a", required=True, help="App type (claude/codex/gemini/...)") +@click.pass_context +def providers_get(ctx: click.Context, provider_id: str, app: str) -> None: + """Get detailed configuration for a provider.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + row = db.execute( + "SELECT * FROM providers WHERE id=? AND app_type=?", (provider_id, app) + ).fetchone() + if not row: + click.echo(f"Provider '{provider_id}' not found for app '{app}'", err=True) + raise SystemExit(1) + + data = dict(row) + # Parse settings_config JSON + data["settings_config"] = _json.loads(data["settings_config"]) + + if ctx.obj.get("json_mode"): + data["settings_config"] = _mask_value(data["settings_config"]) + _json.dump(data, sys.stdout, indent=2, default=str) + return + + click.echo(f"Provider: {data['name']}") + click.echo(f" ID: {data['id']}") + click.echo(f" App: {data['app_type']}") + click.echo(f" Category: {data.get('category', 'N/A')}") + click.echo(f" Current: {bool(data['is_current'])}") + click.echo(f" Settings:") + for k, v in sorted(data["settings_config"].items()): + click.echo(f" {k}: {_format_masked_value(_mask_value(v, k))}") + finally: + db.close() + + +@providers.command("set-current") +@click.argument("provider_id") +@click.option("--app", "-a", required=True, help="App type") +@click.pass_context +def providers_set_current(ctx: click.Context, provider_id: str, app: str) -> None: + """Set the current/active provider for an app.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + # Verify provider exists + row = db.execute( + "SELECT id, name FROM providers WHERE id=? AND app_type=?", (provider_id, app) + ).fetchone() + if not row: + click.echo(f"Provider '{provider_id}' not found for '{app}'", err=True) + raise SystemExit(1) + + # Unset all, then set current + db.execute("UPDATE providers SET is_current=0 WHERE app_type=?", (app,)) + db.execute( + "UPDATE providers SET is_current=1 WHERE id=? AND app_type=?", (provider_id, app) + ) + + _write_live_config(app, db) + db.commit() + click.echo(f"Switched {app} to provider: {row['name']}") + finally: + db.close() + + +def _write_live_config(app: str, db) -> None: + """Write the current provider config to the live app config file.""" + from pathlib import Path + + home = Path(_os.path.expanduser("~")) + row = db.execute( + "SELECT * FROM providers WHERE app_type=? AND is_current=1", (app,) + ).fetchone() + if not row: + return + + config = _json.loads(row["settings_config"]) + + target_map = { + "claude": home / ".claude" / "settings.json", + "codex": home / ".codex" / "config.toml", + "gemini": home / ".gemini" / ".env", + "opencode": home / ".config" / "opencode" / "opencode.json", + "openclaw": home / ".openclaw" / "openclaw.json", + "hermes": home / ".hermes" / "config.yaml", + } + + target = target_map.get(app) + if not target: + return + + if app == "claude": + _write_json_env_config(target, _env_config(config)) + click.echo(f" Written live config to: {target}") + elif app == "codex": + wrote = False + if "config" in config: + _merge_codex_provider_toml(target, str(config["config"])) + wrote = True + if isinstance(config.get("auth"), dict): + _write_json_config(home / ".codex" / "auth.json", config["auth"]) + wrote = True + if wrote: + click.echo(f" Written live config to: {target}") + else: + click.echo(f" (Note: no supported {app} live config payload found)") + elif app == "gemini": + written_targets = [] + env = _gemini_env_config(config) + if env: + _write_env_config(target, env) + written_targets.append(target) + if "config" in config: + settings_target = home / ".gemini" / "settings.json" + if isinstance(config["config"], dict): + _write_json_config(settings_target, config["config"]) + else: + _write_text_config(settings_target, str(config["config"])) + written_targets.append(settings_target) + if written_targets: + click.echo( + " Written live config to: " + + ", ".join(str(path) for path in written_targets) + ) + else: + click.echo(f" (Note: no supported {app} live config payload found)") + elif app == "opencode": + if "config" in config: + _write_text_config(target, str(config["config"])) + elif "settingsConfig" in config: + _write_opencode_provider_config(target, str(row["id"]), config["settingsConfig"]) + else: + click.echo(f" (Note: no supported {app} live config payload found)") + return + click.echo(f" Written live config to: {target}") + elif app == "openclaw": + _write_json_config(target, config) + click.echo(f" Written live config to: {target}") + elif app == "hermes": + if "config" in config: + _write_text_config(target, str(config["config"])) + else: + click.echo(f" (Note: no supported {app} live config payload found)") + return + click.echo(f" Written live config to: {target}") + + +def _env_config(config: dict) -> dict: + env = config.get("env") + if isinstance(env, dict): + return env + return config + + +def _gemini_env_config(config: dict) -> dict: + env = config.get("env") + if isinstance(env, dict): + return env + if "config" in config or "auth" in config or "settingsConfig" in config: + return {} + return config + + +def _write_json_env_config(target, env: dict) -> None: + existing = {} + if target.exists(): + with open(target, encoding="utf-8") as f: + existing = _json.load(f) + if not isinstance(existing, dict): + existing = {} + existing_env = existing.get("env") + if not isinstance(existing_env, dict): + existing_env = {} + existing_env.update(env) + existing["env"] = existing_env + _write_json_config(target, existing) + + +def _write_env_config(target, env: dict) -> None: + existing = {} + if target.exists(): + existing = _parse_env_file(target.read_text(encoding="utf-8")) + existing.update(_coerce_env_config(env)) + lines = [f"{key}={existing[key]}" for key in sorted(existing)] + _write_text_config(target, "\n".join(lines)) + + +def _parse_env_file(content: str) -> dict[str, str]: + parsed = {} + for raw_line in content.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + if _is_valid_env_key(key): + parsed[key] = value + return parsed + + +def _coerce_env_config(env: dict) -> dict[str, str]: + result = {} + for key, value in env.items(): + key = str(key) + if not _is_valid_env_key(key): + continue + result[key] = _env_value(value) + return result + + +def _is_valid_env_key(key: str) -> bool: + return bool(key) and all( + ch == "_" or "A" <= ch <= "Z" or "a" <= ch <= "z" or "0" <= ch <= "9" + for ch in key + ) + + +def _env_value(value) -> str: + text = "" if value is None else str(value) + return text.replace("\r", "\\r").replace("\n", "\\n") + + +def _write_json_config(target, config: dict) -> None: + _write_text_config(target, _json.dumps(config, indent=2, ensure_ascii=False) + "\n") + + +def _write_opencode_provider_config(target, provider_id: str, provider_config: dict) -> None: + existing = { + "$schema": "https://opencode.ai/config.json", + } + if target.exists(): + with open(target, encoding="utf-8") as f: + existing = _json.load(f) + if not isinstance(existing, dict): + existing = {} + providers = existing.get("provider") + if not isinstance(providers, dict): + providers = {} + providers[provider_id] = provider_config + existing["provider"] = providers + _write_json_config(target, existing) + + +def _write_text_config(target, content: str) -> None: + if content and not content.endswith("\n"): + content += "\n" + _write_secure_file(target, content) + + +def _write_secure_file(target, content: str) -> None: + target.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = _tempfile.mkstemp(prefix=f".{target.name}.", dir=target.parent) + try: + with _os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(content) + try: + _os.chmod(tmp_name, 0o600) + except OSError: + pass + _os.replace(tmp_name, target) + except Exception: + try: + _os.unlink(tmp_name) + except OSError: + pass + raise + + +# ────────────────────────────────────────────── +# Proxy +# ────────────────────────────────────────────── + +@cli.group() +def proxy() -> None: + """Manage the local HTTP proxy server.""" + pass + + +@proxy.command("status") +@click.option("--app", "-a", default="claude", help="App type") +@click.pass_context +def proxy_status(ctx: click.Context, app: str) -> None: + """Show proxy server status.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + row = db.execute( + "SELECT * FROM proxy_config WHERE app_type=?", (app,) + ).fetchone() + if not row: + click.echo(f"No proxy config for {app}") + return + + data = dict(row) + if ctx.obj.get("json_mode"): + _json.dump(data, sys.stdout, indent=2, default=str) + return + + click.echo(f"Proxy Status ({app}):") + click.echo(f" Enabled: {bool(data['enabled'])}") + click.echo(f" Listen: {data['listen_address']}:{data['listen_port']}") + click.echo(f" Proxy Enabled: {bool(data['proxy_enabled'])}") + click.echo(f" Auto Failover: {bool(data['auto_failover_enabled'])}") + click.echo(f" Max Retries: {data['max_retries']}") + click.echo(f" Circuit Breaker: {bool(data.get('live_takeover_active', 0))}") + finally: + db.close() + + +@proxy.command("config") +@click.option("--app", "-a", default="claude", help="App type") +@click.option("--set-port", type=int, help="Set listen port") +@click.option("--enable/--disable", default=None, help="Enable/disable proxy") +@click.option("--failover/--no-failover", default=None, help="Enable/disable auto failover") +@click.pass_context +def proxy_config( + ctx: click.Context, app: str, set_port: int | None, + enable: bool | None, failover: bool | None +) -> None: + """Get or set proxy configuration.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + if set_port is not None: + db.execute("UPDATE proxy_config SET listen_port=? WHERE app_type=?", (set_port, app)) + if enable is True: + db.execute("UPDATE proxy_config SET proxy_enabled=1 WHERE app_type=?", (app,)) + elif enable is False: + db.execute("UPDATE proxy_config SET proxy_enabled=0 WHERE app_type=?", (app,)) + if failover is True: + db.execute("UPDATE proxy_config SET auto_failover_enabled=1 WHERE app_type=?", (app,)) + elif failover is False: + db.execute("UPDATE proxy_config SET auto_failover_enabled=0 WHERE app_type=?", (app,)) + db.commit() + + # Show updated config + row = db.execute( + "SELECT * FROM proxy_config WHERE app_type=?", (app,) + ).fetchone() + if row: + data = dict(row) + click.echo(_json.dumps({ + "app": app, + "listen": f"{data['listen_address']}:{data['listen_port']}", + "enabled": bool(data["enabled"]), + "proxy_enabled": bool(data["proxy_enabled"]), + "auto_failover": bool(data["auto_failover_enabled"]), + "max_retries": data["max_retries"], + }, indent=2)) + finally: + db.close() + + +# ────────────────────────────────────────────── +# MCP +# ────────────────────────────────────────────── + +@cli.group() +def mcp() -> None: + """Manage MCP (Model Context Protocol) servers.""" + pass + + +@mcp.command("list") +@click.pass_context +def mcp_list(ctx: click.Context) -> None: + """List all MCP servers.""" + db = connect_db(ctx.obj.get("db_path")) + try: + enabled_columns = _enabled_app_columns(db, "mcp_servers") + rows = db.execute( + f"SELECT id, name, description, {enabled_columns} " + "FROM mcp_servers ORDER BY name" + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + click.echo(_table(["ID", "Name", "Apps", "Description"], [ + (r["id"][:30], r["name"], _enabled_apps_str(r), (r["description"] or "")[:50]) + for r in rows + ])) + finally: + db.close() + + +@mcp.command("enable") +@click.argument("server_id") +@click.option("--app", "-a", required=True, help="App type") +@click.option("--on/--off", default=True, help="Enable or disable") +@click.pass_context +def mcp_enable(ctx: click.Context, server_id: str, app: str, on: bool) -> None: + """Enable or disable an MCP server for an app.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + col = f"enabled_{app}" + db.execute(f"UPDATE mcp_servers SET {col}=? WHERE id=?", (int(on), server_id)) + if db.total_changes == 0: + click.echo(f"MCP server '{server_id}' not found", err=True) + raise SystemExit(1) + db.commit() + click.echo(f"MCP '{server_id}' {'enabled' if on else 'disabled'} for {app}") + finally: + db.close() + + +# ────────────────────────────────────────────── +# Skills +# ────────────────────────────────────────────── + +@cli.group() +def skills() -> None: + """Manage installed skills.""" + pass + + +@skills.command("list") +@click.pass_context +def skills_list(ctx: click.Context) -> None: + """List all installed skills.""" + db = connect_db(ctx.obj.get("db_path")) + try: + enabled_columns = _enabled_app_columns(db, "skills") + rows = db.execute( + "SELECT id, name, description, repo_owner, repo_name, " + f"{enabled_columns} " + "FROM skills ORDER BY name" + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + click.echo(_table(["Name", "Source", "Apps", "Description"], [ + (r["name"], f"{r['repo_owner']}/{r['repo_name']}" if r["repo_owner"] else "local", + _enabled_apps_str(r), (r["description"] or "")[:50]) + for r in rows + ])) + finally: + db.close() + + +@skills.command("repos") +@click.pass_context +def skills_repos(ctx: click.Context) -> None: + """List registered skill repositories.""" + db = connect_db(ctx.obj.get("db_path")) + try: + rows = db.execute( + "SELECT owner, name, branch, enabled FROM skill_repos ORDER BY owner, name" + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + click.echo(_table(["Owner", "Name", "Branch", "Enabled"], [ + (r["owner"], r["name"], r["branch"], "yes" if r["enabled"] else "no") + for r in rows + ])) + finally: + db.close() + + +# ────────────────────────────────────────────── +# Usage +# ────────────────────────────────────────────── + +@cli.group() +def usage() -> None: + """View API usage and cost statistics.""" + pass + + +@usage.command("stats") +@click.option("--days", "-d", default=30, type=int, help="Number of days to show") +@click.option("--app", "-a", help="Filter by app type") +@click.pass_context +def usage_stats(ctx: click.Context, days: int, app: str | None) -> None: + """Show usage statistics.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + if app: + rows = db.execute( + "SELECT model, COUNT(*) as requests, SUM(input_tokens) as input_tok, " + "SUM(output_tokens) as output_tok, " + "SUM(CAST(total_cost_usd AS REAL)) as cost " + "FROM proxy_request_logs " + "WHERE app_type=? AND created_at > unixepoch('now', ? || ' days') " + "GROUP BY model ORDER BY cost DESC", + (app, f"-{days}"), + ).fetchall() + else: + rows = db.execute( + "SELECT app_type, model, COUNT(*) as requests, SUM(input_tokens) as input_tok, " + "SUM(output_tokens) as output_tok, " + "SUM(CAST(total_cost_usd AS REAL)) as cost " + "FROM proxy_request_logs " + "WHERE created_at > unixepoch('now', ? || ' days') " + "GROUP BY app_type, model ORDER BY cost DESC", + (f"-{days}",), + ).fetchall() + + rows = _normalise_usage_rows(rows) + + if ctx.obj.get("json_mode"): + _json.dump(rows, sys.stdout, indent=2, default=str) + return + + total_cost = sum(r["cost"] for r in rows) + total_requests = sum(r["requests"] for r in rows) + total_in = sum(r["input_tok"] for r in rows) + total_out = sum(r["output_tok"] for r in rows) + + if app: + click.echo(_table(["Model", "Requests", "Input Tokens", "Output Tokens", "Cost (USD)"], [ + (r["model"], r["requests"], f'{r["input_tok"]:,}', f'{r["output_tok"]:,}', f'${r["cost"]:.4f}') + for r in rows + ])) + else: + click.echo(_table(["App", "Model", "Requests", "Input Tokens", "Output Tokens", "Cost (USD)"], [ + (r["app_type"], r["model"], r["requests"], + f'{r["input_tok"]:,}', f'{r["output_tok"]:,}', f'${r["cost"]:.4f}') + for r in rows + ])) + + click.echo() + click.echo(f" Total ({days} days): {total_requests:,} requests | " + f"{total_in + total_out:,} tokens | ${total_cost:.4f}") + finally: + db.close() + + +@usage.command("logs") +@click.option("--limit", "-n", default=20, type=int, help="Number of recent logs to show") +@click.option("--app", "-a", help="Filter by app type") +@click.pass_context +def usage_logs(ctx: click.Context, limit: int, app: str | None) -> None: + """Show recent API request logs.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + if app: + rows = db.execute( + "SELECT app_type, model, status_code, input_tokens, output_tokens, " + "total_cost_usd, latency_ms, datetime(created_at, 'unixepoch') as ts " + "FROM proxy_request_logs WHERE app_type=? " + "ORDER BY created_at DESC LIMIT ?", + (app, limit), + ).fetchall() + else: + rows = db.execute( + "SELECT app_type, model, status_code, input_tokens, output_tokens, " + "total_cost_usd, latency_ms, datetime(created_at, 'unixepoch') as ts " + "FROM proxy_request_logs " + "ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + click.echo(_table(["App", "Model", "Status", "Tokens (in/out)", "Cost", "Latency", "Time"], [ + (r["app_type"], r["model"][:25], r["status_code"], + f'{r["input_tokens"]}/{r["output_tokens"]}', + f'${float(r["total_cost_usd"] or 0):.4f}', + f'{r["latency_ms"]}ms', r["ts"]) + for r in rows + ])) + finally: + db.close() + + +# ────────────────────────────────────────────── +# Settings +# ────────────────────────────────────────────── + +@cli.group() +def settings() -> None: + """View and manage CC Switch settings.""" + pass + + +@settings.command("list") +@click.pass_context +def settings_list(ctx: click.Context) -> None: + """List all settings key-value pairs.""" + db = connect_db(ctx.obj.get("db_path")) + try: + rows = db.execute("SELECT key, value FROM settings ORDER BY key").fetchall() + if ctx.obj.get("json_mode"): + _json.dump({ + r["key"]: _mask_value(r["value"], r["key"]) for r in rows + }, sys.stdout, indent=2) + return + click.echo(_table(["Key", "Value"], [ + (r["key"], _mask_sensitive(r["key"], r["value"])[:80]) for r in rows + ])) + finally: + db.close() + + +@settings.command("get") +@click.argument("key") +@click.pass_context +def settings_get(ctx: click.Context, key: str) -> None: + """Get a specific setting value.""" + db = connect_db(ctx.obj.get("db_path")) + try: + row = db.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() + if not row: + click.echo(f"Setting '{key}' not found", err=True) + raise SystemExit(1) + if ctx.obj.get("json_mode"): + _json.dump({key: _mask_value(row["value"], key)}, sys.stdout, indent=2) + return + click.echo(_mask_sensitive(key, row["value"])) + finally: + db.close() + + +@settings.command("set") +@click.argument("key") +@click.argument("value") +@click.pass_context +def settings_set(ctx: click.Context, key: str, value: str) -> None: + """Set a setting value.""" + db = connect_db(ctx.obj.get("db_path")) + try: + db.execute( + "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", (key, value) + ) + db.commit() + click.echo(f"Set '{key}' = '{_mask_sensitive(key, value)}'") + finally: + db.close() + + +# ────────────────────────────────────────────── +# Sessions +# ────────────────────────────────────────────── + +@cli.group() +def sessions() -> None: + """Browse and search AI conversation sessions.""" + pass + + +@sessions.command("list") +@click.option("--app", "-a", help="Filter by app type") +@click.option("--limit", "-n", default=20, type=int) +@click.pass_context +def sessions_list(ctx: click.Context, app: str | None, limit: int) -> None: + """List recent conversation sessions.""" + app = _resolve_app(app) + db = connect_db(ctx.obj.get("db_path")) + try: + if app: + rows = db.execute( + "SELECT file_path, last_modified, last_synced_at " + "FROM session_log_sync WHERE file_path LIKE ? " + "ORDER BY last_modified DESC LIMIT ?", + (f"%{app}%", limit), + ).fetchall() + else: + rows = db.execute( + "SELECT file_path, last_modified, last_synced_at " + "FROM session_log_sync ORDER BY last_modified DESC LIMIT ?", + (limit,), + ).fetchall() + + if ctx.obj.get("json_mode"): + _json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str) + return + + if not rows: + click.echo("No session logs found. Enable usage tracking in CC Switch first.") + return + + click.echo(_table(["Path", "Last Modified", "Last Synced"], [ + (r["file_path"][:60], + r["last_modified"], + r["last_synced_at"]) + for r in rows + ])) + finally: + db.close() + + +# ────────────────────────────────────────────── +# Entry point +# ────────────────────────────────────────────── + +def main() -> None: + """Main entry point for CC Switch CLI.""" + cli(prog_name="ccswitch") + + +if __name__ == "__main__": + main() diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/core/__init__.py b/cc-switch/agent-harness/cli_anything/ccswitch/core/__init__.py new file mode 100644 index 000000000..ff1ed699f --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/core/__init__.py @@ -0,0 +1 @@ +"""Core modules for ccswitch CLI.""" diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/skills/SKILL.md b/cc-switch/agent-harness/cli_anything/ccswitch/skills/SKILL.md new file mode 100644 index 000000000..041b763a2 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/skills/SKILL.md @@ -0,0 +1,146 @@ +--- +name: "cli-anything-ccswitch" +description: "CLI interface for CC Switch — manage AI coding tool configurations from the terminal" +--- + +# CC Switch CLI + +CLI harness for CC Switch, a desktop app that manages AI coding tool (Claude Code, +Codex, Gemini CLI, OpenCode, OpenClaw, Hermes) configurations. Built with Click +and the CLI-Anything methodology. + +## Prerequisites + +- Python 3.10+ +- CC Switch installed with an active database at `~/.cc-switch/cc-switch.db` + +## Installation + +```bash +pip install -e . +``` + +After installation, the command `cli-anything-ccswitch` is available. + +## Command Groups + +| Group | Description | +|-------|-------------| +| `status` | Show a quick database overview | +| `providers` | Manage AI provider configurations (list, get, set-current) | +| `proxy` | Manage the local HTTP proxy server (status, config) | +| `mcp` | Manage MCP (Model Context Protocol) servers (list, enable) | +| `skills` | Manage installed skills (list, repos) | +| `usage` | View API usage and cost statistics (stats, logs) | +| `settings` | View and manage CC Switch settings (list, get, set) | +| `sessions` | Browse and search AI conversation sessions (list) | + +## Global Options + +- `--json` — Output in machine-readable JSON format (recommended for agent use) +- `--db PATH` — Override the database path + +## Agent-Specific Guidance + +### JSON Output + +Always use `--json` for programmatic consumption. Place it **before** the +subcommand: + +```bash +cli-anything-ccswitch --json providers list +cli-anything-ccswitch --json usage stats --days 30 +cli-anything-ccswitch --json providers get --app claude +``` + +### Sensitive Values + +API tokens, keys, and secrets are masked in all output. The `settings_config` +field in provider details shows masked values (e.g., `sk-bc089...5cbb`). + +### Exit Codes + +- `0` — Success +- `1` — Error (e.g., resource not found, invalid app type) + +### App Types + +Valid app types: `claude`, `codex`, `gemini`, `opencode`, `openclaw`, `hermes`. + +## Examples + +### List all providers + +```bash +cli-anything-ccswitch providers list +cli-anything-ccswitch --json providers list --app claude +``` + +### Switch active provider + +```bash +cli-anything-ccswitch providers set-current --app claude +``` + +### Check proxy status + +```bash +cli-anything-ccswitch proxy status --app claude +cli-anything-ccswitch proxy config --app claude --set-port 8080 +``` + +### View usage stats + +```bash +cli-anything-ccswitch usage stats --days 7 +cli-anything-ccswitch --json usage stats --days 30 --app claude +cli-anything-ccswitch usage logs --limit 10 +``` + +### List skills + +```bash +cli-anything-ccswitch skills list +cli-anything-ccswitch skills repos +``` + +### MCP servers + +```bash +cli-anything-ccswitch mcp list +cli-anything-ccswitch mcp enable --app claude --on +``` + +### Settings + +```bash +cli-anything-ccswitch settings list +cli-anything-ccswitch settings get +cli-anything-ccswitch settings set +``` + +### Sessions + +```bash +cli-anything-ccswitch sessions list --app claude --limit 10 +``` + +### Status overview + +```bash +cli-anything-ccswitch status +cli-anything-ccswitch +cli-anything-ccswitch --json +``` + +## Error Handling + +When a resource is not found, the CLI prints an error message to stderr and +exits with code 1. Agents should check the exit code before parsing output. + +## Database + +The CLI reads from the live CC Switch SQLite database. All read operations +are safe and do not modify the database. Write operations (`providers +set-current`, `proxy config`, `mcp enable`, `settings set`) modify the +database directly. diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/tests/TEST.md b/cc-switch/agent-harness/cli_anything/ccswitch/tests/TEST.md new file mode 100644 index 000000000..4803bf017 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/tests/TEST.md @@ -0,0 +1,86 @@ +# CC Switch CLI - Test Plan + +## Test Inventory + +| Test File | Tests | Type | +|-----------|-------|------| +| `test_core.py` | 46 | Unit tests with synthetic data | +| `test_full_e2e.py` | 20 | E2E tests against a real or configured CC Switch database | + +## Unit Test Coverage + +### Database and Config Helpers + +- `CCSWITCH_HOME` path resolution +- DB, config, and settings path helpers +- SQLite in-memory connection setup +- Config load/save, including empty-file behavior +- Valid app type list + +### CLI Core Helpers + +- App resolution for valid, invalid, and `None` app values +- Table formatting for normal, empty, and single-row data +- Sensitive-value masking for tokens, API keys, passwords, short secrets, nested dicts, nested lists, authorization headers, and hotkey false positives + +### CLI Commands + +- Top-level help and command group help +- Explicit `status --json` +- `providers get --json` masking for nested `settings_config` +- Plain `providers get` masking for nested list/header values +- `settings get --json` object output +- `settings get/list/set` masking for sensitive settings +- `sessions list --json` empty-result output +- `mcp list` and `skills list` include OpenClaw state +- `usage stats` normalizes nullable token/cost aggregates +- `providers set-current` DB update plus live Codex config write + +### Live Config Write Helpers + +- Claude settings JSON env merge +- Codex provider TOML merge preserves unrelated `config.toml` sections plus `auth.json` writes +- Gemini `.env` merge with env key validation and newline escaping +- OpenCode provider merge into `~/.config/opencode/opencode.json` +- Temporary file cleanup after secure writes + +## E2E Coverage + +### Providers + +- List providers +- JSON output +- App filtering +- Nonexistent provider error +- API key leak prevention + +### Other Command Groups + +- Skills list and repos +- Usage stats and logs +- MCP list +- Settings list +- Proxy status +- Default status overview and JSON overview + +## Latest Verification + +```powershell +python -m pytest cli_anything\ccswitch\tests -q +.................................................................. [100%] +66 passed in 2.61s +``` + +Additional checks: + +```powershell +git diff --check +python -m compileall -q cli_anything +``` + +Both checks passed. + +## Coverage Notes + +- Write operations covered by synthetic tests: `providers set-current`, `settings set`, and live config writes for Claude, Codex, Gemini, and OpenCode. +- Destructive live-database operations and process-control flows should be validated only in a disposable CC Switch environment. diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_core.py b/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_core.py new file mode 100644 index 000000000..d74482696 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_core.py @@ -0,0 +1,903 @@ +"""Unit tests for CC Switch CLI core modules — synthetic data, no external deps.""" + +import os +import sys +import json +import sqlite3 +import tempfile +from pathlib import Path +from contextlib import contextmanager + +import pytest +import tomlkit + +from cli_anything.ccswitch.utils.db import ( + get_cc_switch_dir, get_db_path, get_config_path, + get_settings_path, connect_db, + load_config, save_config, load_settings, + VALID_APP_TYPES, +) +from cli_anything.ccswitch.ccswitch_cli import ( + _resolve_app, _table, _mask_sensitive, _mask_value, _write_live_config, +) + + +# ─────────────────────────── +# Database path helpers +# ─────────────────────────── + +def test_get_cc_switch_dir_custom(): + os.environ["CCSWITCH_HOME"] = "/tmp/ccswitch-test" + assert get_cc_switch_dir() == Path("/tmp/ccswitch-test/.cc-switch") + del os.environ["CCSWITCH_HOME"] + + +def test_get_db_path(): + os.environ["CCSWITCH_HOME"] = "/home/user" + assert get_db_path() == Path("/home/user/.cc-switch/cc-switch.db") + del os.environ["CCSWITCH_HOME"] + + +def test_get_config_path(): + os.environ["CCSWITCH_HOME"] = "/x" + assert get_config_path() == Path("/x/.cc-switch/config.json") + del os.environ["CCSWITCH_HOME"] + + +def test_get_settings_path(): + os.environ["CCSWITCH_HOME"] = "/y" + assert get_settings_path() == Path("/y/.cc-switch/settings.json") + del os.environ["CCSWITCH_HOME"] + + +def test_valid_app_types(): + assert "claude" in VALID_APP_TYPES + assert "codex" in VALID_APP_TYPES + assert "gemini" in VALID_APP_TYPES + assert "opencode" in VALID_APP_TYPES + assert "openclaw" in VALID_APP_TYPES + assert "hermes" in VALID_APP_TYPES + assert len(VALID_APP_TYPES) == 6 + + +# ─────────────────────────── +# Database connection +# ─────────────────────────── + +def test_connect_db_in_memory(): + conn = connect_db(Path(":memory:")) + conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)") + conn.execute("INSERT INTO test VALUES (1)") + assert conn.execute("SELECT COUNT(*) FROM test").fetchone()[0] == 1 + conn.close() + + +# ─────────────────────────── +# Config load/save +# ─────────────────────────── + +def test_load_config_missing(): + with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f: + pass # empty file + result = load_config(Path(f.name)) + assert result == {} + os.unlink(f.name) + + +def test_save_and_load_config(): + path = Path(tempfile.mktemp(suffix=".json")) + data = {"version": 2, "apps": {"claude": {"providers": {}}}} + save_config(data, path) + loaded = load_config(path) + assert loaded["version"] == 2 + assert "apps" in loaded + os.unlink(path) + + +def test_load_settings_missing(): + os.environ["CCSWITCH_HOME"] = "/nonexistent-tmp-xyz" + result = load_settings() + assert result == {} + del os.environ["CCSWITCH_HOME"] + + +# ─────────────────────────── +# App resolution +# ─────────────────────────── + +def test_resolve_app_valid(): + assert _resolve_app("claude") == "claude" + assert _resolve_app("CLAUDE") == "claude" + assert _resolve_app("OpenCode") == "opencode" + assert _resolve_app("Hermes") == "hermes" + + +def test_resolve_app_none(): + assert _resolve_app(None) is None + + +def test_resolve_app_invalid(): + from click import BadParameter + with pytest.raises(BadParameter): + _resolve_app("invalid-app") + + +# ─────────────────────────── +# Table formatting +# ─────────────────────────── + +def test_table_basic(): + result = _table(["Name", "Count"], [("Alice", 5), ("Bob", 3)]) + assert "Name" in result + assert "Count" in result + assert "Alice" in result + assert "5" in result + assert "Bob" in result + assert "3" in result + + +def test_table_empty(): + assert _table(["Col"], []) == "(empty)" + + +def test_table_single(): + result = _table(["A"], [("x",)]) + assert "A" in result + assert "x" in result + + +# ─────────────────────────── +# Sensitive masking +# ─────────────────────────── + +def test_mask_api_token(): + result = _mask_sensitive("ANTHROPIC_AUTH_TOKEN", "sk-bc089d043dc34c6c9022831769d85cbb") + assert "sk-bc089" in result + assert "5cbb" in result + assert "bc089d043dc34c6c" not in result # middle is masked + + +def test_mask_api_key(): + result = _mask_sensitive("api_key", "sec-1234567890abcdef") + assert "..." in result or "***" in result or "sec-1234" in result + + +def test_mask_password(): + result = _mask_sensitive("password", "mysecretkey") + assert "mysec" in result or "***" in result + + +def test_mask_hotkey_is_not_treated_as_secret(): + assert _mask_sensitive("hotkey", "ctrl+k") == "ctrl+k" + + +def test_mask_short_value(): + result = _mask_sensitive("secret", "abc") + assert result == "***" + + +def test_mask_non_sensitive(): + result = _mask_sensitive("model", "claude-sonnet-4-6") + assert "claude-sonnet-4-6" in result + assert "***" not in result + + +def test_mask_nested_dict(): + result = _mask_sensitive("env", { + "ANTHROPIC_AUTH_TOKEN": "sk-test1234567890", + "ANTHROPIC_MODEL": "deepseek-v4-pro", + "ANTHROPIC_BASE_URL": "https://api.deepseek.com", + }) + assert "sk-test1" in result + assert "7890" in result + assert "deepseek-v4-pro" in result + assert "https://api.deepseek.com" in result + + +# ─────────────────────────── +# CLI help tests +# ─────────────────────────── + +def test_mask_value_nested_json(): + result = _mask_value({ + "env": { + "ANTHROPIC_AUTH_TOKEN": "sk-test1234567890", + "ANTHROPIC_MODEL": "deepseek-v4-pro", + }, + "headers": [ + {"authorization": "Bearer abcdef1234567890"}, + ], + }) + + assert result["env"]["ANTHROPIC_AUTH_TOKEN"] != "sk-test1234567890" + assert result["env"]["ANTHROPIC_MODEL"] == "deepseek-v4-pro" + assert result["headers"][0]["authorization"] != "Bearer abcdef1234567890" + + +def test_mask_sensitive_nested_list(): + result = _mask_sensitive("headers", [ + {"authorization": "Bearer abcdef1234567890"}, + ]) + + assert "Bearer abcdef1234567890" not in result + assert "headers" not in result + + +from click.testing import CliRunner +from cli_anything.ccswitch.ccswitch_cli import cli + + +@pytest.fixture +def runner(): + return CliRunner() + + +def _init_cli_db(path: Path) -> sqlite3.Connection: + conn = sqlite3.connect(path) + conn.row_factory = sqlite3.Row + conn.executescript(""" + CREATE TABLE providers ( + id TEXT, + app_type TEXT, + name TEXT, + category TEXT, + is_current INTEGER, + sort_index INTEGER, + settings_config TEXT + ); + CREATE TABLE settings ( + key TEXT PRIMARY KEY, + value TEXT + ); + CREATE TABLE skills ( + id TEXT, + name TEXT DEFAULT '', + description TEXT, + repo_owner TEXT, + repo_name TEXT, + enabled_claude INTEGER DEFAULT 0, + enabled_codex INTEGER DEFAULT 0, + enabled_gemini INTEGER DEFAULT 0, + enabled_opencode INTEGER DEFAULT 0, + enabled_openclaw INTEGER DEFAULT 0, + enabled_hermes INTEGER DEFAULT 0 + ); + CREATE TABLE mcp_servers ( + id TEXT, + name TEXT DEFAULT '', + description TEXT, + enabled_claude INTEGER DEFAULT 0, + enabled_codex INTEGER DEFAULT 0, + enabled_gemini INTEGER DEFAULT 0, + enabled_opencode INTEGER DEFAULT 0, + enabled_openclaw INTEGER DEFAULT 0, + enabled_hermes INTEGER DEFAULT 0 + ); + CREATE TABLE proxy_request_logs ( + app_type TEXT, + model TEXT, + status_code INTEGER, + input_tokens INTEGER, + output_tokens INTEGER, + total_cost_usd TEXT, + latency_ms INTEGER, + created_at INTEGER + ); + CREATE TABLE session_log_sync ( + file_path TEXT, + last_modified INTEGER, + last_synced_at INTEGER + ); + """) + return conn + + +def test_providers_get_json_masks_settings_config(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + "deepseek", + "claude", + "DeepSeek", + "default", + 1, + 0, + json.dumps({ + "env": { + "ANTHROPIC_AUTH_TOKEN": "sk-test1234567890", + "ANTHROPIC_MODEL": "deepseek-v4-pro", + }, + "headers": [{"authorization": "Bearer abcdef1234567890"}], + }), + ), + ) + conn.commit() + conn.close() + + result = runner.invoke(cli, [ + "--json", "--db", str(db_path), + "providers", "get", "deepseek", "--app", "claude", + ]) + + assert result.exit_code == 0 + data = json.loads(result.output) + output = json.dumps(data) + assert "sk-test1234567890" not in output + assert "Bearer abcdef1234567890" not in output + assert data["settings_config"]["env"]["ANTHROPIC_MODEL"] == "deepseek-v4-pro" + + +def test_providers_get_plain_masks_nested_list(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + "deepseek", + "claude", + "DeepSeek", + "default", + 1, + 0, + json.dumps({ + "headers": [{"authorization": "Bearer abcdef1234567890"}], + "model": "deepseek-v4-pro", + }), + ), + ) + conn.commit() + conn.close() + + result = runner.invoke(cli, [ + "--db", str(db_path), + "providers", "get", "deepseek", "--app", "claude", + ]) + + assert result.exit_code == 0 + assert "Bearer abcdef1234567890" not in result.output + assert "deepseek-v4-pro" in result.output + + +def test_status_command_json(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)", + ("deepseek", "claude", "DeepSeek", "default", 1, 0, "{}"), + ) + conn.execute("INSERT INTO skills (id) VALUES (?)", ("skill-1",)) + conn.execute("INSERT INTO mcp_servers (id) VALUES (?)", ("mcp-1",)) + conn.commit() + conn.close() + + result = runner.invoke(cli, ["--json", "--db", str(db_path), "status"]) + + assert result.exit_code == 0 + assert json.loads(result.output) == { + "providers": 1, + "current": {"claude": "DeepSeek"}, + "skills": 1, + "mcp_servers": 1, + } + + +def test_sessions_list_json_empty_outputs_array(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.commit() + conn.close() + + result = runner.invoke(cli, ["--json", "--db", str(db_path), "sessions", "list"]) + + assert result.exit_code == 0 + assert json.loads(result.output) == [] + + +def test_settings_get_json_outputs_object(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute("INSERT INTO settings VALUES (?, ?)", ("theme", "dark")) + conn.commit() + conn.close() + + result = runner.invoke(cli, [ + "--json", "--db", str(db_path), "settings", "get", "theme", + ]) + + assert result.exit_code == 0 + assert json.loads(result.output) == {"theme": "dark"} + + +def test_settings_outputs_mask_sensitive_values(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute("INSERT INTO settings VALUES (?, ?)", ("OPENAI_API_KEY", "sk-secret1234567890")) + conn.commit() + conn.close() + + get_result = runner.invoke(cli, [ + "--json", "--db", str(db_path), "settings", "get", "OPENAI_API_KEY", + ]) + list_result = runner.invoke(cli, [ + "--json", "--db", str(db_path), "settings", "list", + ]) + set_result = runner.invoke(cli, [ + "--db", str(db_path), "settings", "set", "OPENAI_API_KEY", "sk-newsecret1234567890", + ]) + + assert get_result.exit_code == 0 + assert list_result.exit_code == 0 + assert set_result.exit_code == 0 + combined = get_result.output + list_result.output + set_result.output + assert "sk-secret1234567890" not in combined + assert "sk-newsecret1234567890" not in combined + assert json.loads(get_result.output)["OPENAI_API_KEY"] != "sk-secret1234567890" + + conn = sqlite3.connect(db_path) + assert conn.execute( + "SELECT value FROM settings WHERE key=?", ("OPENAI_API_KEY",) + ).fetchone()[0] == "sk-newsecret1234567890" + conn.close() + + +def test_mcp_and_skills_list_include_openclaw(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + """ + INSERT INTO mcp_servers ( + id, name, description, enabled_claude, enabled_codex, enabled_gemini, + enabled_opencode, enabled_openclaw, enabled_hermes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ("mcp-1", "OpenClaw MCP", "desc", 0, 0, 0, 0, 1, 0), + ) + conn.execute( + """ + INSERT INTO skills ( + id, name, description, repo_owner, repo_name, enabled_claude, + enabled_codex, enabled_gemini, enabled_opencode, enabled_openclaw, + enabled_hermes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ("skill-1", "OpenClaw Skill", "desc", "owner", "repo", 0, 0, 0, 0, 1, 0), + ) + conn.commit() + conn.close() + + mcp_json = runner.invoke(cli, ["--json", "--db", str(db_path), "mcp", "list"]) + skills_json = runner.invoke(cli, ["--json", "--db", str(db_path), "skills", "list"]) + mcp_text = runner.invoke(cli, ["--db", str(db_path), "mcp", "list"]) + skills_text = runner.invoke(cli, ["--db", str(db_path), "skills", "list"]) + + assert mcp_json.exit_code == 0 + assert skills_json.exit_code == 0 + assert mcp_text.exit_code == 0 + assert skills_text.exit_code == 0 + assert json.loads(mcp_json.output)[0]["enabled_openclaw"] == 1 + assert json.loads(skills_json.output)[0]["enabled_openclaw"] == 1 + assert "openclaw" in mcp_text.output + assert "openclaw" in skills_text.output + + +def test_usage_stats_normalises_nullable_aggregates(runner, tmp_path): + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + """ + INSERT INTO proxy_request_logs ( + app_type, model, status_code, input_tokens, output_tokens, + total_cost_usd, latency_ms, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER)) + """, + ("claude", "deepseek-v4-pro", 200, None, None, None, 100), + ) + conn.commit() + conn.close() + + text_result = runner.invoke(cli, ["--db", str(db_path), "usage", "stats", "--days", "30"]) + json_result = runner.invoke(cli, [ + "--json", "--db", str(db_path), "usage", "stats", "--days", "30", + ]) + + assert text_result.exit_code == 0 + assert "deepseek-v4-pro" in text_result.output + assert "$0.0000" in text_result.output + assert json_result.exit_code == 0 + data = json.loads(json_result.output) + assert data[0]["input_tok"] == 0 + assert data[0]["output_tok"] == 0 + assert data[0]["cost"] == 0 + + +def test_write_live_config_codex_writes_config_and_auth(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + config_path = tmp_path / ".codex" / "config.toml" + config_path.parent.mkdir(parents=True) + config_path.write_text( + 'approval_policy = "on-request"\n' + 'sandbox_mode = "workspace-write"\n' + 'model = "old-model"\n' + 'model_provider = "old-provider"\n' + '\n' + '[model_providers.old-provider]\n' + 'name = "Old Provider"\n' + 'base_url = "https://old.example.test"\n' + '\n' + '[model_providers.keep]\n' + 'name = "Keep Provider"\n' + 'base_url = "https://keep.example.test"\n' + '\n' + '# mcp should stay with filesystem\n' + '[mcp_servers.filesystem]\n' + 'command = "npx"\n' + 'args = ["-y", "@modelcontextprotocol/server-filesystem"]\n' + '\n' + '[profiles.work]\n' + 'model = "profile-model"\n', + encoding="utf-8", + ) + db = sqlite3.connect(":memory:") + db.row_factory = sqlite3.Row + db.execute(""" + CREATE TABLE providers ( + app_type TEXT, + is_current INTEGER, + settings_config TEXT + ) + """) + db.execute( + "INSERT INTO providers VALUES (?, ?, ?)", + ( + "codex", + 1, + json.dumps({ + "config": ( + 'model = "deepseek-v4-pro"\n' + 'model_provider = "deepseek"\n' + '\n' + '[model_providers.deepseek]\n' + 'name = "DeepSeek"\n' + 'base_url = "https://api.deepseek.com/v1"\n' + ), + "auth": {"OPENAI_API_KEY": "sk-codex1234567890"}, + }), + ), + ) + + _write_live_config("codex", db) + + merged = config_path.read_text(encoding="utf-8") + assert 'model = "deepseek-v4-pro"' in merged + assert 'model_provider = "deepseek"' in merged + assert '[model_providers.deepseek]' in merged + assert 'base_url = "https://api.deepseek.com/v1"' in merged + assert '[model_providers.keep]' in merged + assert 'approval_policy = "on-request"' in merged + assert 'sandbox_mode = "workspace-write"' in merged + assert '[mcp_servers.filesystem]' in merged + assert '[profiles.work]' in merged + parsed = tomlkit.parse(merged) + assert parsed["mcp_servers"]["filesystem"]["command"] == "npx" + assert parsed["profiles"]["work"]["model"] == "profile-model" + auth = json.loads((tmp_path / ".codex" / "auth.json").read_text(encoding="utf-8")) + assert auth == {"OPENAI_API_KEY": "sk-codex1234567890"} + assert not list((tmp_path / ".codex").glob(".config.toml.*")) + assert not list((tmp_path / ".codex").glob(".auth.json.*")) + + +def test_write_live_config_codex_handles_multiline_strings(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + config_path = tmp_path / ".codex" / "config.toml" + config_path.parent.mkdir(parents=True) + config_path.write_text( + 'model = "old-model"\n' + 'model_provider = "deepseek"\n' + '\n' + '[model_providers.deepseek]\n' + 'name = "Old DeepSeek"\n' + 'notes = """\n' + 'line one\n' + '[mcp_servers.fake]\n' + 'line three\n' + '"""\n' + 'base_url = "https://old.example.test"\n' + '\n' + '[mcp_servers.real]\n' + 'command = "npx"\n', + encoding="utf-8", + ) + db = sqlite3.connect(":memory:") + db.row_factory = sqlite3.Row + db.execute(""" + CREATE TABLE providers ( + app_type TEXT, + is_current INTEGER, + settings_config TEXT + ) + """) + db.execute( + "INSERT INTO providers VALUES (?, ?, ?)", + ( + "codex", + 1, + json.dumps({ + "config": ( + 'model = "deepseek-v4-pro"\n' + 'model_provider = "deepseek"\n' + '\n' + '[model_providers.deepseek]\n' + 'name = "DeepSeek"\n' + 'base_url = "https://api.deepseek.com/v1"\n' + ), + }), + ), + ) + + _write_live_config("codex", db) + + parsed = tomlkit.parse(config_path.read_text(encoding="utf-8")) + assert parsed["model"] == "deepseek-v4-pro" + assert parsed["model_provider"] == "deepseek" + assert parsed["model_providers"]["deepseek"]["base_url"] == "https://api.deepseek.com/v1" + assert parsed["mcp_servers"]["real"]["command"] == "npx" + assert "fake" not in parsed["mcp_servers"] + + +def test_write_live_config_claude_merges_nested_env(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + settings_path = tmp_path / ".claude" / "settings.json" + settings_path.parent.mkdir(parents=True) + settings_path.write_text(json.dumps({"env": {"EXISTING": "1"}}), encoding="utf-8") + db = sqlite3.connect(":memory:") + db.row_factory = sqlite3.Row + db.execute(""" + CREATE TABLE providers ( + app_type TEXT, + is_current INTEGER, + settings_config TEXT + ) + """) + db.execute( + "INSERT INTO providers VALUES (?, ?, ?)", + ( + "claude", + 1, + json.dumps({"env": {"ANTHROPIC_AUTH_TOKEN": "sk-test1234567890"}}), + ), + ) + + _write_live_config("claude", db) + + data = json.loads(settings_path.read_text(encoding="utf-8")) + assert data["env"]["EXISTING"] == "1" + assert data["env"]["ANTHROPIC_AUTH_TOKEN"] == "sk-test1234567890" + + +def test_write_live_config_gemini_writes_env_file(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + env_path = tmp_path / ".gemini" / ".env" + env_path.parent.mkdir(parents=True) + env_path.write_text("EXISTING=1\n", encoding="utf-8") + db = sqlite3.connect(":memory:") + db.row_factory = sqlite3.Row + db.execute(""" + CREATE TABLE providers ( + app_type TEXT, + is_current INTEGER, + settings_config TEXT + ) + """) + db.execute( + "INSERT INTO providers VALUES (?, ?, ?)", + ( + "gemini", + 1, + json.dumps({ + "env": { + "GEMINI_API_KEY": "sk-gemini1234567890", + "GOOGLE_GEMINI_BASE_URL": "https://api.example.test", + "GEMINI_MODEL": "gemini-3.1-pro", + "MULTILINE": "one\nTWO=2", + "BAD-KEY": "ignored", + }, + }), + ), + ) + + _write_live_config("gemini", db) + + content = env_path.read_text(encoding="utf-8") + assert "EXISTING=1" in content + assert "GEMINI_API_KEY=sk-gemini1234567890" in content + assert "GOOGLE_GEMINI_BASE_URL=https://api.example.test" in content + assert "GEMINI_MODEL=gemini-3.1-pro" in content + assert "MULTILINE=one\\nTWO=2" in content + assert "\nTWO=2" not in content + assert "BAD-KEY=ignored" not in content + assert not (tmp_path / ".gemini" / "settings.json").exists() + + +def test_write_live_config_opencode_merges_provider_settings(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + config_path = tmp_path / ".config" / "opencode" / "opencode.json" + config_path.parent.mkdir(parents=True) + config_path.write_text(json.dumps({ + "$schema": "https://opencode.ai/config.json", + "provider": {"old": {"npm": "@ai-sdk/openai"}}, + }), encoding="utf-8") + db = sqlite3.connect(":memory:") + db.row_factory = sqlite3.Row + db.execute(""" + CREATE TABLE providers ( + id TEXT, + app_type TEXT, + is_current INTEGER, + settings_config TEXT + ) + """) + db.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?)", + ( + "deepseek", + "opencode", + 1, + json.dumps({ + "settingsConfig": { + "npm": "@ai-sdk/openai-compatible", + "options": { + "baseURL": "https://api.deepseek.com/v1", + "apiKey": "sk-opencode1234567890", + }, + "models": { + "deepseek-v4-pro": {"name": "DeepSeek V4 Pro"}, + }, + }, + }), + ), + ) + + _write_live_config("opencode", db) + + data = json.loads(config_path.read_text(encoding="utf-8")) + assert "old" in data["provider"] + assert data["provider"]["deepseek"]["options"]["apiKey"] == "sk-opencode1234567890" + assert data["provider"]["deepseek"]["models"]["deepseek-v4-pro"]["name"] == "DeepSeek V4 Pro" + + +def test_providers_set_current_codex_writes_live_config(runner, tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + config_path = tmp_path / ".codex" / "config.toml" + config_path.parent.mkdir(parents=True) + config_path.write_text( + 'approval_policy = "on-request"\n' + '\n' + '[mcp_servers.filesystem]\n' + 'command = "npx"\n' + '\n' + '[profiles.work]\n' + 'model = "profile-model"\n', + encoding="utf-8", + ) + db_path = tmp_path / "cc-switch.db" + conn = _init_cli_db(db_path) + conn.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + "old", + "codex", + "Old", + "default", + 1, + 0, + json.dumps({"config": 'model = "old"'}), + ), + ) + conn.execute( + "INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + "deepseek", + "codex", + "DeepSeek", + "default", + 0, + 1, + json.dumps({ + "config": ( + 'model = "deepseek-v4-pro"\n' + 'model_provider = "deepseek"\n' + '\n' + '[model_providers.deepseek]\n' + 'name = "DeepSeek"\n' + 'base_url = "https://api.deepseek.com/v1"\n' + ), + "auth": {"OPENAI_API_KEY": "sk-codex1234567890"}, + }), + ), + ) + conn.commit() + conn.close() + + result = runner.invoke(cli, [ + "--db", str(db_path), "providers", "set-current", "deepseek", "--app", "codex", + ]) + + assert result.exit_code == 0 + assert "sk-codex1234567890" not in result.output + merged = config_path.read_text(encoding="utf-8") + assert 'model = "deepseek-v4-pro"' in merged + assert 'model_provider = "deepseek"' in merged + assert '[model_providers.deepseek]' in merged + assert 'approval_policy = "on-request"' in merged + assert '[mcp_servers.filesystem]' in merged + assert '[profiles.work]' in merged + auth = json.loads((tmp_path / ".codex" / "auth.json").read_text(encoding="utf-8")) + assert auth["OPENAI_API_KEY"] == "sk-codex1234567890" + + conn = sqlite3.connect(db_path) + current = conn.execute( + "SELECT id FROM providers WHERE app_type=? AND is_current=1", ("codex",) + ).fetchone()[0] + conn.close() + assert current == "deepseek" + + +def test_main_help(runner): + result = runner.invoke(cli, ["--help"]) + assert result.exit_code == 0 + assert "CC Switch" in result.output + + +def test_providers_help(runner): + result = runner.invoke(cli, ["providers", "--help"]) + assert result.exit_code == 0 + assert "Manage AI provider" in result.output + + +def test_usage_help(runner): + result = runner.invoke(cli, ["usage", "--help"]) + assert result.exit_code == 0 + + +def test_skills_help(runner): + result = runner.invoke(cli, ["skills", "--help"]) + assert result.exit_code == 0 + + +def test_mcp_help(runner): + result = runner.invoke(cli, ["mcp", "--help"]) + assert result.exit_code == 0 + + +def test_proxy_help(runner): + result = runner.invoke(cli, ["proxy", "--help"]) + assert result.exit_code == 0 + + +def test_settings_help(runner): + result = runner.invoke(cli, ["settings", "--help"]) + assert result.exit_code == 0 + + +def test_sessions_help(runner): + result = runner.invoke(cli, ["sessions", "--help"]) + assert result.exit_code == 0 + + +def test_all_command_groups(runner): + result = runner.invoke(cli, ["--help"]) + assert "providers" in result.output + assert "proxy" in result.output + assert "mcp" in result.output + assert "skills" in result.output + assert "usage" in result.output + assert "settings" in result.output + assert "sessions" in result.output diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_full_e2e.py b/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_full_e2e.py new file mode 100644 index 000000000..680c44fc0 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/tests/test_full_e2e.py @@ -0,0 +1,191 @@ +"""E2E tests for CC Switch CLI — tests against the real CC Switch database. + +These tests require a real CC Switch installation with an active database. +Set env CCSWITCH_HOME to point to a test home directory if needed. +""" + +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest + + +LIVE_DB_OPT_IN_ENV = "CLI_ANYTHING_CCSWITCH_LIVE_DB" +NO_LIVE_DB_TESTS = {"test_help", "test_providers_help"} + + +def _live_db_path() -> Path: + home = Path(os.environ.get("CCSWITCH_HOME", os.path.expanduser("~"))) + return home / ".cc-switch" / "cc-switch.db" + + +@pytest.fixture(autouse=True) +def _gate_live_db_tests(request): + if request.node.name in NO_LIVE_DB_TESTS: + return + + if os.environ.get(LIVE_DB_OPT_IN_ENV) != "1": + pytest.skip(f"set {LIVE_DB_OPT_IN_ENV}=1 to run live CC Switch DB tests") + + db_path = _live_db_path() + if not db_path.is_file(): + pytest.skip(f"live CC Switch database not found at {db_path}") + + +def _resolve_cli(name): + """Resolve installed CLI command; falls back to python -m for dev.""" + import shutil + force = os.environ.get("CLI_ANYTHING_FORCE_INSTALLED", "").strip() == "1" + path = shutil.which(name) + if path: + print(f"[_resolve_cli] Using installed command: {path}") + return [path] + if force: + raise RuntimeError(f"{name} not found in PATH. Install with: pip install -e .") + module = "cli_anything.ccswitch.ccswitch_cli" + print(f"[_resolve_cli] Falling back to: {sys.executable} -m {module}") + return [sys.executable, "-m", module] + + +CLI_BASE = _resolve_cli("cli-anything-ccswitch") + + +class TestCLISubprocess: + """Subprocess tests that invoke the real installed CLI command.""" + + def _run(self, args, check=True): + return subprocess.run( + CLI_BASE + args, + capture_output=True, text=True, + check=check, + ) + + # ── help ── + + def test_help(self): + result = self._run(["--help"]) + assert result.returncode == 0 + assert "CC Switch" in result.stdout + + def test_providers_help(self): + result = self._run(["providers", "--help"]) + assert result.returncode == 0 + + # ── providers ── + + def test_providers_list(self): + result = self._run(["providers", "list"]) + assert result.returncode == 0 + # Should have table headers + assert "App" in result.stdout or len(result.stdout) > 0 + + def test_providers_list_json(self): + result = self._run(["--json", "providers", "list"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert isinstance(data, list) + if data: + assert "app_type" in data[0] or "id" in data[0] + + def test_providers_list_filter_claude(self): + result = self._run(["providers", "list", "--app", "claude"]) + assert result.returncode == 0 + + def test_providers_get_nonexistent(self): + result = self._run( + ["providers", "get", "__nonexistent__", "--app", "claude"], + check=False, + ) + assert result.returncode != 0 + + def test_providers_get_no_api_key_leaked(self): + """Ensure --json output of providers get masks sensitive values.""" + result = self._run( + ["--json", "providers", "list", "--app", "claude"], + ) + data = json.loads(result.stdout) + for prov in data: + prov_str = json.dumps(prov) + # No raw API tokens in output + assert "sk-" not in prov_str.lower() or "sk-" not in prov_str + + # ── skills ── + + def test_skills_list(self): + result = self._run(["skills", "list"]) + assert result.returncode == 0 + assert "Name" in result.stdout or len(result.stdout) > 0 + + def test_skills_list_json(self): + result = self._run(["--json", "skills", "list"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert isinstance(data, list) + + def test_skills_repos(self): + result = self._run(["skills", "repos"]) + assert result.returncode == 0 + + # ── usage ── + + def test_usage_stats(self): + result = self._run(["usage", "stats", "--days", "30"]) + assert result.returncode == 0 + + def test_usage_stats_json(self): + result = self._run(["--json", "usage", "stats", "--days", "30"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert isinstance(data, list) + + def test_usage_logs(self): + result = self._run(["usage", "logs", "--limit", "5"]) + assert result.returncode == 0 + + # ── mcp ── + + def test_mcp_list(self): + result = self._run(["mcp", "list"]) + assert result.returncode == 0 + + def test_mcp_list_json(self): + result = self._run(["--json", "mcp", "list"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert isinstance(data, list) + + # ── settings ── + + def test_settings_list(self): + result = self._run(["settings", "list"]) + assert result.returncode == 0 + + def test_settings_list_json(self): + result = self._run(["--json", "settings", "list"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert isinstance(data, dict) + + # ── proxy ── + + def test_proxy_status(self): + result = self._run(["proxy", "status", "--app", "claude"]) + assert result.returncode == 0 + assert "127.0.0.1" in result.stdout or "Proxy" in result.stdout + + # ── combined / overview ── + + def test_full_status(self): + result = self._run([]) + assert result.returncode == 0 + assert "CC Switch" in result.stdout or "Status" in result.stdout + + def test_full_status_json(self): + result = self._run(["--json"]) + assert result.returncode == 0 + data = json.loads(result.stdout) + assert "providers" in data + assert isinstance(data["providers"], int) diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/utils/__init__.py b/cc-switch/agent-harness/cli_anything/ccswitch/utils/__init__.py new file mode 100644 index 000000000..6eb7059c5 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/utils/__init__.py @@ -0,0 +1 @@ +"""Utility modules for ccswitch CLI.""" diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/utils/db.py b/cc-switch/agent-harness/cli_anything/ccswitch/utils/db.py new file mode 100644 index 000000000..0dad3e817 --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/utils/db.py @@ -0,0 +1,70 @@ +"""Database connection and utility functions for CC Switch CLI.""" + +import os +import sqlite3 +import json +from pathlib import Path +from typing import Optional + + +def get_cc_switch_dir() -> Path: + """Get the CC Switch config directory.""" + home = Path(os.environ.get("CCSWITCH_HOME", os.path.expanduser("~"))) + return home / ".cc-switch" + + +def get_db_path() -> Path: + """Get the SQLite database path.""" + return get_cc_switch_dir() / "cc-switch.db" + + +def get_config_path() -> Path: + """Get the main app config JSON path.""" + return get_cc_switch_dir() / "config.json" + + +def get_settings_path() -> Path: + """Get the settings JSON path.""" + return get_cc_switch_dir() / "settings.json" + + +def connect_db(db_path: Optional[Path] = None) -> sqlite3.Connection: + """Connect to the CC Switch SQLite database.""" + path = db_path or get_db_path() + conn = sqlite3.connect(str(path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + return conn + + +def load_config(config_path: Optional[Path] = None) -> dict: + """Load the main CC Switch config.json.""" + path = config_path or get_config_path() + if not path.exists(): + return {} + with open(path, "r", encoding="utf-8") as f: + content = f.read().strip() + if not content: + return {} + return json.loads(content) + + +def save_config(data: dict, config_path: Optional[Path] = None) -> None: + """Save the main CC Switch config.json.""" + path = config_path or get_config_path() + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + +def load_settings() -> dict: + """Load CC Switch settings.json (UI preferences).""" + path = get_settings_path() + if not path.exists(): + return {} + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +VALID_APP_TYPES = ("claude", "codex", "gemini", "opencode", "openclaw", "hermes") diff --git a/cc-switch/agent-harness/cli_anything/ccswitch/utils/repl_skin.py b/cc-switch/agent-harness/cli_anything/ccswitch/utils/repl_skin.py new file mode 100644 index 000000000..bc1fb6d1d --- /dev/null +++ b/cc-switch/agent-harness/cli_anything/ccswitch/utils/repl_skin.py @@ -0,0 +1,567 @@ +"""cli-anything REPL Skin — Unified terminal interface for all CLI harnesses. + +Copy this file into your CLI package at: + cli_anything//utils/repl_skin.py + +Usage: + from cli_anything..utils.repl_skin import ReplSkin + + skin = ReplSkin("shotcut", version="1.0.0") + skin.print_banner() # auto-detects repo-root or packaged SKILL.md + prompt_text = skin.prompt(project_name="my_video.mlt", modified=True) + skin.success("Project saved") + skin.error("File not found") + skin.warning("Unsaved changes") + skin.info("Processing 24 clips...") + skin.status("Track 1", "3 clips, 00:02:30") + skin.table(headers, rows) + skin.print_goodbye() +""" + +import os +import sys +from pathlib import Path + +# ── ANSI color codes (no external deps for core styling) ────────────── + +_RESET = "\033[0m" +_BOLD = "\033[1m" +_DIM = "\033[2m" +_ITALIC = "\033[3m" +_UNDERLINE = "\033[4m" + +# Brand colors +_CYAN = "\033[38;5;80m" # cli-anything brand cyan +_CYAN_BG = "\033[48;5;80m" +_WHITE = "\033[97m" +_GRAY = "\033[38;5;245m" +_DARK_GRAY = "\033[38;5;240m" +_LIGHT_GRAY = "\033[38;5;250m" + +# Software accent colors — each software gets a unique accent +_ACCENT_COLORS = { + "gimp": "\033[38;5;214m", # warm orange + "blender": "\033[38;5;208m", # deep orange + "inkscape": "\033[38;5;39m", # bright blue + "audacity": "\033[38;5;33m", # navy blue + "libreoffice": "\033[38;5;40m", # green + "obs_studio": "\033[38;5;55m", # purple + "kdenlive": "\033[38;5;69m", # slate blue + "shotcut": "\033[38;5;35m", # teal green +} +_DEFAULT_ACCENT = "\033[38;5;75m" # default sky blue + +# Status colors +_GREEN = "\033[38;5;78m" +_YELLOW = "\033[38;5;220m" +_RED = "\033[38;5;196m" +_BLUE = "\033[38;5;75m" +_MAGENTA = "\033[38;5;176m" + +_SKILL_SOURCE_REPO = os.environ.get("CLI_ANYTHING_SKILL_REPO", "HKUDS/CLI-Anything") + +# ── Brand icon ──────────────────────────────────────────────────────── + +# The cli-anything icon: a small colored diamond/chevron mark +_ICON = f"{_CYAN}{_BOLD}◆{_RESET}" +_ICON_SMALL = f"{_CYAN}▸{_RESET}" + +# ── Box drawing characters ──────────────────────────────────────────── + +_H_LINE = "─" +_V_LINE = "│" +_TL = "╭" +_TR = "╮" +_BL = "╰" +_BR = "╯" +_T_DOWN = "┬" +_T_UP = "┴" +_T_RIGHT = "├" +_T_LEFT = "┤" +_CROSS = "┼" + + +def _strip_ansi(text: str) -> str: + """Remove ANSI escape codes for length calculation.""" + import re + return re.sub(r"\033\[[^m]*m", "", text) + + +def _visible_len(text: str) -> int: + """Get visible length of text (excluding ANSI codes).""" + return len(_strip_ansi(text)) + + +def _display_home_path(path: str) -> str: + """Display a path relative to the home directory when possible.""" + expanded = Path(path).expanduser().resolve() + home = Path.home().resolve() + try: + relative = expanded.relative_to(home) + return f"~/{relative.as_posix()}" + except ValueError: + return str(expanded) + + +class ReplSkin: + """Unified REPL skin for cli-anything CLIs. + + Provides consistent branding, prompts, and message formatting + across all CLI harnesses built with the cli-anything methodology. + """ + + def __init__(self, software: str, version: str = "1.0.0", + history_file: str | None = None, skill_path: str | None = None): + """Initialize the REPL skin. + + Args: + software: Software name (e.g., "gimp", "shotcut", "blender"). + version: CLI version string. + history_file: Path for persistent command history. + Defaults to ~/.cli-anything-/history + skill_path: Path to the SKILL.md file for agent discovery. + Auto-detected from the repo-root skills/ tree when present, + otherwise from the package's skills/ directory. + Displayed in banner for AI agents to know where to read skill info. + """ + self.software = software.lower().replace("-", "_") + self.display_name = software.replace("_", " ").title() + self.version = version + software_aliases = {"iterm2_ctl": "iterm2"} + self.skill_slug = software_aliases.get(self.software, self.software).replace("_", "-") + self.skill_id = f"cli-anything-{self.skill_slug}" + self.skill_install_cmd = ( + f"npx skills add {_SKILL_SOURCE_REPO} --skill {self.skill_id} -g -y" + ) + global_skill_root = Path( + os.environ.get("CLI_ANYTHING_GLOBAL_SKILLS_DIR", str(Path.home() / ".agents" / "skills")) + ).expanduser() + self.global_skill_path = str(global_skill_root / self.skill_id / "SKILL.md") + + # Prefer repo-root canonical skills//SKILL.md when running + # inside the CLI-Anything monorepo. Fall back to the packaged + # cli_anything//skills/SKILL.md for installed harnesses. + if skill_path is None: + package_skill = Path(__file__).resolve().parent.parent / "skills" / "SKILL.md" + repo_skill = None + for parent in Path(__file__).resolve().parents: + candidate = parent / "skills" / self.skill_id / "SKILL.md" + if candidate.is_file(): + repo_skill = candidate + break + if repo_skill and repo_skill.is_file(): + skill_path = str(repo_skill) + elif package_skill.is_file(): + skill_path = str(package_skill) + self.skill_path = skill_path + self.accent = _ACCENT_COLORS.get(self.software, _DEFAULT_ACCENT) + + # History file + if history_file is None: + hist_dir = Path.home() / f".cli-anything-{self.software}" + hist_dir.mkdir(parents=True, exist_ok=True) + self.history_file = str(hist_dir / "history") + else: + self.history_file = history_file + + # Detect terminal capabilities + self._color = self._detect_color_support() + + def _detect_color_support(self) -> bool: + """Check if terminal supports color.""" + if os.environ.get("NO_COLOR"): + return False + if os.environ.get("CLI_ANYTHING_NO_COLOR"): + return False + if not hasattr(sys.stdout, "isatty"): + return False + return sys.stdout.isatty() + + def _c(self, code: str, text: str) -> str: + """Apply color code if colors are supported.""" + if not self._color: + return text + return f"{code}{text}{_RESET}" + + # ── Banner ──────────────────────────────────────────────────────── + + def print_banner(self): + """Print the startup banner with branding.""" + import textwrap + + inner = 72 + + def _box_line(content: str) -> str: + """Wrap content in box drawing, padding to inner width.""" + pad = inner - _visible_len(content) + vl = self._c(_DARK_GRAY, _V_LINE) + return f"{vl}{content}{' ' * max(0, pad)}{vl}" + + def _meta_lines(label: str, value: str) -> list[str]: + """Wrap a metadata line for the banner box.""" + icon = self._c(_MAGENTA, "◇") + label_text = self._c(_DARK_GRAY, label) + prefix = f" {icon} {label_text} " + available = max(12, inner - _visible_len(prefix)) + wrapped = textwrap.wrap( + value, + width=available, + break_long_words=True, + break_on_hyphens=False, + ) or [""] + lines = [f"{prefix}{self._c(_LIGHT_GRAY, wrapped[0])}"] + continuation_prefix = " " * _visible_len(prefix) + for chunk in wrapped[1:]: + lines.append(f"{continuation_prefix}{self._c(_LIGHT_GRAY, chunk)}") + return lines + + top = self._c(_DARK_GRAY, f"{_TL}{_H_LINE * inner}{_TR}") + bot = self._c(_DARK_GRAY, f"{_BL}{_H_LINE * inner}{_BR}") + + # Title: ◆ cli-anything · Shotcut + icon = self._c(_CYAN + _BOLD, "◆") + brand = self._c(_CYAN + _BOLD, "cli-anything") + dot = self._c(_DARK_GRAY, "·") + name = self._c(self.accent + _BOLD, self.display_name) + title = f" {icon} {brand} {dot} {name}" + + ver = f" {self._c(_DARK_GRAY, f' v{self.version}')}" + tip = f" {self._c(_DARK_GRAY, ' Type help for commands, quit to exit')}" + empty = "" + + meta_lines: list[str] = [] + meta_lines.extend(_meta_lines("Install:", self.skill_install_cmd)) + meta_lines.extend(_meta_lines("Global skill:", _display_home_path(self.global_skill_path))) + print(top) + print(_box_line(title)) + print(_box_line(ver)) + for line in meta_lines: + print(_box_line(line)) + print(_box_line(empty)) + print(_box_line(tip)) + print(bot) + print() + + # ── Prompt ──────────────────────────────────────────────────────── + + def prompt(self, project_name: str = "", modified: bool = False, + context: str = "") -> str: + """Build a styled prompt string for prompt_toolkit or input(). + + Args: + project_name: Current project name (empty if none open). + modified: Whether the project has unsaved changes. + context: Optional extra context to show in prompt. + + Returns: + Formatted prompt string. + """ + parts = [] + + # Icon + if self._color: + parts.append(f"{_CYAN}◆{_RESET} ") + else: + parts.append("> ") + + # Software name + parts.append(self._c(self.accent + _BOLD, self.software)) + + # Project context + if project_name or context: + ctx = context or project_name + mod = "*" if modified else "" + parts.append(f" {self._c(_DARK_GRAY, '[')}") + parts.append(self._c(_LIGHT_GRAY, f"{ctx}{mod}")) + parts.append(self._c(_DARK_GRAY, ']')) + + parts.append(self._c(_GRAY, " ❯ ")) + + return "".join(parts) + + def prompt_tokens(self, project_name: str = "", modified: bool = False, + context: str = ""): + """Build prompt_toolkit formatted text tokens for the prompt. + + Use with prompt_toolkit's FormattedText for proper ANSI handling. + + Returns: + list of (style, text) tuples for prompt_toolkit. + """ + accent_hex = _ANSI_256_TO_HEX.get(self.accent, "#5fafff") + tokens = [] + + tokens.append(("class:icon", "◆ ")) + tokens.append(("class:software", self.software)) + + if project_name or context: + ctx = context or project_name + mod = "*" if modified else "" + tokens.append(("class:bracket", " [")) + tokens.append(("class:context", f"{ctx}{mod}")) + tokens.append(("class:bracket", "]")) + + tokens.append(("class:arrow", " ❯ ")) + + return tokens + + def get_prompt_style(self): + """Get a prompt_toolkit Style object matching the skin. + + Returns: + prompt_toolkit.styles.Style + """ + try: + from prompt_toolkit.styles import Style + except ImportError: + return None + + accent_hex = _ANSI_256_TO_HEX.get(self.accent, "#5fafff") + + return Style.from_dict({ + "icon": "#5fdfdf bold", # cyan brand color + "software": f"{accent_hex} bold", + "bracket": "#585858", + "context": "#bcbcbc", + "arrow": "#808080", + # Completion menu + "completion-menu.completion": "bg:#303030 #bcbcbc", + "completion-menu.completion.current": f"bg:{accent_hex} #000000", + "completion-menu.meta.completion": "bg:#303030 #808080", + "completion-menu.meta.completion.current": f"bg:{accent_hex} #000000", + # Auto-suggest + "auto-suggest": "#585858", + # Bottom toolbar + "bottom-toolbar": "bg:#1c1c1c #808080", + "bottom-toolbar.text": "#808080", + }) + + # ── Messages ────────────────────────────────────────────────────── + + def success(self, message: str): + """Print a success message with green checkmark.""" + icon = self._c(_GREEN + _BOLD, "✓") + print(f" {icon} {self._c(_GREEN, message)}") + + def error(self, message: str): + """Print an error message with red cross.""" + icon = self._c(_RED + _BOLD, "✗") + print(f" {icon} {self._c(_RED, message)}", file=sys.stderr) + + def warning(self, message: str): + """Print a warning message with yellow triangle.""" + icon = self._c(_YELLOW + _BOLD, "⚠") + print(f" {icon} {self._c(_YELLOW, message)}") + + def info(self, message: str): + """Print an info message with blue dot.""" + icon = self._c(_BLUE, "●") + print(f" {icon} {self._c(_LIGHT_GRAY, message)}") + + def hint(self, message: str): + """Print a subtle hint message.""" + print(f" {self._c(_DARK_GRAY, message)}") + + def section(self, title: str): + """Print a section header.""" + print() + print(f" {self._c(self.accent + _BOLD, title)}") + print(f" {self._c(_DARK_GRAY, _H_LINE * len(title))}") + + # ── Status display ──────────────────────────────────────────────── + + def status(self, label: str, value: str): + """Print a key-value status line.""" + lbl = self._c(_GRAY, f" {label}:") + val = self._c(_WHITE, f" {value}") + print(f"{lbl}{val}") + + def status_block(self, items: dict[str, str], title: str = ""): + """Print a block of status key-value pairs. + + Args: + items: Dict of label -> value pairs. + title: Optional title for the block. + """ + if title: + self.section(title) + + max_key = max(len(k) for k in items) if items else 0 + for label, value in items.items(): + lbl = self._c(_GRAY, f" {label:<{max_key}}") + val = self._c(_WHITE, f" {value}") + print(f"{lbl}{val}") + + def progress(self, current: int, total: int, label: str = ""): + """Print a simple progress indicator. + + Args: + current: Current step number. + total: Total number of steps. + label: Optional label for the progress. + """ + pct = int(current / total * 100) if total > 0 else 0 + bar_width = 20 + filled = int(bar_width * current / total) if total > 0 else 0 + bar = "█" * filled + "░" * (bar_width - filled) + text = f" {self._c(_CYAN, bar)} {self._c(_GRAY, f'{pct:3d}%')}" + if label: + text += f" {self._c(_LIGHT_GRAY, label)}" + print(text) + + # ── Table display ───────────────────────────────────────────────── + + def table(self, headers: list[str], rows: list[list[str]], + max_col_width: int = 40): + """Print a formatted table with box-drawing characters. + + Args: + headers: Column header strings. + rows: List of rows, each a list of cell strings. + max_col_width: Maximum column width before truncation. + """ + if not headers: + return + + # Calculate column widths + col_widths = [min(len(h), max_col_width) for h in headers] + for row in rows: + for i, cell in enumerate(row): + if i < len(col_widths): + col_widths[i] = min( + max(col_widths[i], len(str(cell))), max_col_width + ) + + def pad(text: str, width: int) -> str: + t = str(text)[:width] + return t + " " * (width - len(t)) + + # Header + header_cells = [ + self._c(_CYAN + _BOLD, pad(h, col_widths[i])) + for i, h in enumerate(headers) + ] + sep = self._c(_DARK_GRAY, f" {_V_LINE} ") + header_line = f" {sep.join(header_cells)}" + print(header_line) + + # Separator + sep_parts = [self._c(_DARK_GRAY, _H_LINE * w) for w in col_widths] + sep_line = self._c(_DARK_GRAY, f" {'───'.join([_H_LINE * w for w in col_widths])}") + print(sep_line) + + # Rows + for row in rows: + cells = [] + for i, cell in enumerate(row): + if i < len(col_widths): + cells.append(self._c(_LIGHT_GRAY, pad(str(cell), col_widths[i]))) + row_sep = self._c(_DARK_GRAY, f" {_V_LINE} ") + print(f" {row_sep.join(cells)}") + + # ── Help display ────────────────────────────────────────────────── + + def help(self, commands: dict[str, str]): + """Print a formatted help listing. + + Args: + commands: Dict of command -> description pairs. + """ + self.section("Commands") + max_cmd = max(len(c) for c in commands) if commands else 0 + for cmd, desc in commands.items(): + cmd_styled = self._c(self.accent, f" {cmd:<{max_cmd}}") + desc_styled = self._c(_GRAY, f" {desc}") + print(f"{cmd_styled}{desc_styled}") + print() + + # ── Goodbye ─────────────────────────────────────────────────────── + + def print_goodbye(self): + """Print a styled goodbye message.""" + print(f"\n {_ICON_SMALL} {self._c(_GRAY, 'Goodbye!')}\n") + + # ── Prompt toolkit session factory ──────────────────────────────── + + def create_prompt_session(self): + """Create a prompt_toolkit PromptSession with skin styling. + + Returns: + A configured PromptSession, or None if prompt_toolkit unavailable. + """ + try: + from prompt_toolkit import PromptSession + from prompt_toolkit.history import FileHistory + from prompt_toolkit.auto_suggest import AutoSuggestFromHistory + from prompt_toolkit.formatted_text import FormattedText + + style = self.get_prompt_style() + + session = PromptSession( + history=FileHistory(self.history_file), + auto_suggest=AutoSuggestFromHistory(), + style=style, + enable_history_search=True, + ) + return session + except ImportError: + return None + + def get_input(self, pt_session, project_name: str = "", + modified: bool = False, context: str = "") -> str: + """Get input from user using prompt_toolkit or fallback. + + Args: + pt_session: A prompt_toolkit PromptSession (or None). + project_name: Current project name. + modified: Whether project has unsaved changes. + context: Optional context string. + + Returns: + User input string (stripped). + """ + if pt_session is not None: + from prompt_toolkit.formatted_text import FormattedText + tokens = self.prompt_tokens(project_name, modified, context) + return pt_session.prompt(FormattedText(tokens)).strip() + else: + raw_prompt = self.prompt(project_name, modified, context) + return input(raw_prompt).strip() + + # ── Toolbar builder ─────────────────────────────────────────────── + + def bottom_toolbar(self, items: dict[str, str]): + """Create a bottom toolbar callback for prompt_toolkit. + + Args: + items: Dict of label -> value pairs to show in toolbar. + + Returns: + A callable that returns FormattedText for the toolbar. + """ + def toolbar(): + from prompt_toolkit.formatted_text import FormattedText + parts = [] + for i, (k, v) in enumerate(items.items()): + if i > 0: + parts.append(("class:bottom-toolbar.text", " │ ")) + parts.append(("class:bottom-toolbar.text", f" {k}: ")) + parts.append(("class:bottom-toolbar", v)) + return FormattedText(parts) + return toolbar + + +# ── ANSI 256-color to hex mapping (for prompt_toolkit styles) ───────── + +_ANSI_256_TO_HEX = { + "\033[38;5;33m": "#0087ff", # audacity navy blue + "\033[38;5;35m": "#00af5f", # shotcut teal + "\033[38;5;39m": "#00afff", # inkscape bright blue + "\033[38;5;40m": "#00d700", # libreoffice green + "\033[38;5;55m": "#5f00af", # obs purple + "\033[38;5;69m": "#5f87ff", # kdenlive slate blue + "\033[38;5;75m": "#5fafff", # default sky blue + "\033[38;5;80m": "#5fd7d7", # brand cyan + "\033[38;5;208m": "#ff8700", # blender deep orange + "\033[38;5;214m": "#ffaf00", # gimp warm orange +} diff --git a/cc-switch/agent-harness/setup.py b/cc-switch/agent-harness/setup.py new file mode 100644 index 000000000..f841743a8 --- /dev/null +++ b/cc-switch/agent-harness/setup.py @@ -0,0 +1,24 @@ +"""Setup script for CC Switch CLI harness.""" +from setuptools import setup, find_namespace_packages + +setup( + name="cli-anything-ccswitch", + version="1.0.0", + description="CLI interface for CC Switch — manage AI coding tool configurations", + author="cli-anything contributors", + python_requires=">=3.10", + packages=find_namespace_packages(include=["cli_anything.*"]), + package_data={ + "cli_anything.ccswitch": ["skills/*.md"], + }, + install_requires=[ + "click>=8.0", + "prompt_toolkit>=3.0", + "tomlkit>=0.12", + ], + entry_points={ + "console_scripts": [ + "cli-anything-ccswitch=cli_anything.ccswitch.ccswitch_cli:main", + ], + }, +) diff --git a/registry.json b/registry.json index 1e3eeca3e..e21ffabfa 100644 --- a/registry.json +++ b/registry.json @@ -5,6 +5,25 @@ "updated": "2026-04-16" }, "clis": [ + { + "name": "cc-switch", + "display_name": "CC Switch", + "version": "1.0.0", + "description": "Manage AI coding tool configurations - inspect providers, skills, MCP servers, usage stats, and proxy settings", + "requires": "CC Switch installed with active database", + "homepage": "https://github.com/HKUDS/CLI-Anything", + "source_url": null, + "install_cmd": "pip install git+https://github.com/HKUDS/CLI-Anything.git#subdirectory=cc-switch/agent-harness", + "entry_point": "cli-anything-ccswitch", + "skill_md": "skills/cli-anything-ccswitch/SKILL.md", + "category": "devops", + "contributors": [ + { + "name": "computersniper", + "url": "https://github.com/computersniper" + } + ] + }, { "name": "wiremock", "display_name": "WireMock", @@ -1325,4 +1344,4 @@ ] } ] -} +} \ No newline at end of file diff --git a/skills/cli-anything-ccswitch/SKILL.md b/skills/cli-anything-ccswitch/SKILL.md new file mode 100644 index 000000000..041b763a2 --- /dev/null +++ b/skills/cli-anything-ccswitch/SKILL.md @@ -0,0 +1,146 @@ +--- +name: "cli-anything-ccswitch" +description: "CLI interface for CC Switch — manage AI coding tool configurations from the terminal" +--- + +# CC Switch CLI + +CLI harness for CC Switch, a desktop app that manages AI coding tool (Claude Code, +Codex, Gemini CLI, OpenCode, OpenClaw, Hermes) configurations. Built with Click +and the CLI-Anything methodology. + +## Prerequisites + +- Python 3.10+ +- CC Switch installed with an active database at `~/.cc-switch/cc-switch.db` + +## Installation + +```bash +pip install -e . +``` + +After installation, the command `cli-anything-ccswitch` is available. + +## Command Groups + +| Group | Description | +|-------|-------------| +| `status` | Show a quick database overview | +| `providers` | Manage AI provider configurations (list, get, set-current) | +| `proxy` | Manage the local HTTP proxy server (status, config) | +| `mcp` | Manage MCP (Model Context Protocol) servers (list, enable) | +| `skills` | Manage installed skills (list, repos) | +| `usage` | View API usage and cost statistics (stats, logs) | +| `settings` | View and manage CC Switch settings (list, get, set) | +| `sessions` | Browse and search AI conversation sessions (list) | + +## Global Options + +- `--json` — Output in machine-readable JSON format (recommended for agent use) +- `--db PATH` — Override the database path + +## Agent-Specific Guidance + +### JSON Output + +Always use `--json` for programmatic consumption. Place it **before** the +subcommand: + +```bash +cli-anything-ccswitch --json providers list +cli-anything-ccswitch --json usage stats --days 30 +cli-anything-ccswitch --json providers get --app claude +``` + +### Sensitive Values + +API tokens, keys, and secrets are masked in all output. The `settings_config` +field in provider details shows masked values (e.g., `sk-bc089...5cbb`). + +### Exit Codes + +- `0` — Success +- `1` — Error (e.g., resource not found, invalid app type) + +### App Types + +Valid app types: `claude`, `codex`, `gemini`, `opencode`, `openclaw`, `hermes`. + +## Examples + +### List all providers + +```bash +cli-anything-ccswitch providers list +cli-anything-ccswitch --json providers list --app claude +``` + +### Switch active provider + +```bash +cli-anything-ccswitch providers set-current --app claude +``` + +### Check proxy status + +```bash +cli-anything-ccswitch proxy status --app claude +cli-anything-ccswitch proxy config --app claude --set-port 8080 +``` + +### View usage stats + +```bash +cli-anything-ccswitch usage stats --days 7 +cli-anything-ccswitch --json usage stats --days 30 --app claude +cli-anything-ccswitch usage logs --limit 10 +``` + +### List skills + +```bash +cli-anything-ccswitch skills list +cli-anything-ccswitch skills repos +``` + +### MCP servers + +```bash +cli-anything-ccswitch mcp list +cli-anything-ccswitch mcp enable --app claude --on +``` + +### Settings + +```bash +cli-anything-ccswitch settings list +cli-anything-ccswitch settings get +cli-anything-ccswitch settings set +``` + +### Sessions + +```bash +cli-anything-ccswitch sessions list --app claude --limit 10 +``` + +### Status overview + +```bash +cli-anything-ccswitch status +cli-anything-ccswitch +cli-anything-ccswitch --json +``` + +## Error Handling + +When a resource is not found, the CLI prints an error message to stderr and +exits with code 1. Agents should check the exit code before parsing output. + +## Database + +The CLI reads from the live CC Switch SQLite database. All read operations +are safe and do not modify the database. Write operations (`providers +set-current`, `proxy config`, `mcp enable`, `settings set`) modify the +database directly.