feat(cli): added much more robust bundle handling

This commit is contained in:
2025-08-06 08:25:03 -04:00
parent 67e611ed9c
commit 1aadd01732
34 changed files with 2788 additions and 1201 deletions

View File

View File

@@ -0,0 +1,69 @@
# fourdst/cli/bundle/clear.py
import typer
import yaml
import zipfile
from pathlib import Path
import tempfile
import shutil
def bundle_clear(
bundle_path: Path = typer.Argument(..., help="The path to the .fbundle file to clear.", exists=True, readable=True, writable=True)
):
"""
Removes all compiled binaries from a bundle, leaving only the source distributions.
"""
typer.echo(f"--- Clearing binaries from bundle: {bundle_path.name} ---")
try:
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
# 1. Unpack the bundle
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(temp_dir)
# 2. Read the manifest
manifest_path = temp_dir / "manifest.yaml"
if not manifest_path.is_file():
typer.secho("Error: Bundle is invalid. Missing manifest.yaml.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
# 3. Clear binaries and signatures
typer.echo("Clearing binaries and signature information...")
manifest.pop('bundleAuthorKeyFingerprint', None)
manifest.pop('checksums', None)
for plugin_name, plugin_data in manifest.get('bundlePlugins', {}).items():
if 'binaries' in plugin_data:
plugin_data['binaries'] = []
# 4. Delete the binaries directory and signature file
bin_dir = temp_dir / "bin"
if bin_dir.is_dir():
shutil.rmtree(bin_dir)
typer.echo(" - Removed 'bin/' directory.")
sig_file = temp_dir / "manifest.sig"
if sig_file.is_file():
sig_file.unlink()
typer.echo(" - Removed 'manifest.sig'.")
# 5. Write the updated manifest
with open(manifest_path, 'w') as f:
yaml.dump(manifest, f, sort_keys=False)
# 6. Repack the bundle
typer.echo("Repacking the bundle...")
with zipfile.ZipFile(bundle_path, 'w', zipfile.ZIP_DEFLATED) as bundle_zip:
for file_path in temp_dir.rglob('*'):
if file_path.is_file():
bundle_zip.write(file_path, file_path.relative_to(temp_dir))
typer.secho(f"\n✅ Bundle '{bundle_path.name}' has been cleared of all binaries.", fg=typer.colors.GREEN)
except Exception as e:
typer.secho(f"An unexpected error occurred: {e}", fg=typer.colors.RED)
raise typer.Exit(code=1)

View File

@@ -0,0 +1,132 @@
# fourdst/cli/bundle/create.py
import typer
import os
import sys
import shutil
import datetime
import yaml
import zipfile
from pathlib import Path
from fourdst.cli.common.utils import get_platform_identifier, run_command
bundle_app = typer.Typer()
@bundle_app.command("create")
def bundle_create(
plugin_dirs: list[Path] = typer.Argument(..., help="A list of plugin project directories to include.", exists=True, file_okay=False),
output_bundle: Path = typer.Option("bundle.fbundle", "--out", "-o", help="The path for the output bundle file."),
bundle_name: str = typer.Option("MyPluginBundle", "--name", help="The name of the bundle."),
bundle_version: str = typer.Option("0.1.0", "--ver", help="The version of the bundle."),
bundle_author: str = typer.Option("Unknown", "--author", help="The author of the bundle.")
):
"""
Builds and packages one or more plugin projects into a single .fbundle file.
"""
staging_dir = Path("temp_bundle_staging")
if staging_dir.exists():
shutil.rmtree(staging_dir)
staging_dir.mkdir()
# Get the host platform identifier, triggering detection if needed.
host_platform = get_platform_identifier()
manifest = {
"bundleName": bundle_name,
"bundleVersion": bundle_version,
"bundleAuthor": bundle_author,
"bundleComment": "Created with fourdst-cli",
"bundledOn": datetime.datetime.now().isoformat(),
"bundlePlugins": {}
}
print("Creating bundle...")
for plugin_dir in plugin_dirs:
plugin_name = plugin_dir.name
print(f"--> Processing plugin: {plugin_name}")
# 1. Build the plugin
print(f" - Compiling for host platform...")
build_dir = plugin_dir / "builddir"
if not build_dir.exists():
run_command(["meson", "setup", "builddir"], cwd=plugin_dir)
run_command(["meson", "compile", "-C", "builddir"], cwd=plugin_dir)
# 2. Find the compiled artifact
compiled_lib = next(build_dir.glob("lib*.so"), None) or next(build_dir.glob("lib*.dylib"), None)
if not compiled_lib:
print(f"Error: Could not find compiled library for {plugin_name} (expected lib*.so or lib*.dylib)", file=sys.stderr)
raise typer.Exit(code=1)
# 3. Package source code (sdist), respecting .gitignore
print(" - Packaging source code (respecting .gitignore)...")
sdist_path = staging_dir / f"{plugin_name}_src.zip"
# Use git to list files, which automatically respects .gitignore
git_check = run_command(["git", "rev-parse", "--is-inside-work-tree"], cwd=plugin_dir, check=False)
files_to_include = []
if git_check.returncode == 0:
# This is a git repo, use git to list files
result = run_command(["git", "ls-files", "--cached", "--others", "--exclude-standard"], cwd=plugin_dir)
files_to_include = [plugin_dir / f for f in result.stdout.strip().split('\n') if f]
else:
# Not a git repo, fall back to os.walk and warn the user
typer.secho(f" - Warning: '{plugin_dir.name}' is not a git repository. Packaging all files.", fg=typer.colors.YELLOW)
for root, _, files in os.walk(plugin_dir):
if 'builddir' in root:
continue
for file in files:
files_to_include.append(Path(root) / file)
with zipfile.ZipFile(sdist_path, 'w', zipfile.ZIP_DEFLATED) as sdist_zip:
for file_path in files_to_include:
if file_path.is_file():
sdist_zip.write(file_path, file_path.relative_to(plugin_dir))
# 4. Stage artifacts with ABI-tagged filenames and update manifest
binaries_dir = staging_dir / "bin"
binaries_dir.mkdir(exist_ok=True)
# Construct new filename with arch, os, and ABI tag
base_name = compiled_lib.stem # e.g., "libplugin_a"
ext = compiled_lib.suffix # e.g., ".so"
triplet = host_platform["triplet"]
abi_signature = host_platform["abi_signature"]
tagged_filename = f"{base_name}.{triplet}.{abi_signature}{ext}"
staged_lib_path = binaries_dir / tagged_filename
print(f" - Staging binary as: {tagged_filename}")
shutil.copy(compiled_lib, staged_lib_path)
manifest["bundlePlugins"][plugin_name] = {
"sdist": {
"path": sdist_path.name,
"sdistBundledOn": datetime.datetime.now().isoformat(),
"buildable": True
},
"binaries": [{
"platform": {
"triplet": host_platform["triplet"],
"abi_signature": host_platform["abi_signature"]
},
"path": staged_lib_path.relative_to(staging_dir).as_posix(),
"compiledOn": datetime.datetime.now().isoformat()
}]
}
# 5. Write manifest and package final bundle
manifest_path = staging_dir / "manifest.yaml"
with open(manifest_path, 'w') as f:
yaml.dump(manifest, f, sort_keys=False)
print(f"\nPackaging final bundle: {output_bundle}")
with zipfile.ZipFile(output_bundle, 'w', zipfile.ZIP_DEFLATED) as bundle_zip:
for root, _, files in os.walk(staging_dir):
for file in files:
file_path = Path(root) / file
bundle_zip.write(file_path, file_path.relative_to(staging_dir))
shutil.rmtree(staging_dir)
print("\n✅ Bundle created successfully!")

121
fourdst/cli/bundle/diff.py Normal file
View File

@@ -0,0 +1,121 @@
# fourdst/cli/bundle/diff.py
import typer
import yaml
import zipfile
from pathlib import Path
import tempfile
import shutil
import difflib
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.table import Table
console = Console()
def _get_file_content(directory: Path, filename: str):
file_path = directory / filename
if not file_path.exists():
return None
return file_path.read_bytes()
def bundle_diff(
bundle_a_path: Path = typer.Argument(..., help="The first bundle to compare.", exists=True, readable=True),
bundle_b_path: Path = typer.Argument(..., help="The second bundle to compare.", exists=True, readable=True),
):
"""
Compares two bundle files, showing differences in their manifests, signatures, and contents.
"""
console.print(Panel(f"Comparing [bold blue]{bundle_a_path.name}[/bold blue] with [bold blue]{bundle_b_path.name}[/bold blue]"))
with tempfile.TemporaryDirectory() as temp_a_str, tempfile.TemporaryDirectory() as temp_b_str:
temp_a = Path(temp_a_str)
temp_b = Path(temp_b_str)
# Unpack both bundles
with zipfile.ZipFile(bundle_a_path, 'r') as z: z.extractall(temp_a)
with zipfile.ZipFile(bundle_b_path, 'r') as z: z.extractall(temp_b)
# --- 1. Compare Signatures ---
sig_a = _get_file_content(temp_a, "manifest.sig")
sig_b = _get_file_content(temp_b, "manifest.sig")
sig_panel_style = "green"
sig_status = ""
if sig_a == sig_b and sig_a is not None:
sig_status = "[green]UNCHANGED[/green]"
elif sig_a and not sig_b:
sig_status = "[yellow]REMOVED[/yellow]"
sig_panel_style = "yellow"
elif not sig_a and sig_b:
sig_status = "[yellow]ADDED[/yellow]"
sig_panel_style = "yellow"
elif sig_a and sig_b and sig_a != sig_b:
sig_status = "[bold red]CHANGED[/bold red]"
sig_panel_style = "red"
else:
sig_status = "[dim]Both Unsigned[/dim]"
sig_panel_style = "dim"
console.print(Panel(f"Signature Status: {sig_status}", title="[bold]Signature Verification[/bold]", border_style=sig_panel_style, expand=False))
# --- 2. Compare Manifests ---
manifest_a_content = (temp_a / "manifest.yaml").read_text()
manifest_b_content = (temp_b / "manifest.yaml").read_text()
if manifest_a_content != manifest_b_content:
diff = difflib.unified_diff(
manifest_a_content.splitlines(keepends=True),
manifest_b_content.splitlines(keepends=True),
fromfile=f"{bundle_a_path.name}/manifest.yaml",
tofile=f"{bundle_b_path.name}/manifest.yaml",
)
diff_text = Text()
for line in diff:
if line.startswith('+'):
diff_text.append(line, style="green")
elif line.startswith('-'):
diff_text.append(line, style="red")
elif line.startswith('^'):
diff_text.append(line, style="blue")
else:
diff_text.append(line)
console.print(Panel(diff_text, title="[bold]Manifest Differences[/bold]", border_style="yellow"))
else:
console.print(Panel("[green]Manifests are identical.[/green]", title="[bold]Manifest[/bold]", border_style="green"))
# --- 3. Compare File Contents (via checksums) ---
manifest_a = yaml.safe_load(manifest_a_content)
manifest_b = yaml.safe_load(manifest_b_content)
files_a = {p['path']: p.get('checksum') for p in manifest_a.get('bundlePlugins', {}).get(next(iter(manifest_a.get('bundlePlugins', {})), ''), {}).get('binaries', [])}
files_b = {p['path']: p.get('checksum') for p in manifest_b.get('bundlePlugins', {}).get(next(iter(manifest_b.get('bundlePlugins', {})), ''), {}).get('binaries', [])}
table = Table(title="File Content Comparison")
table.add_column("File Path", style="cyan")
table.add_column("Status", style="magenta")
table.add_column("Details", style="yellow")
all_files = sorted(list(set(files_a.keys()) | set(files_b.keys())))
has_content_changes = False
for file in all_files:
in_a = file in files_a
in_b = file in files_b
if in_a and not in_b:
table.add_row(file, "[red]REMOVED[/red]", "")
has_content_changes = True
elif not in_a and in_b:
table.add_row(file, "[green]ADDED[/green]", "")
has_content_changes = True
elif files_a[file] != files_b[file]:
table.add_row(file, "[yellow]MODIFIED[/yellow]", f"Checksum changed from {files_a.get(file, 'N/A')} to {files_b.get(file, 'N/A')}")
has_content_changes = True
if has_content_changes:
console.print(table)
else:
console.print(Panel("[green]All file contents are identical.[/green]", title="[bold]File Contents[/bold]", border_style="green"))

171
fourdst/cli/bundle/fill.py Normal file
View File

@@ -0,0 +1,171 @@
# fourdst/cli/bundle/fill.py
import typer
import shutil
import datetime
import yaml
import zipfile
from pathlib import Path
import questionary
import subprocess
import sys
import traceback
try:
import docker
except ImportError:
docker = None # Docker is an optional dependency for the 'fill' command
from rich.console import Console
from rich.panel import Panel
console = Console()
from fourdst.cli.common.utils import get_available_build_targets, _build_plugin_in_docker, _build_plugin_for_target
bundle_app = typer.Typer()
@bundle_app.command("fill")
def bundle_fill(bundle_path: Path = typer.Argument(..., help="The .fbundle file to fill with new binaries.", exists=True)):
"""
Builds new binaries for the current host or cross-targets from the bundle's source.
"""
staging_dir = Path(f"temp_fill_{bundle_path.stem}")
if staging_dir.exists():
shutil.rmtree(staging_dir)
try:
# 1. Unpack and load manifest
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(staging_dir)
manifest_path = staging_dir / "manifest.yaml"
if not manifest_path.exists():
typer.secho("Error: Bundle is invalid. Missing manifest.yaml.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
# 2. Find available targets and missing binaries
available_targets = get_available_build_targets()
build_options = []
for plugin_name, plugin_data in manifest.get('bundlePlugins', {}).items():
if "sdist" not in plugin_data:
continue # Cannot build without source
existing_abis = {b['platform']['abi_signature'] for b in plugin_data.get('binaries', [])}
for target in available_targets:
# Use a more descriptive name for the choice
if target.get('docker_image', None):
display_name = f"Docker: {target['docker_image']}"
elif target.get('cross_file', None):
display_name = f"Cross: {Path(target['cross_file']).name}"
else:
display_name = f"Native: {target['abi_signature']} (Local System)"
if target['abi_signature'] not in existing_abis:
build_options.append({
"name": f"Build '{plugin_name}' for {display_name}",
"plugin_name": plugin_name,
"target": target
})
if not build_options:
typer.secho("✅ Bundle is already full for all available build targets.", fg=typer.colors.GREEN)
raise typer.Exit()
# 3. Prompt user to select which targets to build
choices = [opt['name'] for opt in build_options]
selected_builds = questionary.checkbox(
"Select which missing binaries to build:",
choices=choices
).ask()
if not selected_builds:
typer.echo("No binaries selected to build. Exiting.")
raise typer.Exit()
# 4. Build selected targets
for build_name in selected_builds:
build_job = next(opt for opt in build_options if opt['name'] == build_name)
plugin_name = build_job['plugin_name']
target = build_job['target']
typer.secho(f"\nBuilding {plugin_name} for target '{build_name}'...", bold=True)
sdist_zip_path = staging_dir / manifest['bundlePlugins'][plugin_name]['sdist']['path']
build_temp_dir = staging_dir / f"build_{plugin_name}"
try:
if target['docker_image']:
if not docker:
typer.secho("Error: Docker is not installed. Please install Docker to build this target.", fg=typer.colors.RED)
continue
compiled_lib, final_target = _build_plugin_in_docker(sdist_zip_path, build_temp_dir, target, plugin_name)
else:
compiled_lib, final_target = _build_plugin_for_target(sdist_zip_path, build_temp_dir, target)
# Add new binary to bundle
abi_tag = final_target["abi_signature"]
base_name = compiled_lib.stem
ext = compiled_lib.suffix
triplet = final_target["triplet"]
tagged_filename = f"{base_name}.{triplet}.{abi_tag}{ext}"
binaries_dir = staging_dir / "bin"
binaries_dir.mkdir(exist_ok=True)
staged_lib_path = binaries_dir / tagged_filename
shutil.move(compiled_lib, staged_lib_path)
# Update manifest
new_binary_entry = {
"platform": {
"triplet": final_target["triplet"],
"abi_signature": abi_tag,
"arch": final_target["arch"]
},
"path": staged_lib_path.relative_to(staging_dir).as_posix(),
"compiledOn": datetime.datetime.now().isoformat()
}
manifest['bundlePlugins'][plugin_name]['binaries'].append(new_binary_entry)
typer.secho(f" -> Successfully built and staged {tagged_filename}", fg=typer.colors.GREEN)
except (FileNotFoundError, subprocess.CalledProcessError) as e:
typer.secho(f" -> Failed to build {plugin_name} for target '{build_name}': {e}", fg=typer.colors.RED)
tb_str = traceback.format_exc()
console.print(Panel(
tb_str,
title="Traceback",
border_style="yellow",
expand=False
))
finally:
if build_temp_dir.exists():
shutil.rmtree(build_temp_dir)
# 5. Repackage the bundle
# Invalidate any old signature
if "bundleAuthorKeyFingerprint" in manifest:
del manifest["bundleAuthorKeyFingerprint"]
if (staging_dir / "manifest.sig").exists():
(staging_dir / "manifest.sig").unlink()
typer.secho("\n⚠️ Bundle signature has been invalidated by this operation. Please re-sign the bundle.", fg=typer.colors.YELLOW)
with open(manifest_path, 'w') as f:
yaml.dump(manifest, f, sort_keys=False)
with zipfile.ZipFile(bundle_path, 'w', zipfile.ZIP_DEFLATED) as bundle_zip:
for file_path in staging_dir.rglob('*'):
if file_path.is_file():
bundle_zip.write(file_path, file_path.relative_to(staging_dir))
typer.secho(f"\n✅ Bundle '{bundle_path.name}' has been filled successfully.", fg=typer.colors.GREEN)
finally:
if staging_dir.exists():
shutil.rmtree(staging_dir)

View File

@@ -0,0 +1,174 @@
# fourdst/cli/bundle/inspect.py
import typer
import sys
import shutil
import yaml
import zipfile
import hashlib
from pathlib import Path
try:
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ed25519
from cryptography.exceptions import InvalidSignature
except ImportError:
print("Error: This CLI now requires 'cryptography'. Please install it.", file=sys.stderr)
print("Run: pip install cryptography", file=sys.stderr)
sys.exit(1)
from fourdst.cli.common.config import LOCAL_TRUST_STORE_PATH
from fourdst.cli.common.utils import calculate_sha256, get_platform_identifier
bundle_app = typer.Typer()
@bundle_app.command("inspect")
def bundle_inspect(bundle_path: Path = typer.Argument(..., help="The .fbundle file to inspect.", exists=True)):
"""
Inspects a bundle, validating its contents and cryptographic signature.
"""
staging_dir = Path(f"temp_inspect_{bundle_path.stem}")
if staging_dir.exists():
shutil.rmtree(staging_dir)
try:
# Get current system info first
host_platform = get_platform_identifier()
# 1. Unpack and load manifest
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
archive_files = set(bundle_zip.namelist())
bundle_zip.extractall(staging_dir)
manifest_path = staging_dir / "manifest.yaml"
if not manifest_path.exists():
typer.secho("Error: Bundle is invalid. Missing manifest.yaml.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
# 2. Print Header
typer.secho(f"--- Bundle Inspection Report for: {bundle_path.name} ---", bold=True)
typer.echo(f"Name: {manifest.get('bundleName', 'N/A')}")
typer.echo(f"Version: {manifest.get('bundleVersion', 'N/A')}")
typer.echo(f"Author: {manifest.get('bundleAuthor', 'N/A')}")
typer.echo(f"Bundled: {manifest.get('bundledOn', 'N/A')}")
typer.secho(f"Host ABI: {host_platform['abi_signature']}", dim=True)
typer.secho(f"Host Arch: {host_platform['triplet']}", dim=True)
typer.echo("-" * 50)
# 3. Signature and Trust Verification
fingerprint = manifest.get('bundleAuthorKeyFingerprint')
sig_path = staging_dir / "manifest.sig"
if not fingerprint or not sig_path.exists():
typer.secho("Trust Status: 🟡 UNSIGNED", fg=typer.colors.YELLOW)
else:
# Find the key in the local trust store
trusted_key_path = None
if LOCAL_TRUST_STORE_PATH.exists():
for key_file in LOCAL_TRUST_STORE_PATH.rglob("*.pub"):
pub_key = serialization.load_ssh_public_key(key_file.read_bytes())
pub_key_bytes = pub_key.public_bytes(
encoding = serialization.Encoding.OpenSSH,
format = serialization.PublicFormat.OpenSSH
)
pub_key_fingerprint = "sha256:" + hashlib.sha256(pub_key_bytes).hexdigest()
if pub_key_fingerprint == fingerprint:
trusted_key_path = key_file
break
if not trusted_key_path:
typer.secho(f"Trust Status: ⚠️ SIGNED but UNTRUSTED AUTHOR ({fingerprint})", fg=typer.colors.YELLOW)
else:
try:
pub_key_obj = serialization.load_ssh_public_key(trusted_key_path.read_bytes())
signature = sig_path.read_bytes()
manifest_content = manifest_path.read_bytes()
if isinstance(pub_key_obj, ed25519.Ed25519PublicKey):
pub_key_obj.verify(signature, manifest_content)
elif isinstance(pub_key_obj, rsa.RSAPublicKey):
pub_key_obj.verify(
signature,
manifest_content,
padding.PKCS1v15(),
hashes.SHA256()
)
typer.secho(f"Trust Status: ✅ SIGNED and TRUSTED ({trusted_key_path.relative_to(LOCAL_TRUST_STORE_PATH)})", fg=typer.colors.GREEN)
except InvalidSignature:
typer.secho(f"Trust Status: ❌ INVALID SIGNATURE ({fingerprint})", fg=typer.colors.RED)
typer.echo("-" * 50)
# 4. Content Validation
typer.echo("Validating bundle contents...")
missing_files = []
checksum_errors = []
for plugin_name, plugin_data in manifest.get('bundlePlugins', {}).items():
sdist_path = plugin_data.get('sdist', {}).get('path')
if sdist_path and sdist_path not in archive_files:
missing_files.append(sdist_path)
for binary in plugin_data.get('binaries', []):
binary_path_str = binary.get('path')
if binary_path_str and binary_path_str not in archive_files:
missing_files.append(binary_path_str)
elif binary_path_str:
# Verify checksum if present
expected_checksum = binary.get('checksum')
if expected_checksum:
actual_checksum = "sha256:" + calculate_sha256(staging_dir / binary_path_str)
if actual_checksum != expected_checksum:
checksum_errors.append(binary_path_str)
if not missing_files and not checksum_errors:
typer.secho("Content Validation: ✅ OK", fg=typer.colors.GREEN)
else:
typer.secho("Content Validation: ❌ FAILED", fg=typer.colors.RED)
for f in missing_files:
typer.echo(f" - Missing file from archive: {f}")
for f in checksum_errors:
typer.echo(f" - Checksum mismatch for: {f}")
# 5. Plugin Details
typer.echo("-" * 50)
typer.secho("Available Plugins:", bold=True)
for plugin_name, plugin_data in manifest.get('bundlePlugins', {}).items():
typer.echo(f"\n Plugin: {plugin_name}")
typer.echo(f" Source Dist: {plugin_data.get('sdist', {}).get('path', 'N/A')}")
binaries = plugin_data.get('binaries', [])
host_compatible_binary_found = False
if not binaries:
typer.echo(" Binaries: None")
else:
typer.echo(" Binaries:")
for b in binaries:
plat = b.get('platform', {})
is_compatible = (plat.get('triplet') == host_platform['triplet'] and
plat.get('abi_signature') == host_platform['abi_signature'])
color = typer.colors.GREEN if is_compatible else None
if is_compatible:
host_compatible_binary_found = True
typer.secho(f" - Path: {b.get('path', 'N/A')}", fg=color)
typer.secho(f" ABI: {plat.get('abi_signature', 'N/A')}", fg=color, dim=True)
typer.secho(f" Arch: {plat.get('triplet', 'N/A')}", fg=color, dim=True)
if not host_compatible_binary_found:
typer.secho(
f" Note: No compatible binary found for the current system ({host_platform['triplet']}).",
fg=typer.colors.YELLOW
)
typer.secho(
" Run 'fourdst-cli bundle fill' to build one.",
fg=typer.colors.YELLOW
)
finally:
if staging_dir.exists():
shutil.rmtree(staging_dir)

102
fourdst/cli/bundle/sign.py Normal file
View File

@@ -0,0 +1,102 @@
# fourdst/cli/bundle/sign.py
import typer
import shutil
import yaml
import zipfile
import hashlib
from pathlib import Path
import sys
try:
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ed25519
except ImportError:
print("Error: This CLI now requires 'cryptography'. Please install it.", file=sys.stderr)
print("Run: pip install cryptography", file=sys.stderr)
sys.exit(1)
from fourdst.cli.common.utils import calculate_sha256
bundle_app = typer.Typer()
@bundle_app.command("sign")
def bundle_sign(
bundle_path: Path = typer.Argument(..., help="The .fbundle file to sign.", exists=True),
private_key: Path = typer.Option(..., "--key", "-k", help="Path to the author's private signing key.", exists=True)
):
"""
Signs a bundle with an author's private key, adding checksums and a signature.
"""
print(f"Signing bundle: {bundle_path}")
staging_dir = Path("temp_sign_staging")
if staging_dir.exists():
shutil.rmtree(staging_dir)
# 1. Unpack the bundle
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(staging_dir)
manifest_path = staging_dir / "manifest.yaml"
if not manifest_path.exists():
print("Error: manifest.yaml not found in bundle.", file=sys.stderr)
raise typer.Exit(code=1)
# 2. Load private key and derive public key to get fingerprint
with open(private_key, "rb") as key_file:
priv_key_obj = serialization.load_ssh_private_key(key_file.read(), password=None)
pub_key_obj = priv_key_obj.public_key()
pub_key_bytes = pub_key_obj.public_bytes(
encoding=serialization.Encoding.OpenSSH,
format=serialization.PublicFormat.OpenSSH
)
fingerprint = "sha256:" + hashlib.sha256(pub_key_bytes).hexdigest()
print(f" - Signing with key fingerprint: {fingerprint}")
# 3. Update manifest with checksums and fingerprint
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
manifest['bundleAuthorKeyFingerprint'] = fingerprint
for plugin in manifest['bundlePlugins'].values():
for binary in plugin.get('binaries', []):
binary_path = staging_dir / binary['path']
if binary_path.exists():
binary['checksum'] = "sha256:" + calculate_sha256(binary_path)
else:
binary['checksum'] = "MISSING_FILE"
with open(manifest_path, 'w') as f:
yaml.dump(manifest, f, sort_keys=False)
print(" - Added file checksums and key fingerprint to manifest.")
# 4. Sign the manifest
manifest_content = manifest_path.read_bytes()
if isinstance(priv_key_obj, ed25519.Ed25519PrivateKey):
signature = priv_key_obj.sign(manifest_content)
elif isinstance(priv_key_obj, rsa.RSAPrivateKey):
signature = priv_key_obj.sign(
manifest_content,
padding.PKCS1v15(),
hashes.SHA256()
)
else:
print("Error: Unsupported private key type for signing.", file=sys.stderr)
raise typer.Exit(code=1)
sig_path = staging_dir / "manifest.sig"
sig_path.write_bytes(signature)
print(" - Created manifest.sig.")
# 5. Repackage the bundle
with zipfile.ZipFile(bundle_path, 'w', zipfile.ZIP_DEFLATED) as bundle_zip:
for file_path in staging_dir.rglob('*'):
if file_path.is_file():
bundle_zip.write(file_path, file_path.relative_to(staging_dir))
shutil.rmtree(staging_dir)
print("\n✅ Bundle signed successfully!")

View File

@@ -0,0 +1,143 @@
# fourdst/cli/bundle/validate.py
import typer
import yaml
import zipfile
from pathlib import Path
import tempfile
import shutil
import hashlib
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
console = Console()
def _calculate_sha256(file_path: Path) -> str:
"""Calculates the SHA256 checksum of a file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def _validate_bundle_directory(path: Path, is_temp: bool = False):
"""Validates a directory that is structured like an unpacked bundle."""
title = "Validating Pre-Bundle Directory" if not is_temp else "Validating Bundle Contents"
console.print(Panel(f"{title}: [bold]{path.name}[/bold]", border_style="blue"))
errors = 0
warnings = 0
def check(condition, success_msg, error_msg, is_warning=False):
nonlocal errors, warnings
if condition:
console.print(Text(f"{success_msg}", style="green"))
return True
else:
if is_warning:
console.print(Text(f"⚠️ {error_msg}", style="yellow"))
warnings += 1
else:
console.print(Text(f"{error_msg}", style="red"))
errors += 1
return False
# 1. Check for manifest
manifest_file = path / "manifest.yaml"
if not check(manifest_file.is_file(), "Found manifest.yaml.", "Missing manifest.yaml file."):
raise typer.Exit(code=1)
try:
manifest = yaml.safe_load(manifest_file.read_text())
check(True, "Manifest file is valid YAML.", "")
except yaml.YAMLError as e:
check(False, "", f"Manifest file is not valid YAML: {e}")
raise typer.Exit(code=1)
# 2. Check manifest content
check(manifest is not None, "Manifest is not empty.", "Manifest file is empty.", is_warning=True)
check('bundleName' in manifest, "Manifest contains 'bundleName'.", "Manifest is missing 'bundleName'.")
check('bundleVersion' in manifest, "Manifest contains 'bundleVersion'.", "Manifest is missing 'bundleVersion'.")
plugins = manifest.get('bundlePlugins', {})
check(plugins, "Manifest contains 'bundlePlugins' section.", "Manifest is missing 'bundlePlugins' section.")
# 3. Check files listed in manifest
for name, data in plugins.items():
console.print(f"\n--- Validating plugin: [bold cyan]{name}[/bold cyan] ---")
sdist_info = data.get('sdist', {})
sdist_path_str = sdist_info.get('path')
if check(sdist_path_str, "sdist path is defined.", f"sdist path not defined for plugin '{name}'."):
sdist_path = path / sdist_path_str
check(sdist_path.exists(), f"sdist file found: {sdist_path_str}", f"sdist file not found: {sdist_path_str}")
for binary in data.get('binaries', []):
bin_path_str = binary.get('path')
if not check(bin_path_str, "Binary path is defined.", "Binary entry is missing a 'path'."):
continue
bin_path = path / bin_path_str
if check(bin_path.exists(), f"Binary file found: {bin_path_str}", f"Binary file not found: {bin_path_str}"):
expected_checksum = binary.get('checksum')
if check(expected_checksum, "Checksum is defined.", f"Checksum not defined for binary '{bin_path_str}'.", is_warning=True):
actual_checksum = "sha256:" + _calculate_sha256(bin_path)
check(
actual_checksum == expected_checksum,
f"Checksum matches for {bin_path_str}",
f"Checksum mismatch for {bin_path_str}.\n Expected: {expected_checksum}\n Actual: {actual_checksum}"
)
# 4. Check for signature
check((path / "manifest.sig").exists(), "Signature file 'manifest.sig' found.", "Signature file 'manifest.sig' is missing.", is_warning=True)
# Final summary
console.print("-" * 40)
if errors == 0:
console.print(Panel(
f"[bold green]Validation Passed[/bold green]\nWarnings: {warnings}",
title="Result",
border_style="green"
))
else:
console.print(Panel(
f"[bold red]Validation Failed[/bold red]\nErrors: {errors}\nWarnings: {warnings}",
title="Result",
border_style="red"
))
raise typer.Exit(code=1)
def _validate_bundle_file(bundle_path: Path):
"""Unpacks a .fbundle file and runs directory validation on its contents."""
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(temp_dir)
_validate_bundle_directory(temp_dir, is_temp=True)
except zipfile.BadZipFile:
console.print(Panel(f"[red]Error: '{bundle_path.name}' is not a valid zip file.[/red]", title="Validation Error"))
raise typer.Exit(code=1)
def bundle_validate(
path: Path = typer.Argument(
".",
help="The path to the .fbundle file or pre-bundle directory to validate.",
exists=True,
resolve_path=True
)
):
"""
Validates a packed .fbundle or a directory ready to be packed.
- If a directory is provided, it checks for a valid manifest and that all referenced files exist.
- If a .fbundle file is provided, it unpacks it and runs the same validation checks.
"""
if path.is_dir():
_validate_bundle_directory(path)
elif path.is_file():
_validate_bundle_file(path)
else:
# This case should not be reached due to `exists=True`
console.print(Panel("[red]Error: Path is not a file or directory.[/red]", title="Validation Error"))
raise typer.Exit(code=1)

0
fourdst/cli/cache/__init__.py vendored Normal file
View File

20
fourdst/cli/cache/clear.py vendored Normal file
View File

@@ -0,0 +1,20 @@
# fourdst/cli/cache/clear.py
import typer
import shutil
from fourdst.cli.common.config import CACHE_PATH
import typer
cache_app = typer.Typer()
@cache_app.command("clear")
def cache_clear():
"""
Clears all cached data, including the ABI signature.
Run this if you have updated your C++ compiler.
"""
if CACHE_PATH.exists():
shutil.rmtree(CACHE_PATH)
print("✅ Local cache cleared.")
else:
print("No cache found to clear.")

View File

View File

@@ -0,0 +1,16 @@
# fourdst/cli/common/config.py
from pathlib import Path
FOURDST_CONFIG_DIR = Path.home() / ".config" / "fourdst"
LOCAL_TRUST_STORE_PATH = FOURDST_CONFIG_DIR / "keys"
CROSS_FILES_PATH = FOURDST_CONFIG_DIR / "cross"
CACHE_PATH = FOURDST_CONFIG_DIR / "cache"
ABI_CACHE_FILE = CACHE_PATH / "abi_identifier.json"
DOCKER_BUILD_IMAGES = {
"x86_64 (manylinux_2_28)": "quay.io/pypa/manylinux_2_28_x86_64",
"aarch64 (manylinux_2_28)": "quay.io/pypa/manylinux_2_28_aarch64",
"i686 (manylinux_2_28)" : "quay.io/pypa/manylinux_2_28_i686",
"ppc64le (manylinux_2_28)" : "quay.io/pypa/manylinux_2_28_ppc64le",
"s390x (manylinux_2_28)" : "quay.io/pypa/manylinux_2_28_s390x"
}

View File

@@ -0,0 +1,129 @@
# fourdst/cli/common/templates.py
ABI_DETECTOR_CPP_SRC = """
#include <iostream>
#include <string>
#include <vector>
#ifdef __GNUC__
#if __has_include(<gnu/libc-version.h>)
#include <gnu/libc-version.h>
#endif
#endif
int main() {
std::string os;
std::string compiler;
std::string compiler_version;
std::string stdlib;
std::string stdlib_version;
std::string abi;
#if defined(__APPLE__) && defined(__MACH__)
os = "macos";
#elif defined(__linux__)
os = "linux";
#elif defined(_WIN32)
os = "windows";
#else
os = "unknown_os";
#endif
#if defined(__clang__)
compiler = "clang";
compiler_version = __clang_version__;
#elif defined(__GNUC__)
compiler = "gcc";
compiler_version = std::to_string(__GNUC__) + "." + std::to_string(__GNUC_MINOR__) + "." + std::to_string(__GNUC_PATCHLEVEL__);
#elif defined(_MSC_VER)
compiler = "msvc";
compiler_version = std::to_string(_MSC_VER);
#else
compiler = "unknown_compiler";
compiler_version = "0";
#endif
#if defined(_LIBCPP_VERSION)
stdlib = "libc++";
stdlib_version = std::to_string(_LIBCPP_VERSION);
abi = "libc++_abi"; // On libc++, the ABI is tightly coupled with the library itself.
#elif defined(__GLIBCXX__)
stdlib = "libstdc++";
#if defined(_GLIBCXX_USE_CXX11_ABI)
abi = _GLIBCXX_USE_CXX11_ABI == 1 ? "cxx11_abi" : "pre_cxx11_abi";
#else
abi = "pre_cxx11_abi";
#endif
#if __has_include(<gnu/libc-version.h>)
stdlib_version = gnu_get_libc_version();
#else
stdlib_version = "unknown";
#endif
#else
stdlib = "unknown_stdlib";
abi = "unknown_abi";
#endif
std::cout << "os=" << os << std::endl;
std::cout << "compiler=" << compiler << std::endl;
std::cout << "compiler_version=" << compiler_version << std::endl;
std::cout << "stdlib=" << stdlib << std::endl;
if (!stdlib_version.empty()) {
std::cout << "stdlib_version=" << stdlib_version << std::endl;
}
// Always print the ABI key for consistent parsing
std::cout << "abi=" << abi << std::endl;
return 0;
}
"""
ABI_DETECTOR_MESON_SRC = """
project('abi-detector', 'cpp', default_options : ['cpp_std=c++23'])
executable('detector', 'main.cpp')
"""
GITIGNORE_CONTENT = """# General
*.swp
*~
.DS_Store
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
.venv/
venv/
env/
*.egg-info/
dist/
# C++ Build Artifacts
*.o
*.a
*.so
*.dylib
*.dll
*.lib
*.exe
# Meson Build System
# Ignore any directory containing meson-private, which is a reliable marker
**/meson-private/
# Also ignore common build directory names
build/
builddir/
# Subprojects - ignore all subdirectories except 'packagefiles' and root .wrap files
/subprojects/*
!/subprojects/packagefiles
!/subprojects/*.wrap
# Editor specific
.vscode/
.idea/
*.sublime-project
*.sublime-workspace
"""

424
fourdst/cli/common/utils.py Normal file
View File

@@ -0,0 +1,424 @@
# fourdst/cli/common/utils.py
import typer
import os
import sys
import shutil
import subprocess
from pathlib import Path
import importlib.resources
import json
import platform
import zipfile
import hashlib
try:
import docker
except ImportError:
docker = None
from rich.console import Console
from rich.panel import Panel
console = Console()
from fourdst.cli.common.config import CACHE_PATH, ABI_CACHE_FILE, CROSS_FILES_PATH, DOCKER_BUILD_IMAGES
from fourdst.cli.common.templates import ABI_DETECTOR_CPP_SRC, ABI_DETECTOR_MESON_SRC
def get_template_content(template_name: str) -> str:
"""Safely reads content from a template file packaged with the CLI."""
try:
return importlib.resources.files('fourdst.cli.templates').joinpath(template_name).read_text()
except FileNotFoundError:
print(f"Error: Template file '{template_name}' not found.", file=sys.stderr)
sys.exit(1)
def run_command(command: list[str], cwd: Path = None, check=True, display_output: bool = False):
"""Runs a command, optionally displaying its output in a formatted box."""
command_str = ' '.join(command)
try:
result = subprocess.run(command, check=check, capture_output=True, text=True, cwd=cwd)
if display_output and (result.stdout or result.stderr):
output_text = ""
if result.stdout:
output_text += result.stdout.strip()
if result.stderr:
output_text += f"\n[yellow]{result.stderr.strip()}[/yellow]"
console.print(Panel(
output_text,
title=f"Output from: `{command_str}`",
border_style="blue",
expand=False
))
return result
except subprocess.CalledProcessError as e:
if check:
output_text = ""
if e.stdout:
output_text += f"[bold]--- STDOUT ---[/bold]\n{e.stdout.strip()}"
if e.stderr:
output_text += f"\n[bold]--- STDERR ---[/bold]\n{e.stderr.strip()}"
console.print(Panel(
output_text,
title=f"Error running: `{command_str}`",
border_style="red",
expand=False
))
raise typer.Exit(code=1)
return e
def _detect_and_cache_abi(cross_file: Path = None):
"""
Compiles and runs a C++ program to detect the compiler ABI, then caches it.
"""
print("Performing one-time native C++ ABI detection...")
temp_dir = CACHE_PATH / "abi_detector"
if temp_dir.exists():
shutil.rmtree(temp_dir)
temp_dir.mkdir(parents=True)
try:
(temp_dir / "main.cpp").write_text(ABI_DETECTOR_CPP_SRC)
(temp_dir / "meson.build").write_text(ABI_DETECTOR_MESON_SRC)
print(" - Configuring detector...")
run_command(["meson", "setup", "build"], cwd=temp_dir, display_output=True)
print(" - Compiling detector...")
run_command(["meson", "compile", "-C", "build"], cwd=temp_dir, display_output=True)
detector_exe = temp_dir / "build" / "detector"
print(" - Running detector...")
proc = subprocess.run([str(detector_exe)], check=True, capture_output=True, text=True)
abi_details = {}
for line in proc.stdout.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
abi_details[key.strip()] = value.strip()
compiler = abi_details.get('compiler', 'unk_compiler')
stdlib = abi_details.get('stdlib', 'unk_stdlib')
stdlib_version = abi_details.get('stdlib_version', 'unk_stdlib_version')
abi = abi_details.get('abi', 'unk_abi')
abi_string = f"{compiler}-{stdlib}-{stdlib_version}-{abi}"
detected_os = abi_details.get("os", "unknown_os")
arch = platform.machine()
platform_identifier = {
"triplet": f"{arch}-{detected_os}",
"abi_signature": abi_string,
"details": abi_details,
"is_native": True,
"cross_file": None,
"docker_image": None,
"arch": arch
}
with open(ABI_CACHE_FILE, 'w') as f:
json.dump(platform_identifier, f, indent=2)
print(f"✅ Native ABI detected and cached: {abi_string}")
return platform_identifier
finally:
if temp_dir.exists():
shutil.rmtree(temp_dir)
def get_platform_identifier() -> dict:
"""
Gets the native platform identifier, using a cached value if available.
"""
if ABI_CACHE_FILE.exists():
with open(ABI_CACHE_FILE, 'r') as f:
return json.load(f)
else:
return _detect_and_cache_abi()
def get_available_build_targets() -> list:
"""Gets native, cross-compilation, and Docker build targets."""
targets = [get_platform_identifier()]
# Add cross-file targets
CROSS_FILES_PATH.mkdir(exist_ok=True)
for cross_file in CROSS_FILES_PATH.glob("*.cross"):
triplet = cross_file.stem
targets.append({
"triplet": triplet,
"abi_signature": f"cross-{triplet}",
"is_native": False,
"cross_file": str(cross_file.resolve()),
"docker_image": None
})
# Add Docker targets if Docker is available
if docker:
try:
client = docker.from_env()
client.ping()
for name, image in DOCKER_BUILD_IMAGES.items():
arch = name.split(' ')[0]
targets.append({
"triplet": f"linux-{arch}",
"abi_signature": f"docker-{image}",
"is_native": False,
"cross_file": None,
"docker_image": image,
"arch": arch
})
except Exception:
typer.secho("Warning: Docker is installed but the daemon is not running. Docker targets are unavailable.", fg=typer.colors.YELLOW)
return targets
def _build_plugin_for_target(sdist_path: Path, build_dir: Path, target: dict):
"""Builds a plugin natively or with a cross file."""
source_dir = build_dir / "src"
if source_dir.exists():
shutil.rmtree(source_dir)
with zipfile.ZipFile(sdist_path, 'r') as sdist_zip:
sdist_zip.extractall(source_dir)
setup_cmd = ["meson", "setup"]
if target["cross_file"]:
setup_cmd.extend(["--cross-file", target["cross_file"]])
setup_cmd.append("build")
run_command(setup_cmd, cwd=source_dir, display_output=True)
run_command(["meson", "compile", "-C", "build"], cwd=source_dir, display_output=True)
meson_build_dir = source_dir / "build"
compiled_lib = next(meson_build_dir.rglob("lib*.so"), None) or next(meson_build_dir.rglob("lib*.dylib"), None)
if not compiled_lib:
raise FileNotFoundError("Could not find compiled library after build.")
return compiled_lib, target # Return target as ABI is pre-determined
def _build_plugin_in_docker(sdist_path: Path, build_dir: Path, target: dict, plugin_name: str):
"""Builds a plugin inside a Docker container."""
client = docker.from_env()
image_name = target["docker_image"]
# Find arch from DOCKER_BUILD_IMAGES to create a clean triplet later
arch = "unknown_arch"
for name, img in DOCKER_BUILD_IMAGES.items():
if img == image_name:
arch = name.split(' ')[0]
break
typer.echo(f" - Pulling Docker image '{image_name}' (if necessary)...")
client.images.pull(image_name)
source_dir = build_dir / "src"
if source_dir.exists():
shutil.rmtree(source_dir)
with zipfile.ZipFile(sdist_path, 'r') as sdist_zip:
sdist_zip.extractall(source_dir)
# This script will be run inside the container
build_script = f"""
set -e
echo "--- Installing build dependencies ---"
export PATH="/opt/python/cp313-cp313/bin:$PATH"
pip install meson ninja cmake
echo " -> meson version: $(meson --version) [$(which meson)]"
echo " -> ninja version: $(ninja --version) [$(which ninja)]"
echo " -> cmake version: $(cmake --version) [$(which cmake)]"
echo "--- Configuring with Meson ---"
meson setup /build/meson_build
echo "--- Compiling with Meson ---"
meson compile -C /build/meson_build
echo "--- Running ABI detector ---"
# We need to build and run the ABI detector inside the container too
mkdir /tmp/abi && cd /tmp/abi
echo "{ABI_DETECTOR_CPP_SRC.replace('"', '\\"')}" > main.cpp
echo "{ABI_DETECTOR_MESON_SRC.replace('"', '\\"')}" > meson.build
meson setup build && meson compile -C build
./build/detector > /build/abi_details.txt
"""
container_build_dir = Path("/build")
typer.echo(" - Running build container...")
container = client.containers.run(
image=image_name,
command=["/bin/sh", "-c", build_script],
volumes={str(source_dir.resolve()): {'bind': str(container_build_dir), 'mode': 'rw'}},
working_dir=str(container_build_dir),
detach=True
)
# Stream logs
for line in container.logs(stream=True, follow=True):
typer.echo(f" [docker] {line.decode('utf-8').strip()}")
result = container.wait()
if result["StatusCode"] != 0:
# The container is stopped, but we can still inspect its filesystem by restarting it briefly.
log_output = container.logs()
container.remove() # Clean up before raising
typer.secho(f"Build failed inside Docker. Full log:\n{log_output.decode('utf-8')}", fg=typer.colors.RED)
raise subprocess.CalledProcessError(result["StatusCode"], "Build inside Docker failed.")
# Retrieve artifacts by searching inside the container's filesystem
typer.echo(" - Locating compiled library in container...")
meson_build_dir_str = (container_build_dir / "meson_build").as_posix()
expected_lib_name = f"lib{plugin_name}.so"
find_cmd = f"find {meson_build_dir_str} -name {expected_lib_name}"
# We need to run the find command in the now-stopped container.
# We can't use exec_run on a stopped container, but we can create a new
# one that uses the same filesystem (volume) to find the file.
try:
find_output = client.containers.run(
image=image_name,
command=["/bin/sh", "-c", find_cmd],
volumes={str(source_dir.resolve()): {'bind': str(container_build_dir), 'mode': 'ro'}},
remove=True, # Clean up the find container immediately
detach=False
)
found_path_str = find_output.decode('utf-8').strip()
if not found_path_str:
raise FileNotFoundError("Find command returned no path.")
compiled_lib = Path(found_path_str)
typer.echo(f" - Found library at: {compiled_lib}")
except Exception as e:
typer.secho(f" - Error: Could not locate '{expected_lib_name}' inside the container.", fg=typer.colors.RED)
typer.secho(f" Details: {e}", fg=typer.colors.RED)
raise FileNotFoundError("Could not find compiled library in container after a successful build.")
# Get the ABI details from the container
abi_details_content = ""
bits, _ = container.get_archive(str(container_build_dir / "abi_details.txt"))
for chunk in bits:
abi_details_content += chunk.decode('utf-8')
# We need to find the actual file content within the tar stream
# This is a simplification; a real implementation would use the `tarfile` module
actual_content = abi_details_content.split('\n', 1)[1] if '\n' in abi_details_content else abi_details_content
actual_content = actual_content.split('main.cpp')[1].strip() if 'main.cpp' in actual_content else actual_content
actual_content = actual_content.rsplit('0755', 1)[0].strip() if '0755' in actual_content else actual_content
abi_details = {}
for line in actual_content.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
abi_details[key.strip()] = value.strip()
compiler = abi_details.get('compiler', 'unk_compiler')
stdlib = abi_details.get('stdlib', 'unk_stdlib')
stdlib_version = abi_details.get('stdlib_version', 'unk_stdlib_version')
abi = abi_details.get('abi', 'unk_abi')
abi_string = f"{compiler}-{stdlib}-{stdlib_version}-{abi}"
final_target = {
"triplet": f"{abi_details.get('os', 'linux')}-{arch}",
"abi_signature": abi_string,
"is_native": False,
"cross_file": None,
"docker_image": image_name,
"arch": arch
}
# Copy the binary out
local_lib_path = build_dir / compiled_lib.name
bits, _ = container.get_archive(str(compiled_lib))
with open(local_lib_path, 'wb') as f:
for chunk in bits:
f.write(chunk)
container.remove()
return local_lib_path, final_target
def calculate_sha256(file_path: Path) -> str:
"""Calculates the SHA256 checksum of a file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def parse_cpp_header(header_path: Path):
"""
Parses a C++ header file using libclang to find classes and their pure virtual methods.
"""
# This function requires python-clang-16
try:
from clang import cindex
except ImportError:
print("Error: The 'init' command requires 'libclang'. Please install it.", file=sys.stderr)
print("Run: pip install python-clang-16", file=sys.stderr)
# Also ensure the libclang.so/dylib is in your system's library path.
raise typer.Exit(code=1)
if not cindex.Config.loaded:
try:
# Attempt to find libclang automatically. This may need to be configured by the user.
cindex.Config.set_library_file(cindex.conf.get_filename())
except cindex.LibclangError as e:
print(f"Error: libclang library not found. Please ensure it's installed and in your system's path.", file=sys.stderr)
print(f"Details: {e}", file=sys.stderr)
raise typer.Exit(code=1)
# --- Get compiler flags from pkg-config to help clang find includes ---
try:
pkg_config_proc = subprocess.run(
['pkg-config', '--cflags', 'fourdst_plugin'],
capture_output=True,
text=True,
check=True
)
# Split the flags string into a list of arguments for libclang
compiler_flags = pkg_config_proc.stdout.strip().split()
print(f"Using compiler flags from pkg-config: {' '.join(compiler_flags)}")
except (subprocess.CalledProcessError, FileNotFoundError):
print("Warning: `pkg-config --cflags fourdst-plugin` failed. Parsing may not succeed if the header has dependencies.", file=sys.stderr)
print("Please ensure 'pkg-config' is installed and 'fourdst-plugin.pc' is in your PKG_CONFIG_PATH.", file=sys.stderr)
compiler_flags = []
index = cindex.Index.create()
# Add the pkg-config flags to the parser arguments
translation_unit = index.parse(str(header_path), args=['-x', 'c++', '-std=c++23'] + compiler_flags)
interfaces = {}
for cursor in translation_unit.cursor.walk_preorder():
if cursor.kind == cindex.CursorKind.CLASS_DECL and cursor.is_definition():
class_name = cursor.spelling
methods = []
for child in cursor.get_children():
if child.kind == cindex.CursorKind.CXX_METHOD and child.is_pure_virtual_method():
method_name = child.spelling
result_type = child.result_type.spelling
# Recreate the full method signature
params = [p.spelling or f"param{i+1}" for i, p in enumerate(child.get_arguments())]
param_str = ", ".join(f"{p.type.spelling} {p.spelling}" for p in child.get_arguments())
const_qualifier = " const" if child.is_const_method() else ""
signature = f"{result_type} {method_name}({param_str}){const_qualifier}"
# Generate a placeholder body
body = f" // TODO: Implement the {method_name} method.\n"
if result_type != "void":
body += f" return {{}};" # Default return
methods.append({'signature': signature, 'body': body})
if methods: # Only consider classes with pure virtual methods as interfaces
interfaces[class_name] = methods
return interfaces

View File

23
fourdst/cli/keys/add.py Normal file
View File

@@ -0,0 +1,23 @@
# fourdst/cli/keys/add.py
import typer
import shutil
from pathlib import Path
from fourdst.cli.common.config import LOCAL_TRUST_STORE_PATH
MANUAL_KEYS_DIR = LOCAL_TRUST_STORE_PATH / "manual"
def keys_add(
key_path: Path = typer.Argument(..., help="Path to the public key file to add.", exists=True, readable=True)
):
"""Adds a single public key to the local trust store."""
MANUAL_KEYS_DIR.mkdir(parents=True, exist_ok=True)
destination = MANUAL_KEYS_DIR / key_path.name
if destination.exists():
# check content
if destination.read_bytes() == key_path.read_bytes():
typer.secho(f"Key '{key_path.name}' with same content already exists.", fg=typer.colors.YELLOW)
return
shutil.copy(key_path, destination)
typer.secho(f"✅ Key '{key_path.name}' added to manual trust store.", fg=typer.colors.GREEN)

View File

@@ -0,0 +1,35 @@
# fourdst/cli/keys/generate.py
import typer
import sys
from pathlib import Path
from fourdst.cli.common.utils import run_command
keys_app = typer.Typer()
@keys_app.command("generate")
def keys_generate(
key_name: str = typer.Option("author_key", "--name", "-n", help="The base name for the generated key files.")
):
"""
Generates a new Ed25519 key pair for signing bundles.
"""
private_key_path = Path(f"{key_name}")
public_key_path = Path(f"{key_name}.pub")
if private_key_path.exists() or public_key_path.exists():
print(f"Error: Key files '{private_key_path}' or '{public_key_path}' already exist.", file=sys.stderr)
raise typer.Exit(code=1)
print("Generating Ed25519 key pair...")
run_command([
"ssh-keygen",
"-t", "ed25519",
"-f", str(private_key_path),
"-N", "", # No passphrase
"-C", "fourdst bundle signing key"
])
print("\n✅ Keys generated successfully!")
print(f" -> Private Key (KEEP SECRET): {private_key_path.resolve()}")
print(f" -> Public Key (SHARE): {public_key_path.resolve()}")
print("\nShare the public key with users who need to trust your bundles.")

23
fourdst/cli/keys/list.py Normal file
View File

@@ -0,0 +1,23 @@
# fourdst/cli/keys/list.py
import typer
from pathlib import Path
from fourdst.cli.common.config import LOCAL_TRUST_STORE_PATH
def keys_list():
"""Lists all trusted public keys."""
if not LOCAL_TRUST_STORE_PATH.exists():
typer.echo("Trust store not found.")
return
keys_found = False
for source_dir in LOCAL_TRUST_STORE_PATH.iterdir():
if source_dir.is_dir():
keys = list(source_dir.glob("*.pub"))
if keys:
keys_found = True
typer.secho(f"\n--- Source: {source_dir.name} ---", bold=True)
for key_file in keys:
typer.echo(f" - {key_file.name}")
if not keys_found:
typer.echo("No trusted keys found.")

View File

@@ -0,0 +1 @@
# fourdst/cli/keys/remote/__init__.py

View File

@@ -0,0 +1,31 @@
# fourdst/cli/keys/remote/add.py
import typer
import json
from pathlib import Path
from fourdst.cli.common.config import FOURDST_CONFIG_DIR
KEY_REMOTES_CONFIG = FOURDST_CONFIG_DIR / "key_remotes.json"
def remote_add(
url: str = typer.Argument(..., help="The URL of the Git repository."),
name: str = typer.Argument(..., help="A local name for the remote.")
):
"""Adds a new remote key source."""
FOURDST_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
if KEY_REMOTES_CONFIG.exists():
with open(KEY_REMOTES_CONFIG, 'r') as f:
config = json.load(f)
else:
config = {"remotes": []}
if any(r['name'] == name for r in config['remotes']):
typer.secho(f"Error: Remote with name '{name}' already exists.", fg=typer.colors.RED)
raise typer.Exit(code=1)
config['remotes'].append({"name": name, "url": url})
with open(KEY_REMOTES_CONFIG, 'w') as f:
json.dump(config, f, indent=2)
typer.secho(f"✅ Remote '{name}' added.", fg=typer.colors.GREEN)

View File

@@ -0,0 +1,24 @@
# fourdst/cli/keys/remote/list.py
import typer
import json
from pathlib import Path
from fourdst.cli.common.config import FOURDST_CONFIG_DIR
KEY_REMOTES_CONFIG = FOURDST_CONFIG_DIR / "key_remotes.json"
def remote_list():
"""Lists all configured remote key sources."""
if not KEY_REMOTES_CONFIG.exists():
typer.echo("No remotes configured.")
return
with open(KEY_REMOTES_CONFIG, 'r') as f:
config = json.load(f)
if not config.get("remotes"):
typer.echo("No remotes configured.")
return
typer.secho("Configured Key Remotes:", bold=True)
for remote in config['remotes']:
typer.echo(f" - {remote['name']}: {remote['url']}")

View File

@@ -0,0 +1,30 @@
# fourdst/cli/keys/remote/remove.py
import typer
import json
from pathlib import Path
from fourdst.cli.common.config import FOURDST_CONFIG_DIR
KEY_REMOTES_CONFIG = FOURDST_CONFIG_DIR / "key_remotes.json"
def remote_remove(
name: str = typer.Argument(..., help="The name of the remote to remove.")
):
"""Removes a remote key source."""
if not KEY_REMOTES_CONFIG.exists():
typer.secho("Error: No remotes configured.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(KEY_REMOTES_CONFIG, 'r') as f:
config = json.load(f)
original_len = len(config['remotes'])
config['remotes'] = [r for r in config['remotes'] if r['name'] != name]
if len(config['remotes']) == original_len:
typer.secho(f"Error: Remote '{name}' not found.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(KEY_REMOTES_CONFIG, 'w') as f:
json.dump(config, f, indent=2)
typer.secho(f"✅ Remote '{name}' removed.", fg=typer.colors.GREEN)

View File

@@ -0,0 +1,61 @@
# fourdst/cli/keys/remove.py
import typer
import questionary
from pathlib import Path
import hashlib
from fourdst.cli.common.config import LOCAL_TRUST_STORE_PATH
def get_key_fingerprint(key_path: Path) -> str:
"""Generates a SHA256 fingerprint for a public key."""
pub_key_bytes = key_path.read_bytes()
# Assuming OpenSSH format, the fingerprint is based on the raw public key bytes
# For simplicity, we'll hash the whole file content.
return "sha256:" + hashlib.sha256(pub_key_bytes).hexdigest()
def keys_remove(
key_path: Path = typer.Argument(None, help="Path to the public key file to remove.", exists=True, readable=True)
):
"""Removes a single public key from the local trust store."""
if not LOCAL_TRUST_STORE_PATH.exists():
typer.secho("Trust store not found.", fg=typer.colors.RED)
raise typer.Exit(code=1)
if key_path:
# Remove by content matching
target_content = key_path.read_bytes()
key_removed = False
for source_dir in LOCAL_TRUST_STORE_PATH.iterdir():
if source_dir.is_dir():
for pub_key in source_dir.glob("*.pub"):
if pub_key.read_bytes() == target_content:
pub_key.unlink()
typer.secho(f"✅ Removed key '{pub_key.name}' from source '{source_dir.name}'.", fg=typer.colors.GREEN)
key_removed = True
if not key_removed:
typer.secho("No matching key found to remove.", fg=typer.colors.YELLOW)
else:
# Interactive removal
all_keys = []
for source_dir in LOCAL_TRUST_STORE_PATH.iterdir():
if source_dir.is_dir():
for pub_key in source_dir.glob("*.pub"):
all_keys.append(pub_key)
if not all_keys:
typer.echo("No keys to remove.")
raise typer.Exit()
choices = [
{
"name": f"{key.relative_to(LOCAL_TRUST_STORE_PATH)} ({get_key_fingerprint(key)})",
"value": key
} for key in all_keys
]
selected_to_remove = questionary.checkbox("Select keys to remove:", choices=choices).ask()
if selected_to_remove:
for key_to_remove in selected_to_remove:
key_to_remove.unlink()
typer.secho(f"✅ Removed key '{key_to_remove.name}'.", fg=typer.colors.GREEN)

66
fourdst/cli/keys/sync.py Normal file
View File

@@ -0,0 +1,66 @@
# fourdst/cli/keys/sync.py
import typer
import shutil
import json
from pathlib import Path
import questionary
from fourdst.cli.common.config import FOURDST_CONFIG_DIR, LOCAL_TRUST_STORE_PATH
KEY_REMOTES_CONFIG = FOURDST_CONFIG_DIR / "key_remotes.json"
REMOTES_DIR = LOCAL_TRUST_STORE_PATH / "remotes"
keys_app = typer.Typer()
@keys_app.command("sync")
def keys_sync():
"""
Syncs the local trust store with all configured remote Git repositories.
"""
if not KEY_REMOTES_CONFIG.exists():
typer.secho("No remotes configured. Use 'fourdst-cli keys remote add' to add one.", fg=typer.colors.YELLOW)
raise typer.Exit()
with open(KEY_REMOTES_CONFIG, 'r') as f:
config = json.load(f)
remotes = config.get("remotes", [])
if not remotes:
typer.secho("No remotes configured.", fg=typer.colors.YELLOW)
raise typer.Exit()
REMOTES_DIR.mkdir(parents=True, exist_ok=True)
remotes_to_remove = []
for remote in remotes:
name = remote['name']
url = remote['url']
remote_path = REMOTES_DIR / name
typer.secho(f"--- Syncing remote '{name}' from {url} ---", bold=True)
try:
if remote_path.exists():
run_command(["git", "pull"], cwd=remote_path)
else:
run_command(["git", "clone", "--depth", "1", url, str(remote_path)])
# Clean up non-public key files
for item in remote_path.iterdir():
if item.is_file() and item.suffix != '.pub':
item.unlink()
typer.secho(f"✅ Sync successful for '{name}'.", fg=typer.colors.GREEN)
except Exception as e:
typer.secho(f"⚠️ Failed to sync remote '{name}': {e}", fg=typer.colors.YELLOW)
if questionary.confirm(f"Do you want to remove the remote '{name}'?").ask():
remotes_to_remove.append(name)
if remotes_to_remove:
config['remotes'] = [r for r in config['remotes'] if r['name'] not in remotes_to_remove]
with open(KEY_REMOTES_CONFIG, 'w') as f:
json.dump(config, f, indent=2)
typer.secho(f"Removed failing remotes: {', '.join(remotes_to_remove)}", fg=typer.colors.YELLOW)

File diff suppressed because it is too large Load Diff

View File

105
fourdst/cli/plugin/diff.py Normal file
View File

@@ -0,0 +1,105 @@
# fourdst/cli/plugin/diff.py
import typer
import yaml
import zipfile
from pathlib import Path
import tempfile
import shutil
import difflib
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
console = Console()
def _extract_sdist(bundle_path: Path, plugin_name: str, temp_dir: Path):
"""Extracts a specific plugin's sdist from a bundle to a directory."""
sdist_extract_path = temp_dir / f"{plugin_name}_src"
with tempfile.TemporaryDirectory() as bundle_unpack_dir_str:
bundle_unpack_dir = Path(bundle_unpack_dir_str)
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(bundle_unpack_dir)
manifest_path = bundle_unpack_dir / "manifest.yaml"
if not manifest_path.exists():
raise FileNotFoundError("manifest.yaml not found in bundle.")
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
plugin_data = manifest.get('bundlePlugins', {}).get(plugin_name)
if not plugin_data or 'sdist' not in plugin_data:
raise FileNotFoundError(f"Plugin '{plugin_name}' or its sdist not found in {bundle_path.name}.")
sdist_path_in_bundle = bundle_unpack_dir / plugin_data['sdist']['path']
if not sdist_path_in_bundle.exists():
raise FileNotFoundError(f"sdist archive '{plugin_data['sdist']['path']}' not found in bundle.")
with zipfile.ZipFile(sdist_path_in_bundle, 'r') as sdist_zip:
sdist_zip.extractall(sdist_extract_path)
return sdist_extract_path
def plugin_diff(
plugin_name: str = typer.Argument(..., help="The name of the plugin to compare."),
bundle_a_path: Path = typer.Argument(..., help="The first bundle to compare.", exists=True, readable=True),
bundle_b_path: Path = typer.Argument(..., help="The second bundle to compare.", exists=True, readable=True),
):
"""
Compares the source code of a specific plugin between two different bundles.
"""
console.print(Panel(f"Comparing source for plugin [bold blue]{plugin_name}[/bold blue] between bundles"))
with tempfile.TemporaryDirectory() as temp_a_str, tempfile.TemporaryDirectory() as temp_b_str:
try:
src_a_path = _extract_sdist(bundle_a_path, plugin_name, Path(temp_a_str))
src_b_path = _extract_sdist(bundle_b_path, plugin_name, Path(temp_b_str))
except FileNotFoundError as e:
console.print(f"[red]Error: {e}[/red]")
raise typer.Exit(code=1)
files_a = {p.relative_to(src_a_path) for p in src_a_path.rglob('*') if p.is_file()}
files_b = {p.relative_to(src_b_path) for p in src_b_path.rglob('*') if p.is_file()}
added_files = files_b - files_a
removed_files = files_a - files_b
common_files = files_a & files_b
has_changes = False
if added_files:
has_changes = True
console.print(Panel("\n".join(f"[green]+ {f}[/green]" for f in sorted(list(added_files))), title="[bold]Added Files[/bold]"))
if removed_files:
has_changes = True
console.print(Panel("\n".join(f"[red]- {f}[/red]" for f in sorted(list(removed_files))), title="[bold]Removed Files[/bold]"))
modified_files_count = 0
for file_rel_path in sorted(list(common_files)):
content_a = (src_a_path / file_rel_path).read_text()
content_b = (src_b_path / file_rel_path).read_text()
if content_a != content_b:
has_changes = True
modified_files_count += 1
diff = difflib.unified_diff(
content_a.splitlines(keepends=True),
content_b.splitlines(keepends=True),
fromfile=f"a/{file_rel_path}",
tofile=f"b/{file_rel_path}",
)
diff_text = Text()
for line in diff:
if line.startswith('+'): diff_text.append(line, style="green")
elif line.startswith('-'): diff_text.append(line, style="red")
else: diff_text.append(line)
console.print(Panel(diff_text, title=f"[bold yellow]Modified: {file_rel_path}[/bold yellow]", border_style="yellow", expand=False))
if not has_changes:
console.print(Panel("[green]No source code changes detected for this plugin.[/green]", title="Result"))
else:
console.print(f"\nFound changes in {modified_files_count} file(s).")

View File

@@ -0,0 +1,82 @@
# fourdst/cli/plugin/extract.py
import typer
import yaml
import zipfile
from pathlib import Path
import tempfile
import shutil
def plugin_extract(
plugin_name: str = typer.Argument(..., help="The name of the plugin to extract."),
bundle_path: Path = typer.Argument(..., help="The path to the .fbundle file.", exists=True, readable=True),
output_dir: Path = typer.Option(
Path("."),
"--out", "-o",
help="The directory to extract the plugin source to. Defaults to the current directory.",
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True
)
):
"""
Extracts a plugin's source code from a bundle.
"""
output_dir.mkdir(parents=True, exist_ok=True)
try:
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
# 1. Unpack the main bundle
typer.echo(f"Opening bundle: {bundle_path.name}")
with zipfile.ZipFile(bundle_path, 'r') as bundle_zip:
bundle_zip.extractall(temp_dir)
# 2. Read the manifest
manifest_path = temp_dir / "manifest.yaml"
if not manifest_path.exists():
typer.secho("Error: Bundle is invalid. Missing manifest.yaml.", fg=typer.colors.RED)
raise typer.Exit(code=1)
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
# 3. Find the plugin and its sdist
plugin_data = manifest.get('bundlePlugins', {}).get(plugin_name)
if not plugin_data:
typer.secho(f"Error: Plugin '{plugin_name}' not found in the bundle.", fg=typer.colors.RED)
available_plugins = list(manifest.get('bundlePlugins', {}).keys())
if available_plugins:
typer.echo("Available plugins are: " + ", ".join(available_plugins))
raise typer.Exit(code=1)
sdist_info = plugin_data.get('sdist')
if not sdist_info or 'path' not in sdist_info:
typer.secho(f"Error: Source distribution (sdist) not found for plugin '{plugin_name}'.", fg=typer.colors.RED)
raise typer.Exit(code=1)
sdist_path_in_bundle = temp_dir / sdist_info['path']
if not sdist_path_in_bundle.is_file():
typer.secho(f"Error: sdist file '{sdist_info['path']}' is missing from the bundle archive.", fg=typer.colors.RED)
raise typer.Exit(code=1)
# 4. Extract the sdist to the final output directory
final_destination = output_dir / plugin_name
if final_destination.exists():
typer.secho(f"Warning: Output directory '{final_destination}' already exists. Files may be overwritten.", fg=typer.colors.YELLOW)
else:
final_destination.mkdir(parents=True)
typer.echo(f"Extracting '{plugin_name}' source to '{final_destination.resolve()}'...")
with zipfile.ZipFile(sdist_path_in_bundle, 'r') as sdist_zip:
sdist_zip.extractall(final_destination)
typer.secho(f"\n✅ Plugin '{plugin_name}' extracted successfully.", fg=typer.colors.GREEN)
except zipfile.BadZipFile:
typer.secho(f"Error: '{bundle_path}' is not a valid bundle (zip) file.", fg=typer.colors.RED)
raise typer.Exit(code=1)
except Exception as e:
typer.secho(f"An unexpected error occurred: {e}", fg=typer.colors.RED)
raise typer.Exit(code=1)

161
fourdst/cli/plugin/init.py Normal file
View File

@@ -0,0 +1,161 @@
# fourdst/cli/plugin/init.py
import typer
import sys
import shutil
from pathlib import Path
import questionary
from fourdst.cli.common.utils import run_command, get_template_content
from fourdst.cli.common.templates import GITIGNORE_CONTENT
plugin_app = typer.Typer()
@plugin_app.command("init")
def plugin_init(
project_name: str = typer.Argument(..., help="The name of the new plugin project."),
header: Path = typer.Option(..., "--header", "-H", help="Path to the C++ header file defining the plugin interface.", exists=True, file_okay=True, dir_okay=False, readable=True),
directory: Path = typer.Option(".", "-d", "--directory", help="The directory to create the project in.", resolve_path=True),
version: str = typer.Option("0.1.0", "--ver", help="The initial SemVer version of the plugin."),
libplugin_rev: str = typer.Option("main", "--libplugin-rev", help="The git revision of libplugin to use.")
):
"""
Initializes a new Meson-based C++ plugin project from an interface header.
"""
print(f"Parsing interface header: {header.name}")
interfaces = parse_cpp_header(header)
if not interfaces:
print(f"Error: No suitable interfaces (classes with pure virtual methods) found in {header}", file=sys.stderr)
raise typer.Exit(code=1)
# --- Interactive Selection ---
chosen_interface = questionary.select(
"Which interface would you like to implement?",
choices=list(interfaces.keys())
).ask()
if not chosen_interface:
raise typer.Exit() # User cancelled
print(f"Initializing plugin '{project_name}' implementing interface '{chosen_interface}'...")
# --- Code Generation ---
method_stubs = "\n".join(
f" {method['signature']} override {{\n{method['body']}\n }}"
for method in interfaces[chosen_interface]
)
class_name = ''.join(filter(str.isalnum, project_name.replace('_', ' ').title().replace(' ', ''))) + "Plugin"
root_path = directory / project_name
src_path = root_path / "src"
include_path = src_path / "include"
subprojects_path = root_path / "subprojects"
try:
src_path.mkdir(parents=True, exist_ok=True)
include_path.mkdir(exist_ok=True)
subprojects_path.mkdir(exist_ok=True)
# --- Copy interface header to make project self-contained ---
local_header_path = include_path / header.name
shutil.copy(header, local_header_path)
print(f" -> Copied interface header to {local_header_path.relative_to(root_path)}")
# --- Create libplugin.wrap file ---
libplugin_wrap_content = f"""[wrap-git]
url = https://github.com/4D-STAR/libplugin
revision = {libplugin_rev}
depth = 1
"""
(subprojects_path / "libplugin.wrap").write_text(libplugin_wrap_content)
print(f" -> Created {subprojects_path / 'libplugin.wrap'}")
# --- Create meson.build from template ---
meson_template = get_template_content("meson.build.in")
meson_content = meson_template.format(
project_name=project_name,
version=version
)
(root_path / "meson.build").write_text(meson_content)
print(f" -> Created {root_path / 'meson.build'}")
# --- Create C++ source file from template ---
cpp_template = get_template_content("plugin.cpp.in")
cpp_content = cpp_template.format(
class_name=class_name,
project_name=project_name,
interface=chosen_interface,
interface_header_path=header.name, # Use just the filename
method_stubs=method_stubs
)
(src_path / f"{project_name}.cpp").write_text(cpp_content)
print(f" -> Created {src_path / f'{project_name}.cpp'}")
# --- Create .gitignore ---
(root_path / ".gitignore").write_text(GITIGNORE_CONTENT)
print(f" -> Created .gitignore")
# --- Initialize Git Repository ---
print(" -> Initializing Git repository...")
run_command(["git", "init"], cwd=root_path)
run_command(["git", "add", "."], cwd=root_path)
commit_message = f"Initial commit: Scaffold fourdst plugin '{project_name}'"
run_command(["git", "commit", "-m", commit_message], cwd=root_path)
except OSError as e:
print(f"Error creating project structure: {e}", file=sys.stderr)
raise typer.Exit(code=1)
print("\n✅ Project initialized successfully and committed to Git!")
print("To build your new plugin:")
print(f" cd {root_path}")
print(" meson setup builddir")
print(" meson compile -C builddir")
def parse_cpp_header(header_path: Path):
"""
Parses a C++ header file using libclang to find classes and their pure virtual methods.
"""
# This function requires python-clang-16
try:
from clang import cindex
except ImportError:
print("Error: The 'init' command requires 'libclang'. Please install it.", file=sys.stderr)
print("Run: pip install python-clang-16", file=sys.stderr)
# Also ensure the libclang.so/dylib is in your system's library path.
raise typer.Exit(code=1)
if not cindex.Config.loaded:
try:
# Attempt to find libclang automatically. This may need to be configured by the user.
cindex.Config.set_library_file(cindex.conf.get_filename())
except cindex.LibclangError as e:
print(f"Error: libclang library not found. Please ensure it's installed and in your system's path.", file=sys.stderr)
print(f"Details: {e}", file=sys.stderr)
raise typer.Exit(code=1)
index = cindex.Index.create()
translation_unit = index.parse(str(header_path))
interfaces = {}
for node in translation_unit.cursor.get_children():
if node.kind == cindex.CursorKind.CLASS_DECL and node.is_pure_virtual():
# Found a class with pure virtual methods, likely an interface
interface_name = node.spelling
print(f"Found interface: {interface_name}")
methods = []
for method in node.get_children():
if method.kind == cindex.CursorKind.CXX_METHOD and method.is_pure_virtual():
# Only consider pure virtual methods
method_signature = f"{method.return_type.spelling} {method.spelling}({', '.join([arg.type.spelling for arg in method.get_arguments()])})"
method_body = "// TODO: Implement this method"
methods.append({"signature": method_signature, "body": method_body})
print(f" Found pure virtual method: {method_signature}")
interfaces[interface_name] = methods
return interfaces

100
fourdst/cli/plugin/pack.py Normal file
View File

@@ -0,0 +1,100 @@
# fourdst/cli/plugin/pack.py
import typer
import sys
import yaml
import zipfile
from pathlib import Path
from fourdst.cli.common.utils import calculate_sha256
def _validate_bundle_directory(directory: Path) -> list[str]:
"""
Validates that a directory has the structure of a valid bundle.
Returns a list of error strings. An empty list means success.
"""
errors = []
manifest_path = directory / "manifest.yaml"
if not manifest_path.is_file():
return ["Error: Missing 'manifest.yaml' in the root of the directory."]
try:
with open(manifest_path, 'r') as f:
manifest = yaml.safe_load(f)
except yaml.YAMLError as e:
return [f"Error: Invalid YAML in manifest.yaml: {e}"]
# 1. Check that all files referenced in the manifest exist
for plugin_name, plugin_data in manifest.get('bundlePlugins', {}).items():
sdist_info = plugin_data.get('sdist', {})
if sdist_info:
sdist_path = sdist_info.get('path')
if sdist_path and not (directory / sdist_path).is_file():
errors.append(f"Missing sdist file for '{plugin_name}': {sdist_path}")
for binary in plugin_data.get('binaries', []):
binary_path = binary.get('path')
if binary_path and not (directory / binary_path).is_file():
errors.append(f"Missing binary file for '{plugin_name}': {binary_path}")
# 2. If checksums exist, validate them
expected_checksum = binary.get('checksum')
if binary_path and expected_checksum:
file_to_check = directory / binary_path
if file_to_check.is_file():
actual_checksum = "sha256:" + calculate_sha256(file_to_check)
if actual_checksum != expected_checksum:
errors.append(f"Checksum mismatch for '{binary_path}'")
return errors
def plugin_pack(
folder_path: Path = typer.Argument(..., help="The directory to pack into a bundle.", exists=True, file_okay=False, dir_okay=True, readable=True),
name: str = typer.Option(None, "--name", "-n", help="The name for the output bundle file (without extension). Defaults to the folder name.")
):
"""
Validates and packs a directory into a .fbundle archive.
"""
typer.echo(f"--- Validating Bundle Directory: {folder_path.resolve()} ---")
validation_errors = _validate_bundle_directory(folder_path)
if validation_errors:
typer.secho("Validation Failed. The following issues were found:", fg=typer.colors.RED, bold=True)
for error in validation_errors:
typer.echo(f" - {error}")
raise typer.Exit(code=1)
typer.secho("✅ Validation Successful.", fg=typer.colors.GREEN)
typer.echo("\n--- Packing Bundle ---")
output_name = name if name else folder_path.name
output_path = folder_path.parent / f"{output_name}.fbundle"
if output_path.exists():
typer.secho(f"Warning: Output file {output_path} already exists and will be overwritten.", fg=typer.colors.YELLOW)
try:
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as bundle_zip:
for file_to_add in folder_path.rglob('*'):
if file_to_add.is_file():
arcname = file_to_add.relative_to(folder_path)
bundle_zip.write(file_to_add, arcname)
typer.echo(f" Adding: {arcname}")
typer.secho(f"\n✅ Successfully created bundle: {output_path.resolve()}", fg=typer.colors.GREEN, bold=True)
# Final status report
with open(folder_path / "manifest.yaml", 'r') as f:
manifest = yaml.safe_load(f)
is_signed = 'bundleAuthorKeyFingerprint' in manifest and (folder_path / "manifest.sig").exists()
if is_signed:
typer.secho("Bundle Status: ✅ SIGNED", fg=typer.colors.GREEN)
else:
typer.secho("Bundle Status: 🟡 UNSIGNED", fg=typer.colors.YELLOW)
except Exception as e:
typer.secho(f"An unexpected error occurred during packing: {e}", fg=typer.colors.RED)
raise typer.Exit(code=1)

View File

@@ -0,0 +1,74 @@
# fourdst/cli/plugin/validate.py
import typer
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
console = Console()
def plugin_validate(
plugin_path: Path = typer.Argument(
".",
help="The path to the plugin directory to validate.",
exists=True,
file_okay=False,
dir_okay=True,
resolve_path=True
)
):
"""
Validates a plugin's structure and meson.build file.
"""
console.print(Panel(f"Validating Plugin: [bold]{plugin_path.name}[/bold]", border_style="blue"))
errors = 0
warnings = 0
def check(condition, success_msg, error_msg, is_warning=False):
nonlocal errors, warnings
if condition:
console.print(Text(f"{success_msg}", style="green"))
return True
else:
if is_warning:
console.print(Text(f"⚠️ {error_msg}", style="yellow"))
warnings += 1
else:
console.print(Text(f"{error_msg}", style="red"))
errors += 1
return False
# 1. Check for meson.build
meson_file = plugin_path / "meson.build"
if check(meson_file.exists(), "Found meson.build file.", "Missing meson.build file."):
meson_content = meson_file.read_text()
# 2. Check for project() definition
check("project(" in meson_content, "Contains project() definition.", "meson.build is missing a project() definition.", is_warning=True)
# 3. Check for shared_library()
check("shared_library(" in meson_content, "Contains shared_library() definition.", "meson.build does not appear to define a shared_library().")
# 4. Check for source files
has_cpp = any(plugin_path.rglob("*.cpp"))
has_h = any(plugin_path.rglob("*.h")) or any(plugin_path.rglob("*.hpp"))
check(has_cpp, "Found C++ source files (.cpp).", "No .cpp source files found in the directory.", is_warning=True)
check(has_h, "Found C++ header files (.h/.hpp).", "No .h or .hpp header files found in the directory.", is_warning=True)
# 5. Check for test definition (optional)
check("test(" in meson_content, "Contains test() definitions.", "No test() definitions found in meson.build. Consider adding tests.", is_warning=True)
# Final summary
console.print("-" * 40)
if errors == 0:
console.print(Panel(
f"[bold green]Validation Passed[/bold green]\nWarnings: {warnings}",
title="Result",
border_style="green"
))
else:
console.print(Panel(
f"[bold red]Validation Failed[/bold red]\nErrors: {errors}\nWarnings: {warnings}",
title="Result",
border_style="red"
))
raise typer.Exit(code=1)