Добавлена возможность устанавливать различные пути для нод

main
ilyukhin 2 months ago
parent dbeb762178
commit e2b20b0cd7

494
dsync

@ -1,15 +1,18 @@
#!/usr/bin/env python3
VERSION = "0.1.0"
VERSION = "0.2.0"
import re
import sys
import json
import subprocess
from pathlib import Path
import concurrent.futures
from typing import Any, Dict, List, Optional, Tuple
# ANSI color codes
class Colors:
"""ANSI color escape codes for simple CLI highlighting."""
RED = '\033[31m'
GREEN = '\033[32m'
ORANGE = '\033[33m'
@ -21,11 +24,33 @@ class Colors:
RESET = '\033[0m'
BOLD = '\033[1m'
def colored(text, color):
"""Apply color to text"""
def colored(text: str, color: str) -> str:
"""Wrap text with an ANSI color.
Args:
text: String to colorize.
color: One of the `Colors` constants.
Returns:
Colorized string with a trailing reset sequence.
"""
return f"{color}{text}{Colors.RESET}"
def find_file_up(filename: str, start_dir: Path = Path.cwd()) -> Path:
"""Search for a file by walking up parent directories.
Args:
filename: File to find (e.g., ".dsyncconfig").
start_dir: Directory to start the upward search from.
Returns:
Absolute path to the first matching file found.
Raises:
SystemExit: If the file cannot be found up to filesystem root.
"""
current_dir = start_dir
while current_dir != current_dir.parent:
candidate = current_dir / filename
@ -35,47 +60,131 @@ def find_file_up(filename: str, start_dir: Path = Path.cwd()) -> Path:
print(colored(f"Error: {filename} not found in this directory or any parent.", Colors.RED))
sys.exit(1)
def load_config():
def load_config() -> Tuple[Dict[str, Any], Path]:
"""Load configuration from `.dsyncconfig`.
Returns:
(config dict, project root path where the config lives).
"""
config_path = find_file_up(".dsyncconfig")
with open(config_path) as f:
return json.load(f), config_path.parent
cfg = json.load(f)
return cfg, config_path.parent
def save_config(config: Dict[str, Any], project_dir: Path) -> None:
"""Persist configuration back to `.dsyncconfig`.
Args:
config: Configuration object to save.
project_dir: Directory containing `.dsyncconfig`.
"""
(project_dir / ".dsyncconfig").write_text(json.dumps(config, indent=2))
def _normalize_nodes(config: Dict[str, Any]) -> List[Dict[str, Optional[str]]]:
"""Normalize nodes list to dict objects with optional remote_dir.
Backward-compatibility:
- Old configs might have `nodes` as a list of strings ["user@host", ...].
- New format is: [{"name": "user@host", "remote_dir": "/path" | None}, ...]
Args:
config: Loaded dsync configuration.
Returns:
List of node dicts with keys: "name", "remote_dir".
"""
norm: List[Dict[str, Optional[str]]] = []
for item in config.get("nodes", []):
if isinstance(item, str):
norm.append({"name": item, "remote_dir": None})
elif isinstance(item, dict) and "name" in item:
norm.append({"name": item["name"], "remote_dir": item.get("remote_dir")})
return norm
def _index_by_name(nodes: List[Dict[str, Optional[str]]], name: str) -> int:
"""Find node index by name.
Args:
nodes: Normalized node list.
name: Node identifier, e.g. "user@host".
Returns:
Index in list, or -1 if not found.
"""
for i, n in enumerate(nodes):
if n.get("name") == name:
return i
return -1
def check_node_status(node, project_dir, ignore_file, master_dir):
"""Check status for a single node"""
remote = f"{node}:{master_dir}"
# Accepts either "user@host" or "user@host:/absolute/dir"
_NODE_ARG_RE = re.compile(r"^(?P<name>[^:]+):(?P<dir>/.*)$")
def _parse_node_token(token: str) -> Tuple[str, Optional[str]]:
"""Parse a node token into (name, remote_dir).
Supported forms:
"user@host" -> ("user@host", None)
"user@host:/path/to/dir" -> ("user@host", "/path/to/dir")
Args:
token: CLI argument representing a node.
Returns:
Tuple of (node_name, remote_dir_or_None).
"""
m = _NODE_ARG_RE.match(token)
if m:
return m.group("name"), m.group("dir")
return token, None
def check_node_status(node_name: str, project_dir: Path, ignore_file: Path, remote_dir: str) -> Dict[str, Any]:
"""Compute rsync dry-run diff for a single node.
Args:
node_name: SSH target in form "user@host".
project_dir: Local project directory to sync from.
ignore_file: Path to rsync exclude file.
remote_dir: Remote directory for this node.
Returns:
Dict with fields: node, remote_dir, status ("ok"|"error"|"timeout"),
changes (list[str]), total_changes (int), error (optional).
"""
remote = f"{node_name}:{remote_dir}"
# --itemize-changes yields machine-parsable change indicators we can explain.
cmd = [
"rsync", "-avz", "--dry-run", "--itemize-changes", "--delete",
f"--exclude-from={ignore_file}",
"-e", "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5",
f"{project_dir}/", remote
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
# Parse rsync output
lines = result.stdout.strip().split('\n')
changes = []
lines = result.stdout.strip().split('\n') if result.stdout.strip() else []
changes: List[str] = []
# Filter only meaningful change lines
for line in lines:
if line.startswith('>f') or line.startswith('>d') or line.startswith('*deleting'):
if line.startswith(('>f', '>d', '*deleting', '<f', '<d', 'cd', 'cf')):
changes.append(line)
elif line.startswith('<f') or line.startswith('<d'):
changes.append(line)
elif line.startswith('cd') or line.startswith('cf'):
changes.append(line)
return {
'node': node,
'node': node_name,
'remote_dir': remote_dir,
'status': 'ok',
'changes': changes,
'total_changes': len(changes)
}
else:
return {
'node': node,
'node': node_name,
'remote_dir': remote_dir,
'status': 'error',
'error': result.stderr.strip(),
'changes': [],
@ -83,7 +192,8 @@ def check_node_status(node, project_dir, ignore_file, master_dir):
}
except subprocess.TimeoutExpired:
return {
'node': node,
'node': node_name,
'remote_dir': remote_dir,
'status': 'timeout',
'error': 'Connection timeout',
'changes': [],
@ -91,24 +201,26 @@ def check_node_status(node, project_dir, ignore_file, master_dir):
}
except Exception as e:
return {
'node': node,
'node': node_name,
'remote_dir': remote_dir,
'status': 'error',
'error': str(e),
'changes': [],
'total_changes': 0
}
def status():
"""Show sync status for all nodes"""
def status() -> None:
"""Show synchronization status for all nodes (including per-node directories)."""
config, project_dir = load_config()
ignore_file = project_dir / config.get("ignore_file", ".dsyncignore")
master_dir = config.get("master_dir", str(project_dir))
# Ensure ignore file exists
# Ensure exclude file exists (empty is fine)
if not ignore_file.exists():
ignore_file.write_text("# Add patterns to ignore during sync\n")
nodes = config.get("nodes", [])
nodes = _normalize_nodes(config)
if not nodes:
print(colored("No nodes configured", Colors.YELLOW))
return
@ -116,49 +228,45 @@ def status():
print(f"Checking status for {len(nodes)} node(s)...")
print("-" * 50)
# Check all nodes in parallel for speed
# Run rsync dry-runs in parallel for speed.
with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(nodes), 10)) as executor:
futures = {
executor.submit(check_node_status, node, project_dir, ignore_file, master_dir): node
for node in nodes
executor.submit(
check_node_status,
n["name"], project_dir, ignore_file,
n["remote_dir"] or master_dir
): n["name"] for n in nodes
}
results = []
results: List[Dict[str, Any]] = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
results.append(future.result())
# Sort results by node name for consistent output
results.sort(key=lambda x: x['node'])
# Display results
total_changes = 0
for result in results:
node = result['node']
status = result['status']
changes = result['changes']
change_count = result['total_changes']
if status == 'ok':
if change_count == 0:
print(colored(f"[OK] {node}: Up to date", Colors.GREEN))
for r in results:
node = r['node']
rdir = r.get('remote_dir') or master_dir
st = r['status']
changes = r['changes']
cnt = r['total_changes']
header = f"{node} ({rdir})"
if st == 'ok':
if cnt == 0:
print(colored(f"[OK] {header}: Up to date", Colors.GREEN))
else:
print(colored(f"[!] {node}: {change_count} change(s) pending", Colors.YELLOW))
total_changes += change_count
# Show first few changes
for i, change in enumerate(changes[:5]):
action = parse_rsync_action(change)
print(f" {action}")
print(colored(f"[!] {header}: {cnt} change(s) pending", Colors.YELLOW))
total_changes += cnt
# Show a small preview of the change list
for ch in changes[:5]:
print(f" {parse_rsync_action(ch)}")
if len(changes) > 5:
print(f" ... and {len(changes) - 5} more change(s)")
else:
print(colored(f"[X] {node}: {result.get('error', 'Unknown error')}", Colors.RED))
print(colored(f"[X] {header}: {r.get('error', 'Unknown error')}", Colors.RED))
print()
# Summary
active_nodes = sum(1 for r in results if r['status'] == 'ok')
error_nodes = sum(1 for r in results if r['status'] != 'ok')
@ -171,8 +279,16 @@ def status():
if error_nodes == 0:
print(colored("All nodes are up to date!", Colors.GREEN))
def parse_rsync_action(line):
"""Parse rsync itemize-changes output into human-readable format"""
def parse_rsync_action(line: str) -> str:
"""Translate `rsync --itemize-changes` markers into readable messages.
Args:
line: Single rsync itemize line.
Returns:
Human-readable description of the change.
"""
if line.startswith('*deleting'):
filename = line.split(' ', 1)[1].strip()
return f"DELETE: {filename}"
@ -180,19 +296,18 @@ def parse_rsync_action(line):
if len(line) < 11:
return line.strip()
# Parse rsync itemize format: YXcstpoguax filename
# rsync itemize format: YXcstpoguax filename
changes = line[0:11]
filename = line[11:].strip()
if not filename:
return line.strip()
# Check file type and direction
# Direction
file_type = changes[0]
if file_type == '<':
direction = "SEND"
direction = "SEND" # local -> remote
elif file_type == '>':
direction = "RECV"
direction = "RECV" # remote -> local (rare in our usage)
elif file_type == 'c':
direction = "CREATE"
elif file_type == 'h':
@ -202,7 +317,7 @@ def parse_rsync_action(line):
else:
direction = "CHANGE"
# Parse item type
# Item kind
change_type = changes[1]
if change_type == 'f':
item_type = "file"
@ -217,15 +332,14 @@ def parse_rsync_action(line):
else:
item_type = "item"
# Parse specific changes
# Specific attributes
change_details = changes[2:11]
details = []
details: List[str] = []
if change_details[0] == '+':
return f"NEW {item_type.upper()}: {filename}"
elif change_details[0] == 'c':
details.append("content")
if change_details[1] == 's':
details.append("size")
if change_details[2] == 't':
@ -243,27 +357,34 @@ def parse_rsync_action(line):
if change_details[8] == 'x':
details.append("extended attributes")
# Build result with direction and item type
if details:
detail_str = ", ".join(details)
return f"{direction} {item_type.upper()}: {filename} ({detail_str})"
else:
return f"{direction} {item_type.upper()}: {filename}"
return f"{direction} {item_type.upper()}: {filename}" if not details \
else f"{direction} {item_type.upper()}: {filename} ({', '.join(details)})"
def sync():
def sync() -> None:
"""Synchronize local project directory to all nodes, honoring per-node dirs.
Uses rsync with `--delete` so remote deletions track local deletions.
"""
config, project_dir = load_config()
ignore_file = project_dir / config.get("ignore_file", ".dsyncignore")
master_dir = config.get("master_dir", str(project_dir))
# Ensure ignore file exists
if not ignore_file.exists():
ignore_file.write_text("# Add patterns to ignore during sync\n")
nodes = _normalize_nodes(config)
if not nodes:
print(colored("No nodes configured", Colors.YELLOW))
return
success_count = 0
failed_nodes = []
failed_nodes: List[str] = []
for node in config["nodes"]:
remote = f"{node}:{master_dir}"
for n in nodes:
node = n["name"]
rdir = n["remote_dir"] or master_dir
remote = f"{node}:{rdir}"
print(f"Syncing to {remote}...")
cmd = [
@ -272,25 +393,29 @@ def sync():
"-e", "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10",
f"{project_dir}/", remote
]
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
print(colored(f"[OK] Successfully synced to {node}", Colors.GREEN))
print(colored(f"[OK] Successfully synced to {node} ({rdir})", Colors.GREEN))
success_count += 1
except subprocess.CalledProcessError as e:
print(colored(f"[X] Failed to sync to {node}: {e}", Colors.RED))
failed_nodes.append(node)
print(colored(f"[X] Failed to sync to {node} ({rdir}): {e}", Colors.RED))
failed_nodes.append(f"{node} ({rdir})")
print()
if success_count == len(config['nodes']):
print(colored(f"Sync completed: {success_count}/{len(config['nodes'])} nodes successful", Colors.GREEN))
if success_count == len(nodes):
print(colored(f"Sync completed: {success_count}/{len(nodes)} nodes successful", Colors.GREEN))
else:
print(colored(f"Sync completed: {success_count}/{len(config['nodes'])} nodes successful", Colors.YELLOW))
print(colored(f"Sync completed: {success_count}/{len(nodes)} nodes successful", Colors.YELLOW))
if failed_nodes:
print(colored(f"Failed nodes: {', '.join(failed_nodes)}", Colors.RED))
def init(master_dir: Path = Path.cwd()):
def init(master_dir: Path = Path.cwd()) -> None:
"""Initialize dsync in the given directory (creates config & ignore).
Args:
master_dir: Directory to treat as project root (default: cwd).
"""
config_file = master_dir / ".dsyncconfig"
ignore_file = master_dir / ".dsyncignore"
@ -300,12 +425,12 @@ def init(master_dir: Path = Path.cwd()):
config = {
"master_dir": str(master_dir),
"nodes": [],
"nodes": [], # list of {"name": "...", "remote_dir": "/path"|None}
"ignore_file": ".dsyncignore"
}
config_file.write_text(json.dumps(config, indent=2))
# Seed a reasonable ignore file for common Python/JS build clutter.
if not ignore_file.exists():
ignore_content = """# dsync ignore patterns
# Add patterns to ignore during sync
@ -352,92 +477,171 @@ node_modules/
print(colored(f"Initialized dsync config in {master_dir}", Colors.GREEN))
def add_nodes(*nodes):
if not nodes:
def add_nodes(*args: str) -> None:
"""Add one or multiple nodes.
Supported forms:
dsync add-node user@host
dsync add-node user@host:/remote/dir
dsync add-node user@host /remote/dir # alt form for a single node
dsync add-node user@h1 user@h2:/dir user@h3 ...
Args:
*args: Node tokens, see forms above.
"""
if not args:
print(colored("No nodes provided", Colors.YELLOW))
return
config, project_dir = load_config()
added_nodes = []
nodes = _normalize_nodes(config)
master_dir = config.get("master_dir", str(project_dir))
for node in nodes:
if node not in config["nodes"]:
config["nodes"].append(node)
added_nodes.append(node)
# Special alt-form: exactly two args "user@host /remote/dir"
if len(args) == 2 and args[1].startswith(("/", ".", "~")):
name, rdir = args[0], args[1]
idx = _index_by_name(nodes, name)
if idx >= 0:
print(colored(f"Node {name} already exists", Colors.YELLOW))
else:
print(colored(f"Node {node} already exists", Colors.YELLOW))
nodes.append({"name": name, "remote_dir": rdir})
print(colored(f"Added node: {name} ({rdir})", Colors.GREEN))
config["nodes"] = nodes
save_config(config, project_dir)
return
if added_nodes:
(project_dir / ".dsyncconfig").write_text(json.dumps(config, indent=2))
print(colored(f"Added nodes: {', '.join(added_nodes)}", Colors.GREEN))
added_any = False
for tok in args:
name, rdir = _parse_node_token(tok)
idx = _index_by_name(nodes, name)
if idx >= 0:
print(colored(f"Node {name} already exists", Colors.YELLOW))
continue
nodes.append({"name": name, "remote_dir": rdir})
print(colored(f"Added node: {name} ({rdir or master_dir})", Colors.GREEN))
added_any = True
if added_any:
config["nodes"] = nodes
save_config(config, project_dir)
else:
print(colored("No new nodes added", Colors.YELLOW))
def del_nodes(*nodes):
if not nodes:
def del_nodes(*nodes_to_remove: str) -> None:
"""Remove one or multiple nodes by name.
Args:
*nodes_to_remove: Node names to delete (e.g., "user@host").
"""
if not nodes_to_remove:
print(colored("No nodes provided", Colors.YELLOW))
return
config, project_dir = load_config()
removed_nodes = []
nodes = _normalize_nodes(config)
for node in nodes:
if node in config["nodes"]:
config["nodes"].remove(node)
removed_nodes.append(node)
removed: List[str] = []
left: List[Dict[str, Optional[str]]] = []
to_remove = set(nodes_to_remove)
for n in nodes:
if n["name"] in to_remove:
removed.append(n["name"])
else:
print(colored(f"Node {node} not found", Colors.YELLOW))
left.append(n)
if removed_nodes:
(project_dir / ".dsyncconfig").write_text(json.dumps(config, indent=2))
print(colored(f"Removed nodes: {', '.join(removed_nodes)}", Colors.GREEN))
if removed:
config["nodes"] = left
save_config(config, project_dir)
print(colored(f"Removed nodes: {', '.join(removed)}", Colors.GREEN))
else:
print(colored("No nodes removed", Colors.YELLOW))
def list_nodes():
config, _ = load_config()
nodes = config.get("nodes", [])
def set_node_dir(node_name: str, remote_dir: str) -> None:
"""Set or change the remote project directory for a node.
Args:
node_name: Node name "user@host".
remote_dir: Absolute path on the remote host.
"""
config, project_dir = load_config()
nodes = _normalize_nodes(config)
idx = _index_by_name(nodes, node_name)
if idx < 0:
print(colored(f"Node {node_name} not found", Colors.RED))
return
nodes[idx]["remote_dir"] = remote_dir
config["nodes"] = nodes
save_config(config, project_dir)
print(colored(f"Updated node {node_name} dir -> {remote_dir}", Colors.GREEN))
def clear_node_dir(node_name: str) -> None:
"""Clear per-node remote directory (fallback to master_dir on use).
Args:
node_name: Node name "user@host".
"""
config, project_dir = load_config()
nodes = _normalize_nodes(config)
idx = _index_by_name(nodes, node_name)
if idx < 0:
print(colored(f"Node {node_name} not found", Colors.RED))
return
nodes[idx]["remote_dir"] = None
config["nodes"] = nodes
save_config(config, project_dir)
print(colored(f"Cleared node {node_name} dir (will use master_dir)", Colors.GREEN))
def list_nodes() -> None:
"""List configured nodes with their effective remote directories."""
config, project_dir = load_config()
master_dir = config.get("master_dir", str(project_dir))
nodes = _normalize_nodes(config)
if nodes:
print(colored("Configured nodes:", Colors.CYAN))
for i, node in enumerate(nodes, 1):
print(f" {i}. {node}")
for i, n in enumerate(sorted(nodes, key=lambda x: x["name"]), 1):
rdir = n["remote_dir"] or master_dir
print(f" {i}. {n['name']} -> {rdir}")
else:
print(colored("No nodes configured", Colors.YELLOW))
def show_help():
def show_help() -> None:
"""Print CLI usage."""
help_text = """
dsync - Directory Synchronization Tool
Usage:
dsync init Initialize dsync in current directory
dsync add-node <node1> [node2...] Add one or more nodes
dsync del-node <node1> [node2...] Remove one or more nodes
dsync list-nodes List all configured nodes
dsync status Show sync status for all nodes
dsync init
dsync add-node <node1> [node2 ...] Add nodes. Each node may be 'user@host' or 'user@host:/remote/dir'
dsync add-node <node> <remote_dir> Add single node with explicit dir (alt form)
dsync del-node <node1> [node2 ...] Remove nodes
dsync set-node-dir <node> <remote_dir> Set/Change remote dir for node
dsync clear-node-dir <node> Clear node dir (fallback to master_dir)
dsync list-nodes List nodes with their remote dirs
dsync status Show sync status for all nodes (with dirs)
dsync sync Sync to all configured nodes
dsync help Show this help message
dsync version Print dsync version
Examples:
dsync init
dsync add-node user@server1.com user@server2.com
dsync del-node user@server1.com
dsync status
dsync sync
Notes:
- Status command shows pending changes without syncing
- Sync uses rsync with --delete flag to handle file deletions
- SSH keys should be configured for passwordless access
- Edit .dsyncignore to exclude files from sync
- If a node has no specific remote_dir, master_dir is used.
- 'add-node user@host:/path' works for multiple nodes in one command.
- SSH keys should be configured for passwordless access.
- Edit .dsyncignore to exclude files from sync.
"""
print(help_text)
def show_version():
def show_version() -> None:
"""Print dsync version."""
print(f"dsync {VERSION}")
if __name__ == "__main__":
if len(sys.argv) < 2:
show_help()
@ -449,14 +653,24 @@ if __name__ == "__main__":
init()
elif cmd in ("-an", "--add-node", "add-node"):
if len(sys.argv) < 3:
print(colored("Usage: dsync.py add-node <node1> [node2...]", Colors.RED))
print(colored("Usage: dsync add-node <node> [remote_dir] | add-node <node1> [node2...]", Colors.RED))
sys.exit(1)
add_nodes(*sys.argv[2:])
elif cmd in ("-dn", "--del-node", "del-node"):
if len(sys.argv) < 3:
print(colored("Usage: dsync.py del-node <node1> [node2...]", Colors.RED))
print(colored("Usage: dsync del-node <node1> [node2...]", Colors.RED))
sys.exit(1)
del_nodes(*sys.argv[2:])
elif cmd in ("-sd", "--set-node-dir", "--set-dir", "set-node-dir", "set-dir"):
if len(sys.argv) != 4:
print(colored("Usage: dsync set-node-dir <node> <remote_dir>", Colors.RED))
sys.exit(1)
set_node_dir(sys.argv[2], sys.argv[3])
elif cmd in ("-cn", "--clear-node-dir", "--clear-dir", "clear-node-dir", "clear-dir"):
if len(sys.argv) != 3:
print(colored("Usage: dsync clear-node-dir <node>", Colors.RED))
sys.exit(1)
clear_node_dir(sys.argv[2])
elif cmd in ("-ln", "--list-nodes", "list-nodes"):
list_nodes()
elif cmd == "status":

Loading…
Cancel
Save