mirror of
https://github.com/HKUDS/CLI-Anything.git
synced 2026-07-03 13:02:27 +08:00
feat: add CC Switch CLI harness (#310)
* feat: add CC Switch CLI harness Add CLI harness for CC Switch — a desktop app that manages AI coding tool configurations (Claude Code, Codex, Gemini CLI, OpenCode, OpenClaw, Hermes). Reads from the live SQLite database with 7 command groups: providers, proxy, mcp, skills, usage, settings, sessions. 50 tests (30 unit + 20 E2E) all passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(ccswitch): harden secret masking and live config writes * fix(ccswitch): align gemini env writes and json sessions * fix(ccswitch): cover openclaw listings and nullable usage stats * fix: restore rekordbox harness gitignore exception * fix(ccswitch): preserve codex config during provider switch * Fix CC Switch review blockers --------- Co-authored-by: cjc-agent <agent@cjc-company.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: yuhao <itsyuhao@icloud.com>
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -100,6 +100,7 @@
|
||||
!/3MF/
|
||||
!/calibre/
|
||||
!/rekordbox/
|
||||
!/cc-switch/
|
||||
!/siyuan/
|
||||
# Step 5: Inside each software dir, ignore everything (including dotfiles)
|
||||
/gimp/*
|
||||
@@ -208,6 +209,8 @@
|
||||
/calibre/.*
|
||||
/rekordbox/*
|
||||
/rekordbox/.*
|
||||
/cc-switch/*
|
||||
/cc-switch/.*
|
||||
/siyuan/*
|
||||
/siyuan/.*
|
||||
|
||||
@@ -275,6 +278,7 @@
|
||||
!/quietshrink/agent-harness/
|
||||
!/mailchimp/agent-harness/
|
||||
!/rekordbox/agent-harness/
|
||||
!/cc-switch/agent-harness/
|
||||
!/siyuan/agent-harness/
|
||||
|
||||
# Exclude non-gedit demo macros from macrocli (local only)
|
||||
|
||||
57
cc-switch/agent-harness/CCSWITCH.md
Normal file
57
cc-switch/agent-harness/CCSWITCH.md
Normal file
@@ -0,0 +1,57 @@
|
||||
# CC Switch SOP: CLI Harness Design
|
||||
|
||||
## Software Identity
|
||||
- **Name**: CC Switch v3.15.0
|
||||
- **Type**: Cross-platform desktop GUI (Tauri 2: Rust + React)
|
||||
- **Purpose**: All-in-One configuration manager for AI-powered coding CLI tools (Claude Code, Codex, Gemini CLI, OpenCode, OpenClaw, Hermes Agent)
|
||||
- **Core operations**: Provider switching management, local HTTP proxy, MCP server management, skills management, prompt management, session browsing, usage tracking, cloud sync
|
||||
|
||||
## Architecture Analysis
|
||||
|
||||
### Backend Engine
|
||||
- **Rust/Tauri 2** (`src-tauri/src/`) — native system integration, SQLite database, embedded HTTP proxy (axum), config file I/O
|
||||
- **React/TypeScript** (`src/`) — UI components, forms, charts
|
||||
|
||||
### Data Model
|
||||
- **Primary Store**: SQLite database at `~/.cc-switch/cc-switch.db` (12+ tables)
|
||||
- **Config Files**: `~/.cc-switch/config.json` (MultiAppConfig), `~/.cc-switch/settings.json`
|
||||
- **Managed Files**: `~/.claude/settings.json`, `~/.codex/*`, `~/.gemini/*`, `~/.opencode/*`, `~/.openclaw/*`, Hermes config dirs
|
||||
|
||||
### Key Database Tables
|
||||
| Table | Purpose |
|
||||
|-------|---------|
|
||||
| providers | AI provider configurations (API keys, endpoints, models) |
|
||||
| mcp_servers | MCP server definitions with per-app enable flags |
|
||||
| skills | Installed skills with per-app enable flags |
|
||||
| skill_repos | Registered skill GitHub repos |
|
||||
| prompts | Prompt presets with per-app association |
|
||||
| proxy_config | Per-app proxy settings (claude/codex/gemini) |
|
||||
| proxy_request_logs | API request logs for usage tracking |
|
||||
| usage_daily_rollups | Daily aggregated usage stats |
|
||||
| model_pricing | Model pricing data |
|
||||
| provider_health | Provider health monitoring |
|
||||
| session_log_sync | Session log file sync state |
|
||||
| settings | Key-value settings store |
|
||||
|
||||
### Existing CLI/Scripting Capabilities
|
||||
- **None**. All operations are Tauri IPC commands invoked from React frontend via `invoke()`.
|
||||
- **Deep Link Protocol** (`ccswitch://`) provides URL-based import for providers/MCP/prompts/skills.
|
||||
|
||||
## Command Map
|
||||
|
||||
| GUI Feature | CLI Command Group | Data Source |
|
||||
|-------------|------------------|-------------|
|
||||
| Provider Management | `providers` | SQLite providers table |
|
||||
| Proxy Server | `proxy` | SQLite proxy_config, proxy_request_logs tables |
|
||||
| MCP Management | `mcp` | SQLite mcp_servers table + app config files |
|
||||
| Skills Management | `skills` | SQLite skills + skill_repos tables + filesystem |
|
||||
| Prompt Management | `prompts` | SQLite prompts table + prompt files |
|
||||
| Session Browser | `sessions` | Filesystem (JSON log files) + SQLite session_log_sync |
|
||||
| Usage Dashboard | `usage` | SQLite proxy_request_logs + usage_daily_rollups |
|
||||
| Settings | `settings` | SQLite settings table + settings.json |
|
||||
| Cloud Sync | `sync` | WebDAV config + filesystem |
|
||||
|
||||
## Rendering Gap Assessment
|
||||
- **No rendering gap** — CC Switch manages text-based AI CLI configuration files. There is no visual/graphical rendering. The CLI directly reads/writes SQLite and JSON, which is functionally identical to what the GUI does.
|
||||
|
||||
The current harness focuses on status, provider switching, proxy, MCP, skills, usage, settings, and sessions. Preview-style summaries can be added later on top of the existing `usage stats` data.
|
||||
119
cc-switch/agent-harness/cli_anything/ccswitch/README.md
Normal file
119
cc-switch/agent-harness/cli_anything/ccswitch/README.md
Normal file
@@ -0,0 +1,119 @@
|
||||
# CC Switch CLI - Agent Harness
|
||||
|
||||
A CLI interface for CC Switch, a desktop app that manages AI coding
|
||||
tool configurations across Claude Code, Codex, Gemini CLI, OpenCode, OpenClaw,
|
||||
and Hermes. Reads directly from the live CC Switch SQLite database.
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install git+https://github.com/HKUDS/CLI-Anything.git#subdirectory=cc-switch/agent-harness
|
||||
```
|
||||
|
||||
Or for local development:
|
||||
|
||||
```bash
|
||||
cd cc-switch/agent-harness
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
Requires CC Switch installed with an active database at `~/.cc-switch/cc-switch.db`.
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# Status overview
|
||||
cli-anything-ccswitch
|
||||
cli-anything-ccswitch status
|
||||
|
||||
# List all providers
|
||||
cli-anything-ccswitch providers list
|
||||
|
||||
# Filter by app
|
||||
cli-anything-ccswitch providers list --app claude
|
||||
|
||||
# Switch active provider
|
||||
cli-anything-ccswitch providers set-current <provider-id> --app claude
|
||||
|
||||
# Check proxy status
|
||||
cli-anything-ccswitch proxy status --app claude
|
||||
|
||||
# View usage stats (7 days)
|
||||
cli-anything-ccswitch usage stats --days 7
|
||||
|
||||
# Recent request logs
|
||||
cli-anything-ccswitch usage logs --limit 10
|
||||
|
||||
# List skills
|
||||
cli-anything-ccswitch skills list
|
||||
|
||||
# List MCP servers
|
||||
cli-anything-ccswitch mcp list
|
||||
|
||||
# Manage settings
|
||||
cli-anything-ccswitch settings list
|
||||
cli-anything-ccswitch settings get <key>
|
||||
|
||||
# Session logs
|
||||
cli-anything-ccswitch sessions list --app claude
|
||||
```
|
||||
|
||||
## JSON Output Mode
|
||||
|
||||
All commands support `--json` for machine-readable output. Place it before the subcommand:
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch --json providers list
|
||||
cli-anything-ccswitch --json usage stats --days 30
|
||||
cli-anything-ccswitch --json providers get <id> --app claude
|
||||
```
|
||||
|
||||
## Command Groups
|
||||
|
||||
| Group | Description |
|
||||
|-------|-------------|
|
||||
| `status` | Show a quick database overview |
|
||||
| `providers` | Manage AI provider configurations (list, get, set-current) |
|
||||
| `proxy` | Manage the local HTTP proxy server (status, config) |
|
||||
| `mcp` | Manage MCP servers (list, enable) |
|
||||
| `skills` | Manage installed skills (list, repos) |
|
||||
| `usage` | View API usage and cost statistics (stats, logs) |
|
||||
| `settings` | View and manage settings (list, get, set) |
|
||||
| `sessions` | Browse AI conversation sessions (list) |
|
||||
|
||||
## Running Tests
|
||||
|
||||
```bash
|
||||
# All tests
|
||||
python3 -m pytest cli_anything/ccswitch/tests/ -v
|
||||
|
||||
# Unit tests (no CC Switch needed)
|
||||
python3 -m pytest cli_anything/ccswitch/tests/test_core.py -v
|
||||
|
||||
# E2E tests (requires live CC Switch database)
|
||||
python3 -m pytest cli_anything/ccswitch/tests/test_full_e2e.py -v
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
cli_anything/ccswitch/
|
||||
├── __init__.py
|
||||
├── __main__.py # python3 -m cli_anything.ccswitch
|
||||
├── ccswitch_cli.py # Main Click CLI (7 command groups)
|
||||
├── utils/
|
||||
│ ├── __init__.py
|
||||
│ └── db.py # SQLite database utilities
|
||||
├── skills/
|
||||
│ └── SKILL.md # Packaged AI skill definition
|
||||
└── tests/
|
||||
├── TEST.md # Test plan and results
|
||||
├── test_core.py # 30 unit tests
|
||||
└── test_full_e2e.py # 20 E2E tests
|
||||
```
|
||||
|
||||
## Database
|
||||
|
||||
Reads directly from `~/.cc-switch/cc-switch.db`. Read operations are safe;
|
||||
write operations (`providers set-current`, `proxy config`, `mcp enable`,
|
||||
`settings set`) modify the database and write live config to app settings files.
|
||||
@@ -0,0 +1,3 @@
|
||||
"""CC Switch CLI - Command-line interface for CC Switch configuration manager."""
|
||||
|
||||
__version__ = "1.0.0"
|
||||
@@ -0,0 +1,5 @@
|
||||
"""Allow python -m cli_anything.ccswitch"""
|
||||
from cli_anything.ccswitch.ccswitch_cli import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
988
cc-switch/agent-harness/cli_anything/ccswitch/ccswitch_cli.py
Normal file
988
cc-switch/agent-harness/cli_anything/ccswitch/ccswitch_cli.py
Normal file
@@ -0,0 +1,988 @@
|
||||
"""CC Switch CLI — command-line interface for CC Switch configuration manager.
|
||||
|
||||
Usage:
|
||||
ccswitch providers list [--app claude] [--json]
|
||||
ccswitch providers set-current <provider-id> --app claude
|
||||
ccswitch proxy status [--app claude] [--json]
|
||||
ccswitch proxy config get [--app claude]
|
||||
ccswitch mcp list [--json]
|
||||
ccswitch skills list [--json]
|
||||
ccswitch usage stats [--days 30] [--json]
|
||||
ccswitch settings get <key>
|
||||
"""
|
||||
|
||||
import json as _json
|
||||
import os as _os
|
||||
import sys
|
||||
import tempfile as _tempfile
|
||||
from collections.abc import Mapping
|
||||
from copy import deepcopy as _deepcopy
|
||||
import click
|
||||
import tomlkit
|
||||
|
||||
from .utils.db import connect_db, load_config, load_settings, VALID_APP_TYPES
|
||||
|
||||
_MANAGED_APPS = VALID_APP_TYPES
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Shared helpers
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def _resolve_app(app: str | None) -> str | None:
|
||||
if app is not None:
|
||||
app = app.lower()
|
||||
if app not in VALID_APP_TYPES:
|
||||
raise click.BadParameter(f"Invalid app: {app}. Valid: {', '.join(VALID_APP_TYPES)}")
|
||||
return app
|
||||
|
||||
|
||||
def _mask_sensitive(key: str, value) -> str:
|
||||
"""Mask sensitive values like API tokens and keys."""
|
||||
if isinstance(value, str) and _is_sensitive_key(key):
|
||||
if len(value) > 12:
|
||||
return value[:8] + "..." + value[-4:]
|
||||
return "***"
|
||||
if isinstance(value, Mapping | list | tuple):
|
||||
return _format_masked_value(_mask_value(value, key))
|
||||
return str(value)
|
||||
|
||||
|
||||
def _is_sensitive_key(key: str) -> bool:
|
||||
lowered = key.lower()
|
||||
normalized = "".join(ch if ch.isalnum() else "_" for ch in lowered).strip("_")
|
||||
parts = {p for p in normalized.split("_") if p}
|
||||
if not normalized:
|
||||
return False
|
||||
if normalized in {"key", "apikey", "api_key"}:
|
||||
return True
|
||||
if normalized.endswith("_key") or normalized.startswith("key_"):
|
||||
return True
|
||||
sensitive_words = {
|
||||
"token", "secret", "password", "auth", "credential",
|
||||
"bearer", "cookie", "private",
|
||||
}
|
||||
if parts & sensitive_words:
|
||||
return True
|
||||
return any(word in normalized for word in (
|
||||
"apikey", "api_key", "authtoken", "accesskey", "secretkey",
|
||||
"secretaccesskey", "authorization",
|
||||
))
|
||||
|
||||
|
||||
def _mask_value(value, key: str = ""):
|
||||
"""Return a JSON-safe copy with sensitive nested values masked."""
|
||||
if _is_sensitive_key(key):
|
||||
if isinstance(value, str):
|
||||
return _mask_sensitive(key, value)
|
||||
return "***"
|
||||
if isinstance(value, Mapping):
|
||||
return {k: _mask_value(v, str(k)) for k, v in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [_mask_value(item, key) for item in value]
|
||||
if isinstance(value, tuple):
|
||||
return [_mask_value(item, key) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def _format_masked_value(value) -> str:
|
||||
if isinstance(value, Mapping):
|
||||
return "{" + ", ".join(
|
||||
f"{k}: {_format_masked_value(v)}" for k, v in value.items()
|
||||
) + "}"
|
||||
if isinstance(value, list):
|
||||
return "[" + ", ".join(_format_masked_value(item) for item in value) + "]"
|
||||
return str(value)
|
||||
|
||||
|
||||
def _table(headers: list[str], rows: list[tuple]) -> str:
|
||||
"""Format data as a simple aligned table."""
|
||||
if not rows:
|
||||
return "(empty)"
|
||||
all_rows = [headers] + [list(map(str, r)) for r in rows]
|
||||
col_widths = [max(len(r[i]) for r in all_rows) for i in range(len(headers))]
|
||||
lines = []
|
||||
header = " ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers))
|
||||
lines.append(header)
|
||||
lines.append("-" * len(header))
|
||||
for row in all_rows[1:]:
|
||||
lines.append(" ".join(v.ljust(col_widths[i]) for i, v in enumerate(row)))
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _enabled_apps_str(row: Mapping) -> str:
|
||||
apps = [app for app in _MANAGED_APPS if row[f"enabled_{app}"]]
|
||||
return ",".join(apps) if apps else "-"
|
||||
|
||||
|
||||
def _normalise_usage_rows(rows) -> list[dict]:
|
||||
normalised = []
|
||||
for row in rows:
|
||||
data = dict(row)
|
||||
data["input_tok"] = data["input_tok"] or 0
|
||||
data["output_tok"] = data["output_tok"] or 0
|
||||
data["cost"] = data["cost"] or 0
|
||||
normalised.append(data)
|
||||
return normalised
|
||||
|
||||
|
||||
def _enabled_app_columns(db, table: str) -> str:
|
||||
columns = {row["name"] for row in db.execute(f"PRAGMA table_info({table})")}
|
||||
parts = []
|
||||
for app in _MANAGED_APPS:
|
||||
column = f"enabled_{app}"
|
||||
parts.append(column if column in columns else f"0 AS {column}")
|
||||
return ", ".join(parts)
|
||||
|
||||
|
||||
_CODEX_PROVIDER_CONFIG_KEYS = ("model", "model_provider", "base_url")
|
||||
|
||||
|
||||
def _merge_codex_provider_toml(target, provider_config: str) -> None:
|
||||
"""Merge provider-owned Codex TOML fields without deleting user settings."""
|
||||
try:
|
||||
existing_text = target.read_text(encoding="utf-8") if target.exists() else ""
|
||||
existing_doc = (
|
||||
tomlkit.parse(existing_text) if existing_text.strip() else tomlkit.document()
|
||||
)
|
||||
provider_doc = (
|
||||
tomlkit.parse(provider_config) if provider_config.strip() else tomlkit.document()
|
||||
)
|
||||
except Exception as exc:
|
||||
raise click.ClickException(f"Invalid Codex config.toml: {exc}") from exc
|
||||
|
||||
for key in _CODEX_PROVIDER_CONFIG_KEYS:
|
||||
if key in provider_doc:
|
||||
existing_doc[key] = _deepcopy(provider_doc[key])
|
||||
|
||||
if "model_providers" in provider_doc:
|
||||
provider_table = provider_doc["model_providers"]
|
||||
if not hasattr(provider_table, "items"):
|
||||
existing_doc["model_providers"] = _deepcopy(provider_table)
|
||||
else:
|
||||
existing_table = existing_doc.get("model_providers")
|
||||
if not hasattr(existing_table, "items"):
|
||||
existing_doc["model_providers"] = tomlkit.table()
|
||||
existing_table = existing_doc["model_providers"]
|
||||
for provider_id, provider_settings in provider_table.items():
|
||||
existing_table[provider_id] = _deepcopy(provider_settings)
|
||||
|
||||
_write_text_config(target, tomlkit.dumps(existing_doc))
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Main CLI
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@click.option("--json", "json_mode", is_flag=True, help="Output in JSON format")
|
||||
@click.option("--db", "db_path", type=click.Path(), help="Override database path")
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context, json_mode: bool, db_path: str | None) -> None:
|
||||
"""CC Switch CLI — Manage AI coding tool configurations from the terminal."""
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["json_mode"] = json_mode
|
||||
ctx.obj["db_path"] = db_path
|
||||
if ctx.invoked_subcommand is None:
|
||||
# Show status overview
|
||||
_show_status(ctx)
|
||||
|
||||
|
||||
@cli.command("status")
|
||||
@click.pass_context
|
||||
def status(ctx: click.Context) -> None:
|
||||
"""Show a quick database overview."""
|
||||
_show_status(ctx)
|
||||
|
||||
|
||||
def _show_status(ctx: click.Context) -> None:
|
||||
"""Show a quick status overview."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
# Count providers
|
||||
prov_count = db.execute("SELECT COUNT(*) FROM providers").fetchone()[0]
|
||||
# Current provider per app
|
||||
cur = db.execute(
|
||||
"SELECT app_type, name FROM providers WHERE is_current=1 ORDER BY app_type"
|
||||
).fetchall()
|
||||
# Skill count
|
||||
skill_count = db.execute("SELECT COUNT(*) FROM skills").fetchone()[0]
|
||||
# MCP count
|
||||
mcp_count = db.execute("SELECT COUNT(*) FROM mcp_servers").fetchone()[0]
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump({
|
||||
"providers": prov_count,
|
||||
"current": {r["app_type"]: r["name"] for r in cur},
|
||||
"skills": skill_count,
|
||||
"mcp_servers": mcp_count,
|
||||
}, sys.stdout, indent=2)
|
||||
return
|
||||
|
||||
click.echo("CC Switch Status")
|
||||
click.echo("-" * 40)
|
||||
click.echo(f" Providers: {prov_count}")
|
||||
click.echo(f" Skills: {skill_count}")
|
||||
click.echo(f" MCP Servers: {mcp_count}")
|
||||
click.echo()
|
||||
click.echo(" Current providers:")
|
||||
for r in cur:
|
||||
click.echo(f" {r['app_type']:>10s}: {r['name']}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Providers
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def providers() -> None:
|
||||
"""Manage AI provider configurations."""
|
||||
pass
|
||||
|
||||
|
||||
@providers.command("list")
|
||||
@click.option("--app", "-a", help="Filter by app type")
|
||||
@click.pass_context
|
||||
def providers_list(ctx: click.Context, app: str | None) -> None:
|
||||
"""List all configured providers."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
if app:
|
||||
rows = db.execute(
|
||||
"SELECT id, name, category, is_current, sort_index FROM providers "
|
||||
"WHERE app_type=? ORDER BY sort_index",
|
||||
(app,),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = db.execute(
|
||||
"SELECT app_type, id, name, category, is_current, sort_index "
|
||||
"FROM providers ORDER BY app_type, sort_index"
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
if app:
|
||||
click.echo(_table(["ID", "Name", "Category", "Current", "Sort"], [
|
||||
(r["id"], r["name"], r["category"] or "", "*" if r["is_current"] else "", r["sort_index"])
|
||||
for r in rows
|
||||
]))
|
||||
else:
|
||||
click.echo(_table(["App", "ID", "Name", "Category", "Current", "Sort"], [
|
||||
(r["app_type"], r["id"], r["name"], r["category"] or "", "*" if r["is_current"] else "", r["sort_index"])
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@providers.command("get")
|
||||
@click.argument("provider_id")
|
||||
@click.option("--app", "-a", required=True, help="App type (claude/codex/gemini/...)")
|
||||
@click.pass_context
|
||||
def providers_get(ctx: click.Context, provider_id: str, app: str) -> None:
|
||||
"""Get detailed configuration for a provider."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
row = db.execute(
|
||||
"SELECT * FROM providers WHERE id=? AND app_type=?", (provider_id, app)
|
||||
).fetchone()
|
||||
if not row:
|
||||
click.echo(f"Provider '{provider_id}' not found for app '{app}'", err=True)
|
||||
raise SystemExit(1)
|
||||
|
||||
data = dict(row)
|
||||
# Parse settings_config JSON
|
||||
data["settings_config"] = _json.loads(data["settings_config"])
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
data["settings_config"] = _mask_value(data["settings_config"])
|
||||
_json.dump(data, sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(f"Provider: {data['name']}")
|
||||
click.echo(f" ID: {data['id']}")
|
||||
click.echo(f" App: {data['app_type']}")
|
||||
click.echo(f" Category: {data.get('category', 'N/A')}")
|
||||
click.echo(f" Current: {bool(data['is_current'])}")
|
||||
click.echo(f" Settings:")
|
||||
for k, v in sorted(data["settings_config"].items()):
|
||||
click.echo(f" {k}: {_format_masked_value(_mask_value(v, k))}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@providers.command("set-current")
|
||||
@click.argument("provider_id")
|
||||
@click.option("--app", "-a", required=True, help="App type")
|
||||
@click.pass_context
|
||||
def providers_set_current(ctx: click.Context, provider_id: str, app: str) -> None:
|
||||
"""Set the current/active provider for an app."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
# Verify provider exists
|
||||
row = db.execute(
|
||||
"SELECT id, name FROM providers WHERE id=? AND app_type=?", (provider_id, app)
|
||||
).fetchone()
|
||||
if not row:
|
||||
click.echo(f"Provider '{provider_id}' not found for '{app}'", err=True)
|
||||
raise SystemExit(1)
|
||||
|
||||
# Unset all, then set current
|
||||
db.execute("UPDATE providers SET is_current=0 WHERE app_type=?", (app,))
|
||||
db.execute(
|
||||
"UPDATE providers SET is_current=1 WHERE id=? AND app_type=?", (provider_id, app)
|
||||
)
|
||||
|
||||
_write_live_config(app, db)
|
||||
db.commit()
|
||||
click.echo(f"Switched {app} to provider: {row['name']}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _write_live_config(app: str, db) -> None:
|
||||
"""Write the current provider config to the live app config file."""
|
||||
from pathlib import Path
|
||||
|
||||
home = Path(_os.path.expanduser("~"))
|
||||
row = db.execute(
|
||||
"SELECT * FROM providers WHERE app_type=? AND is_current=1", (app,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
return
|
||||
|
||||
config = _json.loads(row["settings_config"])
|
||||
|
||||
target_map = {
|
||||
"claude": home / ".claude" / "settings.json",
|
||||
"codex": home / ".codex" / "config.toml",
|
||||
"gemini": home / ".gemini" / ".env",
|
||||
"opencode": home / ".config" / "opencode" / "opencode.json",
|
||||
"openclaw": home / ".openclaw" / "openclaw.json",
|
||||
"hermes": home / ".hermes" / "config.yaml",
|
||||
}
|
||||
|
||||
target = target_map.get(app)
|
||||
if not target:
|
||||
return
|
||||
|
||||
if app == "claude":
|
||||
_write_json_env_config(target, _env_config(config))
|
||||
click.echo(f" Written live config to: {target}")
|
||||
elif app == "codex":
|
||||
wrote = False
|
||||
if "config" in config:
|
||||
_merge_codex_provider_toml(target, str(config["config"]))
|
||||
wrote = True
|
||||
if isinstance(config.get("auth"), dict):
|
||||
_write_json_config(home / ".codex" / "auth.json", config["auth"])
|
||||
wrote = True
|
||||
if wrote:
|
||||
click.echo(f" Written live config to: {target}")
|
||||
else:
|
||||
click.echo(f" (Note: no supported {app} live config payload found)")
|
||||
elif app == "gemini":
|
||||
written_targets = []
|
||||
env = _gemini_env_config(config)
|
||||
if env:
|
||||
_write_env_config(target, env)
|
||||
written_targets.append(target)
|
||||
if "config" in config:
|
||||
settings_target = home / ".gemini" / "settings.json"
|
||||
if isinstance(config["config"], dict):
|
||||
_write_json_config(settings_target, config["config"])
|
||||
else:
|
||||
_write_text_config(settings_target, str(config["config"]))
|
||||
written_targets.append(settings_target)
|
||||
if written_targets:
|
||||
click.echo(
|
||||
" Written live config to: "
|
||||
+ ", ".join(str(path) for path in written_targets)
|
||||
)
|
||||
else:
|
||||
click.echo(f" (Note: no supported {app} live config payload found)")
|
||||
elif app == "opencode":
|
||||
if "config" in config:
|
||||
_write_text_config(target, str(config["config"]))
|
||||
elif "settingsConfig" in config:
|
||||
_write_opencode_provider_config(target, str(row["id"]), config["settingsConfig"])
|
||||
else:
|
||||
click.echo(f" (Note: no supported {app} live config payload found)")
|
||||
return
|
||||
click.echo(f" Written live config to: {target}")
|
||||
elif app == "openclaw":
|
||||
_write_json_config(target, config)
|
||||
click.echo(f" Written live config to: {target}")
|
||||
elif app == "hermes":
|
||||
if "config" in config:
|
||||
_write_text_config(target, str(config["config"]))
|
||||
else:
|
||||
click.echo(f" (Note: no supported {app} live config payload found)")
|
||||
return
|
||||
click.echo(f" Written live config to: {target}")
|
||||
|
||||
|
||||
def _env_config(config: dict) -> dict:
|
||||
env = config.get("env")
|
||||
if isinstance(env, dict):
|
||||
return env
|
||||
return config
|
||||
|
||||
|
||||
def _gemini_env_config(config: dict) -> dict:
|
||||
env = config.get("env")
|
||||
if isinstance(env, dict):
|
||||
return env
|
||||
if "config" in config or "auth" in config or "settingsConfig" in config:
|
||||
return {}
|
||||
return config
|
||||
|
||||
|
||||
def _write_json_env_config(target, env: dict) -> None:
|
||||
existing = {}
|
||||
if target.exists():
|
||||
with open(target, encoding="utf-8") as f:
|
||||
existing = _json.load(f)
|
||||
if not isinstance(existing, dict):
|
||||
existing = {}
|
||||
existing_env = existing.get("env")
|
||||
if not isinstance(existing_env, dict):
|
||||
existing_env = {}
|
||||
existing_env.update(env)
|
||||
existing["env"] = existing_env
|
||||
_write_json_config(target, existing)
|
||||
|
||||
|
||||
def _write_env_config(target, env: dict) -> None:
|
||||
existing = {}
|
||||
if target.exists():
|
||||
existing = _parse_env_file(target.read_text(encoding="utf-8"))
|
||||
existing.update(_coerce_env_config(env))
|
||||
lines = [f"{key}={existing[key]}" for key in sorted(existing)]
|
||||
_write_text_config(target, "\n".join(lines))
|
||||
|
||||
|
||||
def _parse_env_file(content: str) -> dict[str, str]:
|
||||
parsed = {}
|
||||
for raw_line in content.splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
key, value = line.split("=", 1)
|
||||
key = key.strip()
|
||||
if _is_valid_env_key(key):
|
||||
parsed[key] = value
|
||||
return parsed
|
||||
|
||||
|
||||
def _coerce_env_config(env: dict) -> dict[str, str]:
|
||||
result = {}
|
||||
for key, value in env.items():
|
||||
key = str(key)
|
||||
if not _is_valid_env_key(key):
|
||||
continue
|
||||
result[key] = _env_value(value)
|
||||
return result
|
||||
|
||||
|
||||
def _is_valid_env_key(key: str) -> bool:
|
||||
return bool(key) and all(
|
||||
ch == "_" or "A" <= ch <= "Z" or "a" <= ch <= "z" or "0" <= ch <= "9"
|
||||
for ch in key
|
||||
)
|
||||
|
||||
|
||||
def _env_value(value) -> str:
|
||||
text = "" if value is None else str(value)
|
||||
return text.replace("\r", "\\r").replace("\n", "\\n")
|
||||
|
||||
|
||||
def _write_json_config(target, config: dict) -> None:
|
||||
_write_text_config(target, _json.dumps(config, indent=2, ensure_ascii=False) + "\n")
|
||||
|
||||
|
||||
def _write_opencode_provider_config(target, provider_id: str, provider_config: dict) -> None:
|
||||
existing = {
|
||||
"$schema": "https://opencode.ai/config.json",
|
||||
}
|
||||
if target.exists():
|
||||
with open(target, encoding="utf-8") as f:
|
||||
existing = _json.load(f)
|
||||
if not isinstance(existing, dict):
|
||||
existing = {}
|
||||
providers = existing.get("provider")
|
||||
if not isinstance(providers, dict):
|
||||
providers = {}
|
||||
providers[provider_id] = provider_config
|
||||
existing["provider"] = providers
|
||||
_write_json_config(target, existing)
|
||||
|
||||
|
||||
def _write_text_config(target, content: str) -> None:
|
||||
if content and not content.endswith("\n"):
|
||||
content += "\n"
|
||||
_write_secure_file(target, content)
|
||||
|
||||
|
||||
def _write_secure_file(target, content: str) -> None:
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
fd, tmp_name = _tempfile.mkstemp(prefix=f".{target.name}.", dir=target.parent)
|
||||
try:
|
||||
with _os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
try:
|
||||
_os.chmod(tmp_name, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
_os.replace(tmp_name, target)
|
||||
except Exception:
|
||||
try:
|
||||
_os.unlink(tmp_name)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Proxy
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def proxy() -> None:
|
||||
"""Manage the local HTTP proxy server."""
|
||||
pass
|
||||
|
||||
|
||||
@proxy.command("status")
|
||||
@click.option("--app", "-a", default="claude", help="App type")
|
||||
@click.pass_context
|
||||
def proxy_status(ctx: click.Context, app: str) -> None:
|
||||
"""Show proxy server status."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
row = db.execute(
|
||||
"SELECT * FROM proxy_config WHERE app_type=?", (app,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
click.echo(f"No proxy config for {app}")
|
||||
return
|
||||
|
||||
data = dict(row)
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump(data, sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(f"Proxy Status ({app}):")
|
||||
click.echo(f" Enabled: {bool(data['enabled'])}")
|
||||
click.echo(f" Listen: {data['listen_address']}:{data['listen_port']}")
|
||||
click.echo(f" Proxy Enabled: {bool(data['proxy_enabled'])}")
|
||||
click.echo(f" Auto Failover: {bool(data['auto_failover_enabled'])}")
|
||||
click.echo(f" Max Retries: {data['max_retries']}")
|
||||
click.echo(f" Circuit Breaker: {bool(data.get('live_takeover_active', 0))}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@proxy.command("config")
|
||||
@click.option("--app", "-a", default="claude", help="App type")
|
||||
@click.option("--set-port", type=int, help="Set listen port")
|
||||
@click.option("--enable/--disable", default=None, help="Enable/disable proxy")
|
||||
@click.option("--failover/--no-failover", default=None, help="Enable/disable auto failover")
|
||||
@click.pass_context
|
||||
def proxy_config(
|
||||
ctx: click.Context, app: str, set_port: int | None,
|
||||
enable: bool | None, failover: bool | None
|
||||
) -> None:
|
||||
"""Get or set proxy configuration."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
if set_port is not None:
|
||||
db.execute("UPDATE proxy_config SET listen_port=? WHERE app_type=?", (set_port, app))
|
||||
if enable is True:
|
||||
db.execute("UPDATE proxy_config SET proxy_enabled=1 WHERE app_type=?", (app,))
|
||||
elif enable is False:
|
||||
db.execute("UPDATE proxy_config SET proxy_enabled=0 WHERE app_type=?", (app,))
|
||||
if failover is True:
|
||||
db.execute("UPDATE proxy_config SET auto_failover_enabled=1 WHERE app_type=?", (app,))
|
||||
elif failover is False:
|
||||
db.execute("UPDATE proxy_config SET auto_failover_enabled=0 WHERE app_type=?", (app,))
|
||||
db.commit()
|
||||
|
||||
# Show updated config
|
||||
row = db.execute(
|
||||
"SELECT * FROM proxy_config WHERE app_type=?", (app,)
|
||||
).fetchone()
|
||||
if row:
|
||||
data = dict(row)
|
||||
click.echo(_json.dumps({
|
||||
"app": app,
|
||||
"listen": f"{data['listen_address']}:{data['listen_port']}",
|
||||
"enabled": bool(data["enabled"]),
|
||||
"proxy_enabled": bool(data["proxy_enabled"]),
|
||||
"auto_failover": bool(data["auto_failover_enabled"]),
|
||||
"max_retries": data["max_retries"],
|
||||
}, indent=2))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# MCP
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def mcp() -> None:
|
||||
"""Manage MCP (Model Context Protocol) servers."""
|
||||
pass
|
||||
|
||||
|
||||
@mcp.command("list")
|
||||
@click.pass_context
|
||||
def mcp_list(ctx: click.Context) -> None:
|
||||
"""List all MCP servers."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
enabled_columns = _enabled_app_columns(db, "mcp_servers")
|
||||
rows = db.execute(
|
||||
f"SELECT id, name, description, {enabled_columns} "
|
||||
"FROM mcp_servers ORDER BY name"
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(_table(["ID", "Name", "Apps", "Description"], [
|
||||
(r["id"][:30], r["name"], _enabled_apps_str(r), (r["description"] or "")[:50])
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@mcp.command("enable")
|
||||
@click.argument("server_id")
|
||||
@click.option("--app", "-a", required=True, help="App type")
|
||||
@click.option("--on/--off", default=True, help="Enable or disable")
|
||||
@click.pass_context
|
||||
def mcp_enable(ctx: click.Context, server_id: str, app: str, on: bool) -> None:
|
||||
"""Enable or disable an MCP server for an app."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
col = f"enabled_{app}"
|
||||
db.execute(f"UPDATE mcp_servers SET {col}=? WHERE id=?", (int(on), server_id))
|
||||
if db.total_changes == 0:
|
||||
click.echo(f"MCP server '{server_id}' not found", err=True)
|
||||
raise SystemExit(1)
|
||||
db.commit()
|
||||
click.echo(f"MCP '{server_id}' {'enabled' if on else 'disabled'} for {app}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Skills
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def skills() -> None:
|
||||
"""Manage installed skills."""
|
||||
pass
|
||||
|
||||
|
||||
@skills.command("list")
|
||||
@click.pass_context
|
||||
def skills_list(ctx: click.Context) -> None:
|
||||
"""List all installed skills."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
enabled_columns = _enabled_app_columns(db, "skills")
|
||||
rows = db.execute(
|
||||
"SELECT id, name, description, repo_owner, repo_name, "
|
||||
f"{enabled_columns} "
|
||||
"FROM skills ORDER BY name"
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(_table(["Name", "Source", "Apps", "Description"], [
|
||||
(r["name"], f"{r['repo_owner']}/{r['repo_name']}" if r["repo_owner"] else "local",
|
||||
_enabled_apps_str(r), (r["description"] or "")[:50])
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@skills.command("repos")
|
||||
@click.pass_context
|
||||
def skills_repos(ctx: click.Context) -> None:
|
||||
"""List registered skill repositories."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
rows = db.execute(
|
||||
"SELECT owner, name, branch, enabled FROM skill_repos ORDER BY owner, name"
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(_table(["Owner", "Name", "Branch", "Enabled"], [
|
||||
(r["owner"], r["name"], r["branch"], "yes" if r["enabled"] else "no")
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Usage
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def usage() -> None:
|
||||
"""View API usage and cost statistics."""
|
||||
pass
|
||||
|
||||
|
||||
@usage.command("stats")
|
||||
@click.option("--days", "-d", default=30, type=int, help="Number of days to show")
|
||||
@click.option("--app", "-a", help="Filter by app type")
|
||||
@click.pass_context
|
||||
def usage_stats(ctx: click.Context, days: int, app: str | None) -> None:
|
||||
"""Show usage statistics."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
if app:
|
||||
rows = db.execute(
|
||||
"SELECT model, COUNT(*) as requests, SUM(input_tokens) as input_tok, "
|
||||
"SUM(output_tokens) as output_tok, "
|
||||
"SUM(CAST(total_cost_usd AS REAL)) as cost "
|
||||
"FROM proxy_request_logs "
|
||||
"WHERE app_type=? AND created_at > unixepoch('now', ? || ' days') "
|
||||
"GROUP BY model ORDER BY cost DESC",
|
||||
(app, f"-{days}"),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = db.execute(
|
||||
"SELECT app_type, model, COUNT(*) as requests, SUM(input_tokens) as input_tok, "
|
||||
"SUM(output_tokens) as output_tok, "
|
||||
"SUM(CAST(total_cost_usd AS REAL)) as cost "
|
||||
"FROM proxy_request_logs "
|
||||
"WHERE created_at > unixepoch('now', ? || ' days') "
|
||||
"GROUP BY app_type, model ORDER BY cost DESC",
|
||||
(f"-{days}",),
|
||||
).fetchall()
|
||||
|
||||
rows = _normalise_usage_rows(rows)
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump(rows, sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
total_cost = sum(r["cost"] for r in rows)
|
||||
total_requests = sum(r["requests"] for r in rows)
|
||||
total_in = sum(r["input_tok"] for r in rows)
|
||||
total_out = sum(r["output_tok"] for r in rows)
|
||||
|
||||
if app:
|
||||
click.echo(_table(["Model", "Requests", "Input Tokens", "Output Tokens", "Cost (USD)"], [
|
||||
(r["model"], r["requests"], f'{r["input_tok"]:,}', f'{r["output_tok"]:,}', f'${r["cost"]:.4f}')
|
||||
for r in rows
|
||||
]))
|
||||
else:
|
||||
click.echo(_table(["App", "Model", "Requests", "Input Tokens", "Output Tokens", "Cost (USD)"], [
|
||||
(r["app_type"], r["model"], r["requests"],
|
||||
f'{r["input_tok"]:,}', f'{r["output_tok"]:,}', f'${r["cost"]:.4f}')
|
||||
for r in rows
|
||||
]))
|
||||
|
||||
click.echo()
|
||||
click.echo(f" Total ({days} days): {total_requests:,} requests | "
|
||||
f"{total_in + total_out:,} tokens | ${total_cost:.4f}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@usage.command("logs")
|
||||
@click.option("--limit", "-n", default=20, type=int, help="Number of recent logs to show")
|
||||
@click.option("--app", "-a", help="Filter by app type")
|
||||
@click.pass_context
|
||||
def usage_logs(ctx: click.Context, limit: int, app: str | None) -> None:
|
||||
"""Show recent API request logs."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
if app:
|
||||
rows = db.execute(
|
||||
"SELECT app_type, model, status_code, input_tokens, output_tokens, "
|
||||
"total_cost_usd, latency_ms, datetime(created_at, 'unixepoch') as ts "
|
||||
"FROM proxy_request_logs WHERE app_type=? "
|
||||
"ORDER BY created_at DESC LIMIT ?",
|
||||
(app, limit),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = db.execute(
|
||||
"SELECT app_type, model, status_code, input_tokens, output_tokens, "
|
||||
"total_cost_usd, latency_ms, datetime(created_at, 'unixepoch') as ts "
|
||||
"FROM proxy_request_logs "
|
||||
"ORDER BY created_at DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
click.echo(_table(["App", "Model", "Status", "Tokens (in/out)", "Cost", "Latency", "Time"], [
|
||||
(r["app_type"], r["model"][:25], r["status_code"],
|
||||
f'{r["input_tokens"]}/{r["output_tokens"]}',
|
||||
f'${float(r["total_cost_usd"] or 0):.4f}',
|
||||
f'{r["latency_ms"]}ms', r["ts"])
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Settings
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def settings() -> None:
|
||||
"""View and manage CC Switch settings."""
|
||||
pass
|
||||
|
||||
|
||||
@settings.command("list")
|
||||
@click.pass_context
|
||||
def settings_list(ctx: click.Context) -> None:
|
||||
"""List all settings key-value pairs."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
rows = db.execute("SELECT key, value FROM settings ORDER BY key").fetchall()
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump({
|
||||
r["key"]: _mask_value(r["value"], r["key"]) for r in rows
|
||||
}, sys.stdout, indent=2)
|
||||
return
|
||||
click.echo(_table(["Key", "Value"], [
|
||||
(r["key"], _mask_sensitive(r["key"], r["value"])[:80]) for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@settings.command("get")
|
||||
@click.argument("key")
|
||||
@click.pass_context
|
||||
def settings_get(ctx: click.Context, key: str) -> None:
|
||||
"""Get a specific setting value."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
row = db.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
|
||||
if not row:
|
||||
click.echo(f"Setting '{key}' not found", err=True)
|
||||
raise SystemExit(1)
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump({key: _mask_value(row["value"], key)}, sys.stdout, indent=2)
|
||||
return
|
||||
click.echo(_mask_sensitive(key, row["value"]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@settings.command("set")
|
||||
@click.argument("key")
|
||||
@click.argument("value")
|
||||
@click.pass_context
|
||||
def settings_set(ctx: click.Context, key: str, value: str) -> None:
|
||||
"""Set a setting value."""
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
db.execute(
|
||||
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", (key, value)
|
||||
)
|
||||
db.commit()
|
||||
click.echo(f"Set '{key}' = '{_mask_sensitive(key, value)}'")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Sessions
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@cli.group()
|
||||
def sessions() -> None:
|
||||
"""Browse and search AI conversation sessions."""
|
||||
pass
|
||||
|
||||
|
||||
@sessions.command("list")
|
||||
@click.option("--app", "-a", help="Filter by app type")
|
||||
@click.option("--limit", "-n", default=20, type=int)
|
||||
@click.pass_context
|
||||
def sessions_list(ctx: click.Context, app: str | None, limit: int) -> None:
|
||||
"""List recent conversation sessions."""
|
||||
app = _resolve_app(app)
|
||||
db = connect_db(ctx.obj.get("db_path"))
|
||||
try:
|
||||
if app:
|
||||
rows = db.execute(
|
||||
"SELECT file_path, last_modified, last_synced_at "
|
||||
"FROM session_log_sync WHERE file_path LIKE ? "
|
||||
"ORDER BY last_modified DESC LIMIT ?",
|
||||
(f"%{app}%", limit),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = db.execute(
|
||||
"SELECT file_path, last_modified, last_synced_at "
|
||||
"FROM session_log_sync ORDER BY last_modified DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
|
||||
if ctx.obj.get("json_mode"):
|
||||
_json.dump([dict(r) for r in rows], sys.stdout, indent=2, default=str)
|
||||
return
|
||||
|
||||
if not rows:
|
||||
click.echo("No session logs found. Enable usage tracking in CC Switch first.")
|
||||
return
|
||||
|
||||
click.echo(_table(["Path", "Last Modified", "Last Synced"], [
|
||||
(r["file_path"][:60],
|
||||
r["last_modified"],
|
||||
r["last_synced_at"])
|
||||
for r in rows
|
||||
]))
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Entry point
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point for CC Switch CLI."""
|
||||
cli(prog_name="ccswitch")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1 @@
|
||||
"""Core modules for ccswitch CLI."""
|
||||
146
cc-switch/agent-harness/cli_anything/ccswitch/skills/SKILL.md
Normal file
146
cc-switch/agent-harness/cli_anything/ccswitch/skills/SKILL.md
Normal file
@@ -0,0 +1,146 @@
|
||||
---
|
||||
name: "cli-anything-ccswitch"
|
||||
description: "CLI interface for CC Switch — manage AI coding tool configurations from the terminal"
|
||||
---
|
||||
|
||||
# CC Switch CLI
|
||||
|
||||
CLI harness for CC Switch, a desktop app that manages AI coding tool (Claude Code,
|
||||
Codex, Gemini CLI, OpenCode, OpenClaw, Hermes) configurations. Built with Click
|
||||
and the CLI-Anything methodology.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.10+
|
||||
- CC Switch installed with an active database at `~/.cc-switch/cc-switch.db`
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
After installation, the command `cli-anything-ccswitch` is available.
|
||||
|
||||
## Command Groups
|
||||
|
||||
| Group | Description |
|
||||
|-------|-------------|
|
||||
| `status` | Show a quick database overview |
|
||||
| `providers` | Manage AI provider configurations (list, get, set-current) |
|
||||
| `proxy` | Manage the local HTTP proxy server (status, config) |
|
||||
| `mcp` | Manage MCP (Model Context Protocol) servers (list, enable) |
|
||||
| `skills` | Manage installed skills (list, repos) |
|
||||
| `usage` | View API usage and cost statistics (stats, logs) |
|
||||
| `settings` | View and manage CC Switch settings (list, get, set) |
|
||||
| `sessions` | Browse and search AI conversation sessions (list) |
|
||||
|
||||
## Global Options
|
||||
|
||||
- `--json` — Output in machine-readable JSON format (recommended for agent use)
|
||||
- `--db PATH` — Override the database path
|
||||
|
||||
## Agent-Specific Guidance
|
||||
|
||||
### JSON Output
|
||||
|
||||
Always use `--json` for programmatic consumption. Place it **before** the
|
||||
subcommand:
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch --json providers list
|
||||
cli-anything-ccswitch --json usage stats --days 30
|
||||
cli-anything-ccswitch --json providers get <id> --app claude
|
||||
```
|
||||
|
||||
### Sensitive Values
|
||||
|
||||
API tokens, keys, and secrets are masked in all output. The `settings_config`
|
||||
field in provider details shows masked values (e.g., `sk-bc089...5cbb`).
|
||||
|
||||
### Exit Codes
|
||||
|
||||
- `0` — Success
|
||||
- `1` — Error (e.g., resource not found, invalid app type)
|
||||
|
||||
### App Types
|
||||
|
||||
Valid app types: `claude`, `codex`, `gemini`, `opencode`, `openclaw`, `hermes`.
|
||||
|
||||
## Examples
|
||||
|
||||
### List all providers
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch providers list
|
||||
cli-anything-ccswitch --json providers list --app claude
|
||||
```
|
||||
|
||||
### Switch active provider
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch providers set-current <provider-id> --app claude
|
||||
```
|
||||
|
||||
### Check proxy status
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch proxy status --app claude
|
||||
cli-anything-ccswitch proxy config --app claude --set-port 8080
|
||||
```
|
||||
|
||||
### View usage stats
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch usage stats --days 7
|
||||
cli-anything-ccswitch --json usage stats --days 30 --app claude
|
||||
cli-anything-ccswitch usage logs --limit 10
|
||||
```
|
||||
|
||||
### List skills
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch skills list
|
||||
cli-anything-ccswitch skills repos
|
||||
```
|
||||
|
||||
### MCP servers
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch mcp list
|
||||
cli-anything-ccswitch mcp enable <server-id> --app claude --on
|
||||
```
|
||||
|
||||
### Settings
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch settings list
|
||||
cli-anything-ccswitch settings get <key>
|
||||
cli-anything-ccswitch settings set <key> <value>
|
||||
```
|
||||
|
||||
### Sessions
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch sessions list --app claude --limit 10
|
||||
```
|
||||
|
||||
### Status overview
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch status
|
||||
cli-anything-ccswitch
|
||||
cli-anything-ccswitch --json
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
When a resource is not found, the CLI prints an error message to stderr and
|
||||
exits with code 1. Agents should check the exit code before parsing output.
|
||||
|
||||
## Database
|
||||
|
||||
The CLI reads from the live CC Switch SQLite database. All read operations
|
||||
are safe and do not modify the database. Write operations (`providers
|
||||
set-current`, `proxy config`, `mcp enable`, `settings set`) modify the
|
||||
database directly.
|
||||
86
cc-switch/agent-harness/cli_anything/ccswitch/tests/TEST.md
Normal file
86
cc-switch/agent-harness/cli_anything/ccswitch/tests/TEST.md
Normal file
@@ -0,0 +1,86 @@
|
||||
# CC Switch CLI - Test Plan
|
||||
|
||||
## Test Inventory
|
||||
|
||||
| Test File | Tests | Type |
|
||||
|-----------|-------|------|
|
||||
| `test_core.py` | 46 | Unit tests with synthetic data |
|
||||
| `test_full_e2e.py` | 20 | E2E tests against a real or configured CC Switch database |
|
||||
|
||||
## Unit Test Coverage
|
||||
|
||||
### Database and Config Helpers
|
||||
|
||||
- `CCSWITCH_HOME` path resolution
|
||||
- DB, config, and settings path helpers
|
||||
- SQLite in-memory connection setup
|
||||
- Config load/save, including empty-file behavior
|
||||
- Valid app type list
|
||||
|
||||
### CLI Core Helpers
|
||||
|
||||
- App resolution for valid, invalid, and `None` app values
|
||||
- Table formatting for normal, empty, and single-row data
|
||||
- Sensitive-value masking for tokens, API keys, passwords, short secrets, nested dicts, nested lists, authorization headers, and hotkey false positives
|
||||
|
||||
### CLI Commands
|
||||
|
||||
- Top-level help and command group help
|
||||
- Explicit `status --json`
|
||||
- `providers get --json` masking for nested `settings_config`
|
||||
- Plain `providers get` masking for nested list/header values
|
||||
- `settings get --json` object output
|
||||
- `settings get/list/set` masking for sensitive settings
|
||||
- `sessions list --json` empty-result output
|
||||
- `mcp list` and `skills list` include OpenClaw state
|
||||
- `usage stats` normalizes nullable token/cost aggregates
|
||||
- `providers set-current` DB update plus live Codex config write
|
||||
|
||||
### Live Config Write Helpers
|
||||
|
||||
- Claude settings JSON env merge
|
||||
- Codex provider TOML merge preserves unrelated `config.toml` sections plus `auth.json` writes
|
||||
- Gemini `.env` merge with env key validation and newline escaping
|
||||
- OpenCode provider merge into `~/.config/opencode/opencode.json`
|
||||
- Temporary file cleanup after secure writes
|
||||
|
||||
## E2E Coverage
|
||||
|
||||
### Providers
|
||||
|
||||
- List providers
|
||||
- JSON output
|
||||
- App filtering
|
||||
- Nonexistent provider error
|
||||
- API key leak prevention
|
||||
|
||||
### Other Command Groups
|
||||
|
||||
- Skills list and repos
|
||||
- Usage stats and logs
|
||||
- MCP list
|
||||
- Settings list
|
||||
- Proxy status
|
||||
- Default status overview and JSON overview
|
||||
|
||||
## Latest Verification
|
||||
|
||||
```powershell
|
||||
python -m pytest cli_anything\ccswitch\tests -q
|
||||
.................................................................. [100%]
|
||||
66 passed in 2.61s
|
||||
```
|
||||
|
||||
Additional checks:
|
||||
|
||||
```powershell
|
||||
git diff --check
|
||||
python -m compileall -q cli_anything
|
||||
```
|
||||
|
||||
Both checks passed.
|
||||
|
||||
## Coverage Notes
|
||||
|
||||
- Write operations covered by synthetic tests: `providers set-current`, `settings set`, and live config writes for Claude, Codex, Gemini, and OpenCode.
|
||||
- Destructive live-database operations and process-control flows should be validated only in a disposable CC Switch environment.
|
||||
903
cc-switch/agent-harness/cli_anything/ccswitch/tests/test_core.py
Normal file
903
cc-switch/agent-harness/cli_anything/ccswitch/tests/test_core.py
Normal file
@@ -0,0 +1,903 @@
|
||||
"""Unit tests for CC Switch CLI core modules — synthetic data, no external deps."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import sqlite3
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
import tomlkit
|
||||
|
||||
from cli_anything.ccswitch.utils.db import (
|
||||
get_cc_switch_dir, get_db_path, get_config_path,
|
||||
get_settings_path, connect_db,
|
||||
load_config, save_config, load_settings,
|
||||
VALID_APP_TYPES,
|
||||
)
|
||||
from cli_anything.ccswitch.ccswitch_cli import (
|
||||
_resolve_app, _table, _mask_sensitive, _mask_value, _write_live_config,
|
||||
)
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# Database path helpers
|
||||
# ───────────────────────────
|
||||
|
||||
def test_get_cc_switch_dir_custom():
|
||||
os.environ["CCSWITCH_HOME"] = "/tmp/ccswitch-test"
|
||||
assert get_cc_switch_dir() == Path("/tmp/ccswitch-test/.cc-switch")
|
||||
del os.environ["CCSWITCH_HOME"]
|
||||
|
||||
|
||||
def test_get_db_path():
|
||||
os.environ["CCSWITCH_HOME"] = "/home/user"
|
||||
assert get_db_path() == Path("/home/user/.cc-switch/cc-switch.db")
|
||||
del os.environ["CCSWITCH_HOME"]
|
||||
|
||||
|
||||
def test_get_config_path():
|
||||
os.environ["CCSWITCH_HOME"] = "/x"
|
||||
assert get_config_path() == Path("/x/.cc-switch/config.json")
|
||||
del os.environ["CCSWITCH_HOME"]
|
||||
|
||||
|
||||
def test_get_settings_path():
|
||||
os.environ["CCSWITCH_HOME"] = "/y"
|
||||
assert get_settings_path() == Path("/y/.cc-switch/settings.json")
|
||||
del os.environ["CCSWITCH_HOME"]
|
||||
|
||||
|
||||
def test_valid_app_types():
|
||||
assert "claude" in VALID_APP_TYPES
|
||||
assert "codex" in VALID_APP_TYPES
|
||||
assert "gemini" in VALID_APP_TYPES
|
||||
assert "opencode" in VALID_APP_TYPES
|
||||
assert "openclaw" in VALID_APP_TYPES
|
||||
assert "hermes" in VALID_APP_TYPES
|
||||
assert len(VALID_APP_TYPES) == 6
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# Database connection
|
||||
# ───────────────────────────
|
||||
|
||||
def test_connect_db_in_memory():
|
||||
conn = connect_db(Path(":memory:"))
|
||||
conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
|
||||
conn.execute("INSERT INTO test VALUES (1)")
|
||||
assert conn.execute("SELECT COUNT(*) FROM test").fetchone()[0] == 1
|
||||
conn.close()
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# Config load/save
|
||||
# ───────────────────────────
|
||||
|
||||
def test_load_config_missing():
|
||||
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
|
||||
pass # empty file
|
||||
result = load_config(Path(f.name))
|
||||
assert result == {}
|
||||
os.unlink(f.name)
|
||||
|
||||
|
||||
def test_save_and_load_config():
|
||||
path = Path(tempfile.mktemp(suffix=".json"))
|
||||
data = {"version": 2, "apps": {"claude": {"providers": {}}}}
|
||||
save_config(data, path)
|
||||
loaded = load_config(path)
|
||||
assert loaded["version"] == 2
|
||||
assert "apps" in loaded
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
def test_load_settings_missing():
|
||||
os.environ["CCSWITCH_HOME"] = "/nonexistent-tmp-xyz"
|
||||
result = load_settings()
|
||||
assert result == {}
|
||||
del os.environ["CCSWITCH_HOME"]
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# App resolution
|
||||
# ───────────────────────────
|
||||
|
||||
def test_resolve_app_valid():
|
||||
assert _resolve_app("claude") == "claude"
|
||||
assert _resolve_app("CLAUDE") == "claude"
|
||||
assert _resolve_app("OpenCode") == "opencode"
|
||||
assert _resolve_app("Hermes") == "hermes"
|
||||
|
||||
|
||||
def test_resolve_app_none():
|
||||
assert _resolve_app(None) is None
|
||||
|
||||
|
||||
def test_resolve_app_invalid():
|
||||
from click import BadParameter
|
||||
with pytest.raises(BadParameter):
|
||||
_resolve_app("invalid-app")
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# Table formatting
|
||||
# ───────────────────────────
|
||||
|
||||
def test_table_basic():
|
||||
result = _table(["Name", "Count"], [("Alice", 5), ("Bob", 3)])
|
||||
assert "Name" in result
|
||||
assert "Count" in result
|
||||
assert "Alice" in result
|
||||
assert "5" in result
|
||||
assert "Bob" in result
|
||||
assert "3" in result
|
||||
|
||||
|
||||
def test_table_empty():
|
||||
assert _table(["Col"], []) == "(empty)"
|
||||
|
||||
|
||||
def test_table_single():
|
||||
result = _table(["A"], [("x",)])
|
||||
assert "A" in result
|
||||
assert "x" in result
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# Sensitive masking
|
||||
# ───────────────────────────
|
||||
|
||||
def test_mask_api_token():
|
||||
result = _mask_sensitive("ANTHROPIC_AUTH_TOKEN", "sk-bc089d043dc34c6c9022831769d85cbb")
|
||||
assert "sk-bc089" in result
|
||||
assert "5cbb" in result
|
||||
assert "bc089d043dc34c6c" not in result # middle is masked
|
||||
|
||||
|
||||
def test_mask_api_key():
|
||||
result = _mask_sensitive("api_key", "sec-1234567890abcdef")
|
||||
assert "..." in result or "***" in result or "sec-1234" in result
|
||||
|
||||
|
||||
def test_mask_password():
|
||||
result = _mask_sensitive("password", "mysecretkey")
|
||||
assert "mysec" in result or "***" in result
|
||||
|
||||
|
||||
def test_mask_hotkey_is_not_treated_as_secret():
|
||||
assert _mask_sensitive("hotkey", "ctrl+k") == "ctrl+k"
|
||||
|
||||
|
||||
def test_mask_short_value():
|
||||
result = _mask_sensitive("secret", "abc")
|
||||
assert result == "***"
|
||||
|
||||
|
||||
def test_mask_non_sensitive():
|
||||
result = _mask_sensitive("model", "claude-sonnet-4-6")
|
||||
assert "claude-sonnet-4-6" in result
|
||||
assert "***" not in result
|
||||
|
||||
|
||||
def test_mask_nested_dict():
|
||||
result = _mask_sensitive("env", {
|
||||
"ANTHROPIC_AUTH_TOKEN": "sk-test1234567890",
|
||||
"ANTHROPIC_MODEL": "deepseek-v4-pro",
|
||||
"ANTHROPIC_BASE_URL": "https://api.deepseek.com",
|
||||
})
|
||||
assert "sk-test1" in result
|
||||
assert "7890" in result
|
||||
assert "deepseek-v4-pro" in result
|
||||
assert "https://api.deepseek.com" in result
|
||||
|
||||
|
||||
# ───────────────────────────
|
||||
# CLI help tests
|
||||
# ───────────────────────────
|
||||
|
||||
def test_mask_value_nested_json():
|
||||
result = _mask_value({
|
||||
"env": {
|
||||
"ANTHROPIC_AUTH_TOKEN": "sk-test1234567890",
|
||||
"ANTHROPIC_MODEL": "deepseek-v4-pro",
|
||||
},
|
||||
"headers": [
|
||||
{"authorization": "Bearer abcdef1234567890"},
|
||||
],
|
||||
})
|
||||
|
||||
assert result["env"]["ANTHROPIC_AUTH_TOKEN"] != "sk-test1234567890"
|
||||
assert result["env"]["ANTHROPIC_MODEL"] == "deepseek-v4-pro"
|
||||
assert result["headers"][0]["authorization"] != "Bearer abcdef1234567890"
|
||||
|
||||
|
||||
def test_mask_sensitive_nested_list():
|
||||
result = _mask_sensitive("headers", [
|
||||
{"authorization": "Bearer abcdef1234567890"},
|
||||
])
|
||||
|
||||
assert "Bearer abcdef1234567890" not in result
|
||||
assert "headers" not in result
|
||||
|
||||
|
||||
from click.testing import CliRunner
|
||||
from cli_anything.ccswitch.ccswitch_cli import cli
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner():
|
||||
return CliRunner()
|
||||
|
||||
|
||||
def _init_cli_db(path: Path) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.executescript("""
|
||||
CREATE TABLE providers (
|
||||
id TEXT,
|
||||
app_type TEXT,
|
||||
name TEXT,
|
||||
category TEXT,
|
||||
is_current INTEGER,
|
||||
sort_index INTEGER,
|
||||
settings_config TEXT
|
||||
);
|
||||
CREATE TABLE settings (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT
|
||||
);
|
||||
CREATE TABLE skills (
|
||||
id TEXT,
|
||||
name TEXT DEFAULT '',
|
||||
description TEXT,
|
||||
repo_owner TEXT,
|
||||
repo_name TEXT,
|
||||
enabled_claude INTEGER DEFAULT 0,
|
||||
enabled_codex INTEGER DEFAULT 0,
|
||||
enabled_gemini INTEGER DEFAULT 0,
|
||||
enabled_opencode INTEGER DEFAULT 0,
|
||||
enabled_openclaw INTEGER DEFAULT 0,
|
||||
enabled_hermes INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE mcp_servers (
|
||||
id TEXT,
|
||||
name TEXT DEFAULT '',
|
||||
description TEXT,
|
||||
enabled_claude INTEGER DEFAULT 0,
|
||||
enabled_codex INTEGER DEFAULT 0,
|
||||
enabled_gemini INTEGER DEFAULT 0,
|
||||
enabled_opencode INTEGER DEFAULT 0,
|
||||
enabled_openclaw INTEGER DEFAULT 0,
|
||||
enabled_hermes INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE proxy_request_logs (
|
||||
app_type TEXT,
|
||||
model TEXT,
|
||||
status_code INTEGER,
|
||||
input_tokens INTEGER,
|
||||
output_tokens INTEGER,
|
||||
total_cost_usd TEXT,
|
||||
latency_ms INTEGER,
|
||||
created_at INTEGER
|
||||
);
|
||||
CREATE TABLE session_log_sync (
|
||||
file_path TEXT,
|
||||
last_modified INTEGER,
|
||||
last_synced_at INTEGER
|
||||
);
|
||||
""")
|
||||
return conn
|
||||
|
||||
|
||||
def test_providers_get_json_masks_settings_config(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"deepseek",
|
||||
"claude",
|
||||
"DeepSeek",
|
||||
"default",
|
||||
1,
|
||||
0,
|
||||
json.dumps({
|
||||
"env": {
|
||||
"ANTHROPIC_AUTH_TOKEN": "sk-test1234567890",
|
||||
"ANTHROPIC_MODEL": "deepseek-v4-pro",
|
||||
},
|
||||
"headers": [{"authorization": "Bearer abcdef1234567890"}],
|
||||
}),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, [
|
||||
"--json", "--db", str(db_path),
|
||||
"providers", "get", "deepseek", "--app", "claude",
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
data = json.loads(result.output)
|
||||
output = json.dumps(data)
|
||||
assert "sk-test1234567890" not in output
|
||||
assert "Bearer abcdef1234567890" not in output
|
||||
assert data["settings_config"]["env"]["ANTHROPIC_MODEL"] == "deepseek-v4-pro"
|
||||
|
||||
|
||||
def test_providers_get_plain_masks_nested_list(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"deepseek",
|
||||
"claude",
|
||||
"DeepSeek",
|
||||
"default",
|
||||
1,
|
||||
0,
|
||||
json.dumps({
|
||||
"headers": [{"authorization": "Bearer abcdef1234567890"}],
|
||||
"model": "deepseek-v4-pro",
|
||||
}),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, [
|
||||
"--db", str(db_path),
|
||||
"providers", "get", "deepseek", "--app", "claude",
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert "Bearer abcdef1234567890" not in result.output
|
||||
assert "deepseek-v4-pro" in result.output
|
||||
|
||||
|
||||
def test_status_command_json(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
("deepseek", "claude", "DeepSeek", "default", 1, 0, "{}"),
|
||||
)
|
||||
conn.execute("INSERT INTO skills (id) VALUES (?)", ("skill-1",))
|
||||
conn.execute("INSERT INTO mcp_servers (id) VALUES (?)", ("mcp-1",))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, ["--json", "--db", str(db_path), "status"])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert json.loads(result.output) == {
|
||||
"providers": 1,
|
||||
"current": {"claude": "DeepSeek"},
|
||||
"skills": 1,
|
||||
"mcp_servers": 1,
|
||||
}
|
||||
|
||||
|
||||
def test_sessions_list_json_empty_outputs_array(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, ["--json", "--db", str(db_path), "sessions", "list"])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert json.loads(result.output) == []
|
||||
|
||||
|
||||
def test_settings_get_json_outputs_object(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute("INSERT INTO settings VALUES (?, ?)", ("theme", "dark"))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, [
|
||||
"--json", "--db", str(db_path), "settings", "get", "theme",
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert json.loads(result.output) == {"theme": "dark"}
|
||||
|
||||
|
||||
def test_settings_outputs_mask_sensitive_values(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute("INSERT INTO settings VALUES (?, ?)", ("OPENAI_API_KEY", "sk-secret1234567890"))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
get_result = runner.invoke(cli, [
|
||||
"--json", "--db", str(db_path), "settings", "get", "OPENAI_API_KEY",
|
||||
])
|
||||
list_result = runner.invoke(cli, [
|
||||
"--json", "--db", str(db_path), "settings", "list",
|
||||
])
|
||||
set_result = runner.invoke(cli, [
|
||||
"--db", str(db_path), "settings", "set", "OPENAI_API_KEY", "sk-newsecret1234567890",
|
||||
])
|
||||
|
||||
assert get_result.exit_code == 0
|
||||
assert list_result.exit_code == 0
|
||||
assert set_result.exit_code == 0
|
||||
combined = get_result.output + list_result.output + set_result.output
|
||||
assert "sk-secret1234567890" not in combined
|
||||
assert "sk-newsecret1234567890" not in combined
|
||||
assert json.loads(get_result.output)["OPENAI_API_KEY"] != "sk-secret1234567890"
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
assert conn.execute(
|
||||
"SELECT value FROM settings WHERE key=?", ("OPENAI_API_KEY",)
|
||||
).fetchone()[0] == "sk-newsecret1234567890"
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_mcp_and_skills_list_include_openclaw(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO mcp_servers (
|
||||
id, name, description, enabled_claude, enabled_codex, enabled_gemini,
|
||||
enabled_opencode, enabled_openclaw, enabled_hermes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("mcp-1", "OpenClaw MCP", "desc", 0, 0, 0, 0, 1, 0),
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO skills (
|
||||
id, name, description, repo_owner, repo_name, enabled_claude,
|
||||
enabled_codex, enabled_gemini, enabled_opencode, enabled_openclaw,
|
||||
enabled_hermes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("skill-1", "OpenClaw Skill", "desc", "owner", "repo", 0, 0, 0, 0, 1, 0),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
mcp_json = runner.invoke(cli, ["--json", "--db", str(db_path), "mcp", "list"])
|
||||
skills_json = runner.invoke(cli, ["--json", "--db", str(db_path), "skills", "list"])
|
||||
mcp_text = runner.invoke(cli, ["--db", str(db_path), "mcp", "list"])
|
||||
skills_text = runner.invoke(cli, ["--db", str(db_path), "skills", "list"])
|
||||
|
||||
assert mcp_json.exit_code == 0
|
||||
assert skills_json.exit_code == 0
|
||||
assert mcp_text.exit_code == 0
|
||||
assert skills_text.exit_code == 0
|
||||
assert json.loads(mcp_json.output)[0]["enabled_openclaw"] == 1
|
||||
assert json.loads(skills_json.output)[0]["enabled_openclaw"] == 1
|
||||
assert "openclaw" in mcp_text.output
|
||||
assert "openclaw" in skills_text.output
|
||||
|
||||
|
||||
def test_usage_stats_normalises_nullable_aggregates(runner, tmp_path):
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO proxy_request_logs (
|
||||
app_type, model, status_code, input_tokens, output_tokens,
|
||||
total_cost_usd, latency_ms, created_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER))
|
||||
""",
|
||||
("claude", "deepseek-v4-pro", 200, None, None, None, 100),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
text_result = runner.invoke(cli, ["--db", str(db_path), "usage", "stats", "--days", "30"])
|
||||
json_result = runner.invoke(cli, [
|
||||
"--json", "--db", str(db_path), "usage", "stats", "--days", "30",
|
||||
])
|
||||
|
||||
assert text_result.exit_code == 0
|
||||
assert "deepseek-v4-pro" in text_result.output
|
||||
assert "$0.0000" in text_result.output
|
||||
assert json_result.exit_code == 0
|
||||
data = json.loads(json_result.output)
|
||||
assert data[0]["input_tok"] == 0
|
||||
assert data[0]["output_tok"] == 0
|
||||
assert data[0]["cost"] == 0
|
||||
|
||||
|
||||
def test_write_live_config_codex_writes_config_and_auth(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
config_path = tmp_path / ".codex" / "config.toml"
|
||||
config_path.parent.mkdir(parents=True)
|
||||
config_path.write_text(
|
||||
'approval_policy = "on-request"\n'
|
||||
'sandbox_mode = "workspace-write"\n'
|
||||
'model = "old-model"\n'
|
||||
'model_provider = "old-provider"\n'
|
||||
'\n'
|
||||
'[model_providers.old-provider]\n'
|
||||
'name = "Old Provider"\n'
|
||||
'base_url = "https://old.example.test"\n'
|
||||
'\n'
|
||||
'[model_providers.keep]\n'
|
||||
'name = "Keep Provider"\n'
|
||||
'base_url = "https://keep.example.test"\n'
|
||||
'\n'
|
||||
'# mcp should stay with filesystem\n'
|
||||
'[mcp_servers.filesystem]\n'
|
||||
'command = "npx"\n'
|
||||
'args = ["-y", "@modelcontextprotocol/server-filesystem"]\n'
|
||||
'\n'
|
||||
'[profiles.work]\n'
|
||||
'model = "profile-model"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
db = sqlite3.connect(":memory:")
|
||||
db.row_factory = sqlite3.Row
|
||||
db.execute("""
|
||||
CREATE TABLE providers (
|
||||
app_type TEXT,
|
||||
is_current INTEGER,
|
||||
settings_config TEXT
|
||||
)
|
||||
""")
|
||||
db.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?)",
|
||||
(
|
||||
"codex",
|
||||
1,
|
||||
json.dumps({
|
||||
"config": (
|
||||
'model = "deepseek-v4-pro"\n'
|
||||
'model_provider = "deepseek"\n'
|
||||
'\n'
|
||||
'[model_providers.deepseek]\n'
|
||||
'name = "DeepSeek"\n'
|
||||
'base_url = "https://api.deepseek.com/v1"\n'
|
||||
),
|
||||
"auth": {"OPENAI_API_KEY": "sk-codex1234567890"},
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
_write_live_config("codex", db)
|
||||
|
||||
merged = config_path.read_text(encoding="utf-8")
|
||||
assert 'model = "deepseek-v4-pro"' in merged
|
||||
assert 'model_provider = "deepseek"' in merged
|
||||
assert '[model_providers.deepseek]' in merged
|
||||
assert 'base_url = "https://api.deepseek.com/v1"' in merged
|
||||
assert '[model_providers.keep]' in merged
|
||||
assert 'approval_policy = "on-request"' in merged
|
||||
assert 'sandbox_mode = "workspace-write"' in merged
|
||||
assert '[mcp_servers.filesystem]' in merged
|
||||
assert '[profiles.work]' in merged
|
||||
parsed = tomlkit.parse(merged)
|
||||
assert parsed["mcp_servers"]["filesystem"]["command"] == "npx"
|
||||
assert parsed["profiles"]["work"]["model"] == "profile-model"
|
||||
auth = json.loads((tmp_path / ".codex" / "auth.json").read_text(encoding="utf-8"))
|
||||
assert auth == {"OPENAI_API_KEY": "sk-codex1234567890"}
|
||||
assert not list((tmp_path / ".codex").glob(".config.toml.*"))
|
||||
assert not list((tmp_path / ".codex").glob(".auth.json.*"))
|
||||
|
||||
|
||||
def test_write_live_config_codex_handles_multiline_strings(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
config_path = tmp_path / ".codex" / "config.toml"
|
||||
config_path.parent.mkdir(parents=True)
|
||||
config_path.write_text(
|
||||
'model = "old-model"\n'
|
||||
'model_provider = "deepseek"\n'
|
||||
'\n'
|
||||
'[model_providers.deepseek]\n'
|
||||
'name = "Old DeepSeek"\n'
|
||||
'notes = """\n'
|
||||
'line one\n'
|
||||
'[mcp_servers.fake]\n'
|
||||
'line three\n'
|
||||
'"""\n'
|
||||
'base_url = "https://old.example.test"\n'
|
||||
'\n'
|
||||
'[mcp_servers.real]\n'
|
||||
'command = "npx"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
db = sqlite3.connect(":memory:")
|
||||
db.row_factory = sqlite3.Row
|
||||
db.execute("""
|
||||
CREATE TABLE providers (
|
||||
app_type TEXT,
|
||||
is_current INTEGER,
|
||||
settings_config TEXT
|
||||
)
|
||||
""")
|
||||
db.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?)",
|
||||
(
|
||||
"codex",
|
||||
1,
|
||||
json.dumps({
|
||||
"config": (
|
||||
'model = "deepseek-v4-pro"\n'
|
||||
'model_provider = "deepseek"\n'
|
||||
'\n'
|
||||
'[model_providers.deepseek]\n'
|
||||
'name = "DeepSeek"\n'
|
||||
'base_url = "https://api.deepseek.com/v1"\n'
|
||||
),
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
_write_live_config("codex", db)
|
||||
|
||||
parsed = tomlkit.parse(config_path.read_text(encoding="utf-8"))
|
||||
assert parsed["model"] == "deepseek-v4-pro"
|
||||
assert parsed["model_provider"] == "deepseek"
|
||||
assert parsed["model_providers"]["deepseek"]["base_url"] == "https://api.deepseek.com/v1"
|
||||
assert parsed["mcp_servers"]["real"]["command"] == "npx"
|
||||
assert "fake" not in parsed["mcp_servers"]
|
||||
|
||||
|
||||
def test_write_live_config_claude_merges_nested_env(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
settings_path = tmp_path / ".claude" / "settings.json"
|
||||
settings_path.parent.mkdir(parents=True)
|
||||
settings_path.write_text(json.dumps({"env": {"EXISTING": "1"}}), encoding="utf-8")
|
||||
db = sqlite3.connect(":memory:")
|
||||
db.row_factory = sqlite3.Row
|
||||
db.execute("""
|
||||
CREATE TABLE providers (
|
||||
app_type TEXT,
|
||||
is_current INTEGER,
|
||||
settings_config TEXT
|
||||
)
|
||||
""")
|
||||
db.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?)",
|
||||
(
|
||||
"claude",
|
||||
1,
|
||||
json.dumps({"env": {"ANTHROPIC_AUTH_TOKEN": "sk-test1234567890"}}),
|
||||
),
|
||||
)
|
||||
|
||||
_write_live_config("claude", db)
|
||||
|
||||
data = json.loads(settings_path.read_text(encoding="utf-8"))
|
||||
assert data["env"]["EXISTING"] == "1"
|
||||
assert data["env"]["ANTHROPIC_AUTH_TOKEN"] == "sk-test1234567890"
|
||||
|
||||
|
||||
def test_write_live_config_gemini_writes_env_file(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
env_path = tmp_path / ".gemini" / ".env"
|
||||
env_path.parent.mkdir(parents=True)
|
||||
env_path.write_text("EXISTING=1\n", encoding="utf-8")
|
||||
db = sqlite3.connect(":memory:")
|
||||
db.row_factory = sqlite3.Row
|
||||
db.execute("""
|
||||
CREATE TABLE providers (
|
||||
app_type TEXT,
|
||||
is_current INTEGER,
|
||||
settings_config TEXT
|
||||
)
|
||||
""")
|
||||
db.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?)",
|
||||
(
|
||||
"gemini",
|
||||
1,
|
||||
json.dumps({
|
||||
"env": {
|
||||
"GEMINI_API_KEY": "sk-gemini1234567890",
|
||||
"GOOGLE_GEMINI_BASE_URL": "https://api.example.test",
|
||||
"GEMINI_MODEL": "gemini-3.1-pro",
|
||||
"MULTILINE": "one\nTWO=2",
|
||||
"BAD-KEY": "ignored",
|
||||
},
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
_write_live_config("gemini", db)
|
||||
|
||||
content = env_path.read_text(encoding="utf-8")
|
||||
assert "EXISTING=1" in content
|
||||
assert "GEMINI_API_KEY=sk-gemini1234567890" in content
|
||||
assert "GOOGLE_GEMINI_BASE_URL=https://api.example.test" in content
|
||||
assert "GEMINI_MODEL=gemini-3.1-pro" in content
|
||||
assert "MULTILINE=one\\nTWO=2" in content
|
||||
assert "\nTWO=2" not in content
|
||||
assert "BAD-KEY=ignored" not in content
|
||||
assert not (tmp_path / ".gemini" / "settings.json").exists()
|
||||
|
||||
|
||||
def test_write_live_config_opencode_merges_provider_settings(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
config_path = tmp_path / ".config" / "opencode" / "opencode.json"
|
||||
config_path.parent.mkdir(parents=True)
|
||||
config_path.write_text(json.dumps({
|
||||
"$schema": "https://opencode.ai/config.json",
|
||||
"provider": {"old": {"npm": "@ai-sdk/openai"}},
|
||||
}), encoding="utf-8")
|
||||
db = sqlite3.connect(":memory:")
|
||||
db.row_factory = sqlite3.Row
|
||||
db.execute("""
|
||||
CREATE TABLE providers (
|
||||
id TEXT,
|
||||
app_type TEXT,
|
||||
is_current INTEGER,
|
||||
settings_config TEXT
|
||||
)
|
||||
""")
|
||||
db.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?)",
|
||||
(
|
||||
"deepseek",
|
||||
"opencode",
|
||||
1,
|
||||
json.dumps({
|
||||
"settingsConfig": {
|
||||
"npm": "@ai-sdk/openai-compatible",
|
||||
"options": {
|
||||
"baseURL": "https://api.deepseek.com/v1",
|
||||
"apiKey": "sk-opencode1234567890",
|
||||
},
|
||||
"models": {
|
||||
"deepseek-v4-pro": {"name": "DeepSeek V4 Pro"},
|
||||
},
|
||||
},
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
_write_live_config("opencode", db)
|
||||
|
||||
data = json.loads(config_path.read_text(encoding="utf-8"))
|
||||
assert "old" in data["provider"]
|
||||
assert data["provider"]["deepseek"]["options"]["apiKey"] == "sk-opencode1234567890"
|
||||
assert data["provider"]["deepseek"]["models"]["deepseek-v4-pro"]["name"] == "DeepSeek V4 Pro"
|
||||
|
||||
|
||||
def test_providers_set_current_codex_writes_live_config(runner, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.setenv("USERPROFILE", str(tmp_path))
|
||||
config_path = tmp_path / ".codex" / "config.toml"
|
||||
config_path.parent.mkdir(parents=True)
|
||||
config_path.write_text(
|
||||
'approval_policy = "on-request"\n'
|
||||
'\n'
|
||||
'[mcp_servers.filesystem]\n'
|
||||
'command = "npx"\n'
|
||||
'\n'
|
||||
'[profiles.work]\n'
|
||||
'model = "profile-model"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
db_path = tmp_path / "cc-switch.db"
|
||||
conn = _init_cli_db(db_path)
|
||||
conn.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"old",
|
||||
"codex",
|
||||
"Old",
|
||||
"default",
|
||||
1,
|
||||
0,
|
||||
json.dumps({"config": 'model = "old"'}),
|
||||
),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO providers VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"deepseek",
|
||||
"codex",
|
||||
"DeepSeek",
|
||||
"default",
|
||||
0,
|
||||
1,
|
||||
json.dumps({
|
||||
"config": (
|
||||
'model = "deepseek-v4-pro"\n'
|
||||
'model_provider = "deepseek"\n'
|
||||
'\n'
|
||||
'[model_providers.deepseek]\n'
|
||||
'name = "DeepSeek"\n'
|
||||
'base_url = "https://api.deepseek.com/v1"\n'
|
||||
),
|
||||
"auth": {"OPENAI_API_KEY": "sk-codex1234567890"},
|
||||
}),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
result = runner.invoke(cli, [
|
||||
"--db", str(db_path), "providers", "set-current", "deepseek", "--app", "codex",
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert "sk-codex1234567890" not in result.output
|
||||
merged = config_path.read_text(encoding="utf-8")
|
||||
assert 'model = "deepseek-v4-pro"' in merged
|
||||
assert 'model_provider = "deepseek"' in merged
|
||||
assert '[model_providers.deepseek]' in merged
|
||||
assert 'approval_policy = "on-request"' in merged
|
||||
assert '[mcp_servers.filesystem]' in merged
|
||||
assert '[profiles.work]' in merged
|
||||
auth = json.loads((tmp_path / ".codex" / "auth.json").read_text(encoding="utf-8"))
|
||||
assert auth["OPENAI_API_KEY"] == "sk-codex1234567890"
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
current = conn.execute(
|
||||
"SELECT id FROM providers WHERE app_type=? AND is_current=1", ("codex",)
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
assert current == "deepseek"
|
||||
|
||||
|
||||
def test_main_help(runner):
|
||||
result = runner.invoke(cli, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "CC Switch" in result.output
|
||||
|
||||
|
||||
def test_providers_help(runner):
|
||||
result = runner.invoke(cli, ["providers", "--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "Manage AI provider" in result.output
|
||||
|
||||
|
||||
def test_usage_help(runner):
|
||||
result = runner.invoke(cli, ["usage", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_skills_help(runner):
|
||||
result = runner.invoke(cli, ["skills", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_mcp_help(runner):
|
||||
result = runner.invoke(cli, ["mcp", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_proxy_help(runner):
|
||||
result = runner.invoke(cli, ["proxy", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_settings_help(runner):
|
||||
result = runner.invoke(cli, ["settings", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_sessions_help(runner):
|
||||
result = runner.invoke(cli, ["sessions", "--help"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_all_command_groups(runner):
|
||||
result = runner.invoke(cli, ["--help"])
|
||||
assert "providers" in result.output
|
||||
assert "proxy" in result.output
|
||||
assert "mcp" in result.output
|
||||
assert "skills" in result.output
|
||||
assert "usage" in result.output
|
||||
assert "settings" in result.output
|
||||
assert "sessions" in result.output
|
||||
@@ -0,0 +1,191 @@
|
||||
"""E2E tests for CC Switch CLI — tests against the real CC Switch database.
|
||||
|
||||
These tests require a real CC Switch installation with an active database.
|
||||
Set env CCSWITCH_HOME to point to a test home directory if needed.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
LIVE_DB_OPT_IN_ENV = "CLI_ANYTHING_CCSWITCH_LIVE_DB"
|
||||
NO_LIVE_DB_TESTS = {"test_help", "test_providers_help"}
|
||||
|
||||
|
||||
def _live_db_path() -> Path:
|
||||
home = Path(os.environ.get("CCSWITCH_HOME", os.path.expanduser("~")))
|
||||
return home / ".cc-switch" / "cc-switch.db"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _gate_live_db_tests(request):
|
||||
if request.node.name in NO_LIVE_DB_TESTS:
|
||||
return
|
||||
|
||||
if os.environ.get(LIVE_DB_OPT_IN_ENV) != "1":
|
||||
pytest.skip(f"set {LIVE_DB_OPT_IN_ENV}=1 to run live CC Switch DB tests")
|
||||
|
||||
db_path = _live_db_path()
|
||||
if not db_path.is_file():
|
||||
pytest.skip(f"live CC Switch database not found at {db_path}")
|
||||
|
||||
|
||||
def _resolve_cli(name):
|
||||
"""Resolve installed CLI command; falls back to python -m for dev."""
|
||||
import shutil
|
||||
force = os.environ.get("CLI_ANYTHING_FORCE_INSTALLED", "").strip() == "1"
|
||||
path = shutil.which(name)
|
||||
if path:
|
||||
print(f"[_resolve_cli] Using installed command: {path}")
|
||||
return [path]
|
||||
if force:
|
||||
raise RuntimeError(f"{name} not found in PATH. Install with: pip install -e .")
|
||||
module = "cli_anything.ccswitch.ccswitch_cli"
|
||||
print(f"[_resolve_cli] Falling back to: {sys.executable} -m {module}")
|
||||
return [sys.executable, "-m", module]
|
||||
|
||||
|
||||
CLI_BASE = _resolve_cli("cli-anything-ccswitch")
|
||||
|
||||
|
||||
class TestCLISubprocess:
|
||||
"""Subprocess tests that invoke the real installed CLI command."""
|
||||
|
||||
def _run(self, args, check=True):
|
||||
return subprocess.run(
|
||||
CLI_BASE + args,
|
||||
capture_output=True, text=True,
|
||||
check=check,
|
||||
)
|
||||
|
||||
# ── help ──
|
||||
|
||||
def test_help(self):
|
||||
result = self._run(["--help"])
|
||||
assert result.returncode == 0
|
||||
assert "CC Switch" in result.stdout
|
||||
|
||||
def test_providers_help(self):
|
||||
result = self._run(["providers", "--help"])
|
||||
assert result.returncode == 0
|
||||
|
||||
# ── providers ──
|
||||
|
||||
def test_providers_list(self):
|
||||
result = self._run(["providers", "list"])
|
||||
assert result.returncode == 0
|
||||
# Should have table headers
|
||||
assert "App" in result.stdout or len(result.stdout) > 0
|
||||
|
||||
def test_providers_list_json(self):
|
||||
result = self._run(["--json", "providers", "list"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert isinstance(data, list)
|
||||
if data:
|
||||
assert "app_type" in data[0] or "id" in data[0]
|
||||
|
||||
def test_providers_list_filter_claude(self):
|
||||
result = self._run(["providers", "list", "--app", "claude"])
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_providers_get_nonexistent(self):
|
||||
result = self._run(
|
||||
["providers", "get", "__nonexistent__", "--app", "claude"],
|
||||
check=False,
|
||||
)
|
||||
assert result.returncode != 0
|
||||
|
||||
def test_providers_get_no_api_key_leaked(self):
|
||||
"""Ensure --json output of providers get masks sensitive values."""
|
||||
result = self._run(
|
||||
["--json", "providers", "list", "--app", "claude"],
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
for prov in data:
|
||||
prov_str = json.dumps(prov)
|
||||
# No raw API tokens in output
|
||||
assert "sk-" not in prov_str.lower() or "sk-" not in prov_str
|
||||
|
||||
# ── skills ──
|
||||
|
||||
def test_skills_list(self):
|
||||
result = self._run(["skills", "list"])
|
||||
assert result.returncode == 0
|
||||
assert "Name" in result.stdout or len(result.stdout) > 0
|
||||
|
||||
def test_skills_list_json(self):
|
||||
result = self._run(["--json", "skills", "list"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert isinstance(data, list)
|
||||
|
||||
def test_skills_repos(self):
|
||||
result = self._run(["skills", "repos"])
|
||||
assert result.returncode == 0
|
||||
|
||||
# ── usage ──
|
||||
|
||||
def test_usage_stats(self):
|
||||
result = self._run(["usage", "stats", "--days", "30"])
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_usage_stats_json(self):
|
||||
result = self._run(["--json", "usage", "stats", "--days", "30"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert isinstance(data, list)
|
||||
|
||||
def test_usage_logs(self):
|
||||
result = self._run(["usage", "logs", "--limit", "5"])
|
||||
assert result.returncode == 0
|
||||
|
||||
# ── mcp ──
|
||||
|
||||
def test_mcp_list(self):
|
||||
result = self._run(["mcp", "list"])
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_mcp_list_json(self):
|
||||
result = self._run(["--json", "mcp", "list"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert isinstance(data, list)
|
||||
|
||||
# ── settings ──
|
||||
|
||||
def test_settings_list(self):
|
||||
result = self._run(["settings", "list"])
|
||||
assert result.returncode == 0
|
||||
|
||||
def test_settings_list_json(self):
|
||||
result = self._run(["--json", "settings", "list"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert isinstance(data, dict)
|
||||
|
||||
# ── proxy ──
|
||||
|
||||
def test_proxy_status(self):
|
||||
result = self._run(["proxy", "status", "--app", "claude"])
|
||||
assert result.returncode == 0
|
||||
assert "127.0.0.1" in result.stdout or "Proxy" in result.stdout
|
||||
|
||||
# ── combined / overview ──
|
||||
|
||||
def test_full_status(self):
|
||||
result = self._run([])
|
||||
assert result.returncode == 0
|
||||
assert "CC Switch" in result.stdout or "Status" in result.stdout
|
||||
|
||||
def test_full_status_json(self):
|
||||
result = self._run(["--json"])
|
||||
assert result.returncode == 0
|
||||
data = json.loads(result.stdout)
|
||||
assert "providers" in data
|
||||
assert isinstance(data["providers"], int)
|
||||
@@ -0,0 +1 @@
|
||||
"""Utility modules for ccswitch CLI."""
|
||||
70
cc-switch/agent-harness/cli_anything/ccswitch/utils/db.py
Normal file
70
cc-switch/agent-harness/cli_anything/ccswitch/utils/db.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Database connection and utility functions for CC Switch CLI."""
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def get_cc_switch_dir() -> Path:
|
||||
"""Get the CC Switch config directory."""
|
||||
home = Path(os.environ.get("CCSWITCH_HOME", os.path.expanduser("~")))
|
||||
return home / ".cc-switch"
|
||||
|
||||
|
||||
def get_db_path() -> Path:
|
||||
"""Get the SQLite database path."""
|
||||
return get_cc_switch_dir() / "cc-switch.db"
|
||||
|
||||
|
||||
def get_config_path() -> Path:
|
||||
"""Get the main app config JSON path."""
|
||||
return get_cc_switch_dir() / "config.json"
|
||||
|
||||
|
||||
def get_settings_path() -> Path:
|
||||
"""Get the settings JSON path."""
|
||||
return get_cc_switch_dir() / "settings.json"
|
||||
|
||||
|
||||
def connect_db(db_path: Optional[Path] = None) -> sqlite3.Connection:
|
||||
"""Connect to the CC Switch SQLite database."""
|
||||
path = db_path or get_db_path()
|
||||
conn = sqlite3.connect(str(path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
return conn
|
||||
|
||||
|
||||
def load_config(config_path: Optional[Path] = None) -> dict:
|
||||
"""Load the main CC Switch config.json."""
|
||||
path = config_path or get_config_path()
|
||||
if not path.exists():
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
if not content:
|
||||
return {}
|
||||
return json.loads(content)
|
||||
|
||||
|
||||
def save_config(data: dict, config_path: Optional[Path] = None) -> None:
|
||||
"""Save the main CC Switch config.json."""
|
||||
path = config_path or get_config_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
def load_settings() -> dict:
|
||||
"""Load CC Switch settings.json (UI preferences)."""
|
||||
path = get_settings_path()
|
||||
if not path.exists():
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
VALID_APP_TYPES = ("claude", "codex", "gemini", "opencode", "openclaw", "hermes")
|
||||
567
cc-switch/agent-harness/cli_anything/ccswitch/utils/repl_skin.py
Normal file
567
cc-switch/agent-harness/cli_anything/ccswitch/utils/repl_skin.py
Normal file
@@ -0,0 +1,567 @@
|
||||
"""cli-anything REPL Skin — Unified terminal interface for all CLI harnesses.
|
||||
|
||||
Copy this file into your CLI package at:
|
||||
cli_anything/<software>/utils/repl_skin.py
|
||||
|
||||
Usage:
|
||||
from cli_anything.<software>.utils.repl_skin import ReplSkin
|
||||
|
||||
skin = ReplSkin("shotcut", version="1.0.0")
|
||||
skin.print_banner() # auto-detects repo-root or packaged SKILL.md
|
||||
prompt_text = skin.prompt(project_name="my_video.mlt", modified=True)
|
||||
skin.success("Project saved")
|
||||
skin.error("File not found")
|
||||
skin.warning("Unsaved changes")
|
||||
skin.info("Processing 24 clips...")
|
||||
skin.status("Track 1", "3 clips, 00:02:30")
|
||||
skin.table(headers, rows)
|
||||
skin.print_goodbye()
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# ── ANSI color codes (no external deps for core styling) ──────────────
|
||||
|
||||
_RESET = "\033[0m"
|
||||
_BOLD = "\033[1m"
|
||||
_DIM = "\033[2m"
|
||||
_ITALIC = "\033[3m"
|
||||
_UNDERLINE = "\033[4m"
|
||||
|
||||
# Brand colors
|
||||
_CYAN = "\033[38;5;80m" # cli-anything brand cyan
|
||||
_CYAN_BG = "\033[48;5;80m"
|
||||
_WHITE = "\033[97m"
|
||||
_GRAY = "\033[38;5;245m"
|
||||
_DARK_GRAY = "\033[38;5;240m"
|
||||
_LIGHT_GRAY = "\033[38;5;250m"
|
||||
|
||||
# Software accent colors — each software gets a unique accent
|
||||
_ACCENT_COLORS = {
|
||||
"gimp": "\033[38;5;214m", # warm orange
|
||||
"blender": "\033[38;5;208m", # deep orange
|
||||
"inkscape": "\033[38;5;39m", # bright blue
|
||||
"audacity": "\033[38;5;33m", # navy blue
|
||||
"libreoffice": "\033[38;5;40m", # green
|
||||
"obs_studio": "\033[38;5;55m", # purple
|
||||
"kdenlive": "\033[38;5;69m", # slate blue
|
||||
"shotcut": "\033[38;5;35m", # teal green
|
||||
}
|
||||
_DEFAULT_ACCENT = "\033[38;5;75m" # default sky blue
|
||||
|
||||
# Status colors
|
||||
_GREEN = "\033[38;5;78m"
|
||||
_YELLOW = "\033[38;5;220m"
|
||||
_RED = "\033[38;5;196m"
|
||||
_BLUE = "\033[38;5;75m"
|
||||
_MAGENTA = "\033[38;5;176m"
|
||||
|
||||
_SKILL_SOURCE_REPO = os.environ.get("CLI_ANYTHING_SKILL_REPO", "HKUDS/CLI-Anything")
|
||||
|
||||
# ── Brand icon ────────────────────────────────────────────────────────
|
||||
|
||||
# The cli-anything icon: a small colored diamond/chevron mark
|
||||
_ICON = f"{_CYAN}{_BOLD}◆{_RESET}"
|
||||
_ICON_SMALL = f"{_CYAN}▸{_RESET}"
|
||||
|
||||
# ── Box drawing characters ────────────────────────────────────────────
|
||||
|
||||
_H_LINE = "─"
|
||||
_V_LINE = "│"
|
||||
_TL = "╭"
|
||||
_TR = "╮"
|
||||
_BL = "╰"
|
||||
_BR = "╯"
|
||||
_T_DOWN = "┬"
|
||||
_T_UP = "┴"
|
||||
_T_RIGHT = "├"
|
||||
_T_LEFT = "┤"
|
||||
_CROSS = "┼"
|
||||
|
||||
|
||||
def _strip_ansi(text: str) -> str:
|
||||
"""Remove ANSI escape codes for length calculation."""
|
||||
import re
|
||||
return re.sub(r"\033\[[^m]*m", "", text)
|
||||
|
||||
|
||||
def _visible_len(text: str) -> int:
|
||||
"""Get visible length of text (excluding ANSI codes)."""
|
||||
return len(_strip_ansi(text))
|
||||
|
||||
|
||||
def _display_home_path(path: str) -> str:
|
||||
"""Display a path relative to the home directory when possible."""
|
||||
expanded = Path(path).expanduser().resolve()
|
||||
home = Path.home().resolve()
|
||||
try:
|
||||
relative = expanded.relative_to(home)
|
||||
return f"~/{relative.as_posix()}"
|
||||
except ValueError:
|
||||
return str(expanded)
|
||||
|
||||
|
||||
class ReplSkin:
|
||||
"""Unified REPL skin for cli-anything CLIs.
|
||||
|
||||
Provides consistent branding, prompts, and message formatting
|
||||
across all CLI harnesses built with the cli-anything methodology.
|
||||
"""
|
||||
|
||||
def __init__(self, software: str, version: str = "1.0.0",
|
||||
history_file: str | None = None, skill_path: str | None = None):
|
||||
"""Initialize the REPL skin.
|
||||
|
||||
Args:
|
||||
software: Software name (e.g., "gimp", "shotcut", "blender").
|
||||
version: CLI version string.
|
||||
history_file: Path for persistent command history.
|
||||
Defaults to ~/.cli-anything-<software>/history
|
||||
skill_path: Path to the SKILL.md file for agent discovery.
|
||||
Auto-detected from the repo-root skills/ tree when present,
|
||||
otherwise from the package's skills/ directory.
|
||||
Displayed in banner for AI agents to know where to read skill info.
|
||||
"""
|
||||
self.software = software.lower().replace("-", "_")
|
||||
self.display_name = software.replace("_", " ").title()
|
||||
self.version = version
|
||||
software_aliases = {"iterm2_ctl": "iterm2"}
|
||||
self.skill_slug = software_aliases.get(self.software, self.software).replace("_", "-")
|
||||
self.skill_id = f"cli-anything-{self.skill_slug}"
|
||||
self.skill_install_cmd = (
|
||||
f"npx skills add {_SKILL_SOURCE_REPO} --skill {self.skill_id} -g -y"
|
||||
)
|
||||
global_skill_root = Path(
|
||||
os.environ.get("CLI_ANYTHING_GLOBAL_SKILLS_DIR", str(Path.home() / ".agents" / "skills"))
|
||||
).expanduser()
|
||||
self.global_skill_path = str(global_skill_root / self.skill_id / "SKILL.md")
|
||||
|
||||
# Prefer repo-root canonical skills/<skill-id>/SKILL.md when running
|
||||
# inside the CLI-Anything monorepo. Fall back to the packaged
|
||||
# cli_anything/<software>/skills/SKILL.md for installed harnesses.
|
||||
if skill_path is None:
|
||||
package_skill = Path(__file__).resolve().parent.parent / "skills" / "SKILL.md"
|
||||
repo_skill = None
|
||||
for parent in Path(__file__).resolve().parents:
|
||||
candidate = parent / "skills" / self.skill_id / "SKILL.md"
|
||||
if candidate.is_file():
|
||||
repo_skill = candidate
|
||||
break
|
||||
if repo_skill and repo_skill.is_file():
|
||||
skill_path = str(repo_skill)
|
||||
elif package_skill.is_file():
|
||||
skill_path = str(package_skill)
|
||||
self.skill_path = skill_path
|
||||
self.accent = _ACCENT_COLORS.get(self.software, _DEFAULT_ACCENT)
|
||||
|
||||
# History file
|
||||
if history_file is None:
|
||||
hist_dir = Path.home() / f".cli-anything-{self.software}"
|
||||
hist_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.history_file = str(hist_dir / "history")
|
||||
else:
|
||||
self.history_file = history_file
|
||||
|
||||
# Detect terminal capabilities
|
||||
self._color = self._detect_color_support()
|
||||
|
||||
def _detect_color_support(self) -> bool:
|
||||
"""Check if terminal supports color."""
|
||||
if os.environ.get("NO_COLOR"):
|
||||
return False
|
||||
if os.environ.get("CLI_ANYTHING_NO_COLOR"):
|
||||
return False
|
||||
if not hasattr(sys.stdout, "isatty"):
|
||||
return False
|
||||
return sys.stdout.isatty()
|
||||
|
||||
def _c(self, code: str, text: str) -> str:
|
||||
"""Apply color code if colors are supported."""
|
||||
if not self._color:
|
||||
return text
|
||||
return f"{code}{text}{_RESET}"
|
||||
|
||||
# ── Banner ────────────────────────────────────────────────────────
|
||||
|
||||
def print_banner(self):
|
||||
"""Print the startup banner with branding."""
|
||||
import textwrap
|
||||
|
||||
inner = 72
|
||||
|
||||
def _box_line(content: str) -> str:
|
||||
"""Wrap content in box drawing, padding to inner width."""
|
||||
pad = inner - _visible_len(content)
|
||||
vl = self._c(_DARK_GRAY, _V_LINE)
|
||||
return f"{vl}{content}{' ' * max(0, pad)}{vl}"
|
||||
|
||||
def _meta_lines(label: str, value: str) -> list[str]:
|
||||
"""Wrap a metadata line for the banner box."""
|
||||
icon = self._c(_MAGENTA, "◇")
|
||||
label_text = self._c(_DARK_GRAY, label)
|
||||
prefix = f" {icon} {label_text} "
|
||||
available = max(12, inner - _visible_len(prefix))
|
||||
wrapped = textwrap.wrap(
|
||||
value,
|
||||
width=available,
|
||||
break_long_words=True,
|
||||
break_on_hyphens=False,
|
||||
) or [""]
|
||||
lines = [f"{prefix}{self._c(_LIGHT_GRAY, wrapped[0])}"]
|
||||
continuation_prefix = " " * _visible_len(prefix)
|
||||
for chunk in wrapped[1:]:
|
||||
lines.append(f"{continuation_prefix}{self._c(_LIGHT_GRAY, chunk)}")
|
||||
return lines
|
||||
|
||||
top = self._c(_DARK_GRAY, f"{_TL}{_H_LINE * inner}{_TR}")
|
||||
bot = self._c(_DARK_GRAY, f"{_BL}{_H_LINE * inner}{_BR}")
|
||||
|
||||
# Title: ◆ cli-anything · Shotcut
|
||||
icon = self._c(_CYAN + _BOLD, "◆")
|
||||
brand = self._c(_CYAN + _BOLD, "cli-anything")
|
||||
dot = self._c(_DARK_GRAY, "·")
|
||||
name = self._c(self.accent + _BOLD, self.display_name)
|
||||
title = f" {icon} {brand} {dot} {name}"
|
||||
|
||||
ver = f" {self._c(_DARK_GRAY, f' v{self.version}')}"
|
||||
tip = f" {self._c(_DARK_GRAY, ' Type help for commands, quit to exit')}"
|
||||
empty = ""
|
||||
|
||||
meta_lines: list[str] = []
|
||||
meta_lines.extend(_meta_lines("Install:", self.skill_install_cmd))
|
||||
meta_lines.extend(_meta_lines("Global skill:", _display_home_path(self.global_skill_path)))
|
||||
print(top)
|
||||
print(_box_line(title))
|
||||
print(_box_line(ver))
|
||||
for line in meta_lines:
|
||||
print(_box_line(line))
|
||||
print(_box_line(empty))
|
||||
print(_box_line(tip))
|
||||
print(bot)
|
||||
print()
|
||||
|
||||
# ── Prompt ────────────────────────────────────────────────────────
|
||||
|
||||
def prompt(self, project_name: str = "", modified: bool = False,
|
||||
context: str = "") -> str:
|
||||
"""Build a styled prompt string for prompt_toolkit or input().
|
||||
|
||||
Args:
|
||||
project_name: Current project name (empty if none open).
|
||||
modified: Whether the project has unsaved changes.
|
||||
context: Optional extra context to show in prompt.
|
||||
|
||||
Returns:
|
||||
Formatted prompt string.
|
||||
"""
|
||||
parts = []
|
||||
|
||||
# Icon
|
||||
if self._color:
|
||||
parts.append(f"{_CYAN}◆{_RESET} ")
|
||||
else:
|
||||
parts.append("> ")
|
||||
|
||||
# Software name
|
||||
parts.append(self._c(self.accent + _BOLD, self.software))
|
||||
|
||||
# Project context
|
||||
if project_name or context:
|
||||
ctx = context or project_name
|
||||
mod = "*" if modified else ""
|
||||
parts.append(f" {self._c(_DARK_GRAY, '[')}")
|
||||
parts.append(self._c(_LIGHT_GRAY, f"{ctx}{mod}"))
|
||||
parts.append(self._c(_DARK_GRAY, ']'))
|
||||
|
||||
parts.append(self._c(_GRAY, " ❯ "))
|
||||
|
||||
return "".join(parts)
|
||||
|
||||
def prompt_tokens(self, project_name: str = "", modified: bool = False,
|
||||
context: str = ""):
|
||||
"""Build prompt_toolkit formatted text tokens for the prompt.
|
||||
|
||||
Use with prompt_toolkit's FormattedText for proper ANSI handling.
|
||||
|
||||
Returns:
|
||||
list of (style, text) tuples for prompt_toolkit.
|
||||
"""
|
||||
accent_hex = _ANSI_256_TO_HEX.get(self.accent, "#5fafff")
|
||||
tokens = []
|
||||
|
||||
tokens.append(("class:icon", "◆ "))
|
||||
tokens.append(("class:software", self.software))
|
||||
|
||||
if project_name or context:
|
||||
ctx = context or project_name
|
||||
mod = "*" if modified else ""
|
||||
tokens.append(("class:bracket", " ["))
|
||||
tokens.append(("class:context", f"{ctx}{mod}"))
|
||||
tokens.append(("class:bracket", "]"))
|
||||
|
||||
tokens.append(("class:arrow", " ❯ "))
|
||||
|
||||
return tokens
|
||||
|
||||
def get_prompt_style(self):
|
||||
"""Get a prompt_toolkit Style object matching the skin.
|
||||
|
||||
Returns:
|
||||
prompt_toolkit.styles.Style
|
||||
"""
|
||||
try:
|
||||
from prompt_toolkit.styles import Style
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
accent_hex = _ANSI_256_TO_HEX.get(self.accent, "#5fafff")
|
||||
|
||||
return Style.from_dict({
|
||||
"icon": "#5fdfdf bold", # cyan brand color
|
||||
"software": f"{accent_hex} bold",
|
||||
"bracket": "#585858",
|
||||
"context": "#bcbcbc",
|
||||
"arrow": "#808080",
|
||||
# Completion menu
|
||||
"completion-menu.completion": "bg:#303030 #bcbcbc",
|
||||
"completion-menu.completion.current": f"bg:{accent_hex} #000000",
|
||||
"completion-menu.meta.completion": "bg:#303030 #808080",
|
||||
"completion-menu.meta.completion.current": f"bg:{accent_hex} #000000",
|
||||
# Auto-suggest
|
||||
"auto-suggest": "#585858",
|
||||
# Bottom toolbar
|
||||
"bottom-toolbar": "bg:#1c1c1c #808080",
|
||||
"bottom-toolbar.text": "#808080",
|
||||
})
|
||||
|
||||
# ── Messages ──────────────────────────────────────────────────────
|
||||
|
||||
def success(self, message: str):
|
||||
"""Print a success message with green checkmark."""
|
||||
icon = self._c(_GREEN + _BOLD, "✓")
|
||||
print(f" {icon} {self._c(_GREEN, message)}")
|
||||
|
||||
def error(self, message: str):
|
||||
"""Print an error message with red cross."""
|
||||
icon = self._c(_RED + _BOLD, "✗")
|
||||
print(f" {icon} {self._c(_RED, message)}", file=sys.stderr)
|
||||
|
||||
def warning(self, message: str):
|
||||
"""Print a warning message with yellow triangle."""
|
||||
icon = self._c(_YELLOW + _BOLD, "⚠")
|
||||
print(f" {icon} {self._c(_YELLOW, message)}")
|
||||
|
||||
def info(self, message: str):
|
||||
"""Print an info message with blue dot."""
|
||||
icon = self._c(_BLUE, "●")
|
||||
print(f" {icon} {self._c(_LIGHT_GRAY, message)}")
|
||||
|
||||
def hint(self, message: str):
|
||||
"""Print a subtle hint message."""
|
||||
print(f" {self._c(_DARK_GRAY, message)}")
|
||||
|
||||
def section(self, title: str):
|
||||
"""Print a section header."""
|
||||
print()
|
||||
print(f" {self._c(self.accent + _BOLD, title)}")
|
||||
print(f" {self._c(_DARK_GRAY, _H_LINE * len(title))}")
|
||||
|
||||
# ── Status display ────────────────────────────────────────────────
|
||||
|
||||
def status(self, label: str, value: str):
|
||||
"""Print a key-value status line."""
|
||||
lbl = self._c(_GRAY, f" {label}:")
|
||||
val = self._c(_WHITE, f" {value}")
|
||||
print(f"{lbl}{val}")
|
||||
|
||||
def status_block(self, items: dict[str, str], title: str = ""):
|
||||
"""Print a block of status key-value pairs.
|
||||
|
||||
Args:
|
||||
items: Dict of label -> value pairs.
|
||||
title: Optional title for the block.
|
||||
"""
|
||||
if title:
|
||||
self.section(title)
|
||||
|
||||
max_key = max(len(k) for k in items) if items else 0
|
||||
for label, value in items.items():
|
||||
lbl = self._c(_GRAY, f" {label:<{max_key}}")
|
||||
val = self._c(_WHITE, f" {value}")
|
||||
print(f"{lbl}{val}")
|
||||
|
||||
def progress(self, current: int, total: int, label: str = ""):
|
||||
"""Print a simple progress indicator.
|
||||
|
||||
Args:
|
||||
current: Current step number.
|
||||
total: Total number of steps.
|
||||
label: Optional label for the progress.
|
||||
"""
|
||||
pct = int(current / total * 100) if total > 0 else 0
|
||||
bar_width = 20
|
||||
filled = int(bar_width * current / total) if total > 0 else 0
|
||||
bar = "█" * filled + "░" * (bar_width - filled)
|
||||
text = f" {self._c(_CYAN, bar)} {self._c(_GRAY, f'{pct:3d}%')}"
|
||||
if label:
|
||||
text += f" {self._c(_LIGHT_GRAY, label)}"
|
||||
print(text)
|
||||
|
||||
# ── Table display ─────────────────────────────────────────────────
|
||||
|
||||
def table(self, headers: list[str], rows: list[list[str]],
|
||||
max_col_width: int = 40):
|
||||
"""Print a formatted table with box-drawing characters.
|
||||
|
||||
Args:
|
||||
headers: Column header strings.
|
||||
rows: List of rows, each a list of cell strings.
|
||||
max_col_width: Maximum column width before truncation.
|
||||
"""
|
||||
if not headers:
|
||||
return
|
||||
|
||||
# Calculate column widths
|
||||
col_widths = [min(len(h), max_col_width) for h in headers]
|
||||
for row in rows:
|
||||
for i, cell in enumerate(row):
|
||||
if i < len(col_widths):
|
||||
col_widths[i] = min(
|
||||
max(col_widths[i], len(str(cell))), max_col_width
|
||||
)
|
||||
|
||||
def pad(text: str, width: int) -> str:
|
||||
t = str(text)[:width]
|
||||
return t + " " * (width - len(t))
|
||||
|
||||
# Header
|
||||
header_cells = [
|
||||
self._c(_CYAN + _BOLD, pad(h, col_widths[i]))
|
||||
for i, h in enumerate(headers)
|
||||
]
|
||||
sep = self._c(_DARK_GRAY, f" {_V_LINE} ")
|
||||
header_line = f" {sep.join(header_cells)}"
|
||||
print(header_line)
|
||||
|
||||
# Separator
|
||||
sep_parts = [self._c(_DARK_GRAY, _H_LINE * w) for w in col_widths]
|
||||
sep_line = self._c(_DARK_GRAY, f" {'───'.join([_H_LINE * w for w in col_widths])}")
|
||||
print(sep_line)
|
||||
|
||||
# Rows
|
||||
for row in rows:
|
||||
cells = []
|
||||
for i, cell in enumerate(row):
|
||||
if i < len(col_widths):
|
||||
cells.append(self._c(_LIGHT_GRAY, pad(str(cell), col_widths[i])))
|
||||
row_sep = self._c(_DARK_GRAY, f" {_V_LINE} ")
|
||||
print(f" {row_sep.join(cells)}")
|
||||
|
||||
# ── Help display ──────────────────────────────────────────────────
|
||||
|
||||
def help(self, commands: dict[str, str]):
|
||||
"""Print a formatted help listing.
|
||||
|
||||
Args:
|
||||
commands: Dict of command -> description pairs.
|
||||
"""
|
||||
self.section("Commands")
|
||||
max_cmd = max(len(c) for c in commands) if commands else 0
|
||||
for cmd, desc in commands.items():
|
||||
cmd_styled = self._c(self.accent, f" {cmd:<{max_cmd}}")
|
||||
desc_styled = self._c(_GRAY, f" {desc}")
|
||||
print(f"{cmd_styled}{desc_styled}")
|
||||
print()
|
||||
|
||||
# ── Goodbye ───────────────────────────────────────────────────────
|
||||
|
||||
def print_goodbye(self):
|
||||
"""Print a styled goodbye message."""
|
||||
print(f"\n {_ICON_SMALL} {self._c(_GRAY, 'Goodbye!')}\n")
|
||||
|
||||
# ── Prompt toolkit session factory ────────────────────────────────
|
||||
|
||||
def create_prompt_session(self):
|
||||
"""Create a prompt_toolkit PromptSession with skin styling.
|
||||
|
||||
Returns:
|
||||
A configured PromptSession, or None if prompt_toolkit unavailable.
|
||||
"""
|
||||
try:
|
||||
from prompt_toolkit import PromptSession
|
||||
from prompt_toolkit.history import FileHistory
|
||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
|
||||
style = self.get_prompt_style()
|
||||
|
||||
session = PromptSession(
|
||||
history=FileHistory(self.history_file),
|
||||
auto_suggest=AutoSuggestFromHistory(),
|
||||
style=style,
|
||||
enable_history_search=True,
|
||||
)
|
||||
return session
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
def get_input(self, pt_session, project_name: str = "",
|
||||
modified: bool = False, context: str = "") -> str:
|
||||
"""Get input from user using prompt_toolkit or fallback.
|
||||
|
||||
Args:
|
||||
pt_session: A prompt_toolkit PromptSession (or None).
|
||||
project_name: Current project name.
|
||||
modified: Whether project has unsaved changes.
|
||||
context: Optional context string.
|
||||
|
||||
Returns:
|
||||
User input string (stripped).
|
||||
"""
|
||||
if pt_session is not None:
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
tokens = self.prompt_tokens(project_name, modified, context)
|
||||
return pt_session.prompt(FormattedText(tokens)).strip()
|
||||
else:
|
||||
raw_prompt = self.prompt(project_name, modified, context)
|
||||
return input(raw_prompt).strip()
|
||||
|
||||
# ── Toolbar builder ───────────────────────────────────────────────
|
||||
|
||||
def bottom_toolbar(self, items: dict[str, str]):
|
||||
"""Create a bottom toolbar callback for prompt_toolkit.
|
||||
|
||||
Args:
|
||||
items: Dict of label -> value pairs to show in toolbar.
|
||||
|
||||
Returns:
|
||||
A callable that returns FormattedText for the toolbar.
|
||||
"""
|
||||
def toolbar():
|
||||
from prompt_toolkit.formatted_text import FormattedText
|
||||
parts = []
|
||||
for i, (k, v) in enumerate(items.items()):
|
||||
if i > 0:
|
||||
parts.append(("class:bottom-toolbar.text", " │ "))
|
||||
parts.append(("class:bottom-toolbar.text", f" {k}: "))
|
||||
parts.append(("class:bottom-toolbar", v))
|
||||
return FormattedText(parts)
|
||||
return toolbar
|
||||
|
||||
|
||||
# ── ANSI 256-color to hex mapping (for prompt_toolkit styles) ─────────
|
||||
|
||||
_ANSI_256_TO_HEX = {
|
||||
"\033[38;5;33m": "#0087ff", # audacity navy blue
|
||||
"\033[38;5;35m": "#00af5f", # shotcut teal
|
||||
"\033[38;5;39m": "#00afff", # inkscape bright blue
|
||||
"\033[38;5;40m": "#00d700", # libreoffice green
|
||||
"\033[38;5;55m": "#5f00af", # obs purple
|
||||
"\033[38;5;69m": "#5f87ff", # kdenlive slate blue
|
||||
"\033[38;5;75m": "#5fafff", # default sky blue
|
||||
"\033[38;5;80m": "#5fd7d7", # brand cyan
|
||||
"\033[38;5;208m": "#ff8700", # blender deep orange
|
||||
"\033[38;5;214m": "#ffaf00", # gimp warm orange
|
||||
}
|
||||
24
cc-switch/agent-harness/setup.py
Normal file
24
cc-switch/agent-harness/setup.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""Setup script for CC Switch CLI harness."""
|
||||
from setuptools import setup, find_namespace_packages
|
||||
|
||||
setup(
|
||||
name="cli-anything-ccswitch",
|
||||
version="1.0.0",
|
||||
description="CLI interface for CC Switch — manage AI coding tool configurations",
|
||||
author="cli-anything contributors",
|
||||
python_requires=">=3.10",
|
||||
packages=find_namespace_packages(include=["cli_anything.*"]),
|
||||
package_data={
|
||||
"cli_anything.ccswitch": ["skills/*.md"],
|
||||
},
|
||||
install_requires=[
|
||||
"click>=8.0",
|
||||
"prompt_toolkit>=3.0",
|
||||
"tomlkit>=0.12",
|
||||
],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"cli-anything-ccswitch=cli_anything.ccswitch.ccswitch_cli:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@@ -5,6 +5,25 @@
|
||||
"updated": "2026-04-16"
|
||||
},
|
||||
"clis": [
|
||||
{
|
||||
"name": "cc-switch",
|
||||
"display_name": "CC Switch",
|
||||
"version": "1.0.0",
|
||||
"description": "Manage AI coding tool configurations - inspect providers, skills, MCP servers, usage stats, and proxy settings",
|
||||
"requires": "CC Switch installed with active database",
|
||||
"homepage": "https://github.com/HKUDS/CLI-Anything",
|
||||
"source_url": null,
|
||||
"install_cmd": "pip install git+https://github.com/HKUDS/CLI-Anything.git#subdirectory=cc-switch/agent-harness",
|
||||
"entry_point": "cli-anything-ccswitch",
|
||||
"skill_md": "skills/cli-anything-ccswitch/SKILL.md",
|
||||
"category": "devops",
|
||||
"contributors": [
|
||||
{
|
||||
"name": "computersniper",
|
||||
"url": "https://github.com/computersniper"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "wiremock",
|
||||
"display_name": "WireMock",
|
||||
@@ -1325,4 +1344,4 @@
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
146
skills/cli-anything-ccswitch/SKILL.md
Normal file
146
skills/cli-anything-ccswitch/SKILL.md
Normal file
@@ -0,0 +1,146 @@
|
||||
---
|
||||
name: "cli-anything-ccswitch"
|
||||
description: "CLI interface for CC Switch — manage AI coding tool configurations from the terminal"
|
||||
---
|
||||
|
||||
# CC Switch CLI
|
||||
|
||||
CLI harness for CC Switch, a desktop app that manages AI coding tool (Claude Code,
|
||||
Codex, Gemini CLI, OpenCode, OpenClaw, Hermes) configurations. Built with Click
|
||||
and the CLI-Anything methodology.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.10+
|
||||
- CC Switch installed with an active database at `~/.cc-switch/cc-switch.db`
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
After installation, the command `cli-anything-ccswitch` is available.
|
||||
|
||||
## Command Groups
|
||||
|
||||
| Group | Description |
|
||||
|-------|-------------|
|
||||
| `status` | Show a quick database overview |
|
||||
| `providers` | Manage AI provider configurations (list, get, set-current) |
|
||||
| `proxy` | Manage the local HTTP proxy server (status, config) |
|
||||
| `mcp` | Manage MCP (Model Context Protocol) servers (list, enable) |
|
||||
| `skills` | Manage installed skills (list, repos) |
|
||||
| `usage` | View API usage and cost statistics (stats, logs) |
|
||||
| `settings` | View and manage CC Switch settings (list, get, set) |
|
||||
| `sessions` | Browse and search AI conversation sessions (list) |
|
||||
|
||||
## Global Options
|
||||
|
||||
- `--json` — Output in machine-readable JSON format (recommended for agent use)
|
||||
- `--db PATH` — Override the database path
|
||||
|
||||
## Agent-Specific Guidance
|
||||
|
||||
### JSON Output
|
||||
|
||||
Always use `--json` for programmatic consumption. Place it **before** the
|
||||
subcommand:
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch --json providers list
|
||||
cli-anything-ccswitch --json usage stats --days 30
|
||||
cli-anything-ccswitch --json providers get <id> --app claude
|
||||
```
|
||||
|
||||
### Sensitive Values
|
||||
|
||||
API tokens, keys, and secrets are masked in all output. The `settings_config`
|
||||
field in provider details shows masked values (e.g., `sk-bc089...5cbb`).
|
||||
|
||||
### Exit Codes
|
||||
|
||||
- `0` — Success
|
||||
- `1` — Error (e.g., resource not found, invalid app type)
|
||||
|
||||
### App Types
|
||||
|
||||
Valid app types: `claude`, `codex`, `gemini`, `opencode`, `openclaw`, `hermes`.
|
||||
|
||||
## Examples
|
||||
|
||||
### List all providers
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch providers list
|
||||
cli-anything-ccswitch --json providers list --app claude
|
||||
```
|
||||
|
||||
### Switch active provider
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch providers set-current <provider-id> --app claude
|
||||
```
|
||||
|
||||
### Check proxy status
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch proxy status --app claude
|
||||
cli-anything-ccswitch proxy config --app claude --set-port 8080
|
||||
```
|
||||
|
||||
### View usage stats
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch usage stats --days 7
|
||||
cli-anything-ccswitch --json usage stats --days 30 --app claude
|
||||
cli-anything-ccswitch usage logs --limit 10
|
||||
```
|
||||
|
||||
### List skills
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch skills list
|
||||
cli-anything-ccswitch skills repos
|
||||
```
|
||||
|
||||
### MCP servers
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch mcp list
|
||||
cli-anything-ccswitch mcp enable <server-id> --app claude --on
|
||||
```
|
||||
|
||||
### Settings
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch settings list
|
||||
cli-anything-ccswitch settings get <key>
|
||||
cli-anything-ccswitch settings set <key> <value>
|
||||
```
|
||||
|
||||
### Sessions
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch sessions list --app claude --limit 10
|
||||
```
|
||||
|
||||
### Status overview
|
||||
|
||||
```bash
|
||||
cli-anything-ccswitch status
|
||||
cli-anything-ccswitch
|
||||
cli-anything-ccswitch --json
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
When a resource is not found, the CLI prints an error message to stderr and
|
||||
exits with code 1. Agents should check the exit code before parsing output.
|
||||
|
||||
## Database
|
||||
|
||||
The CLI reads from the live CC Switch SQLite database. All read operations
|
||||
are safe and do not modify the database. Write operations (`providers
|
||||
set-current`, `proxy config`, `mcp enable`, `settings set`) modify the
|
||||
database directly.
|
||||
Reference in New Issue
Block a user