Source code for qtframework.plugins.manager

"""Plugin manager for handling plugin lifecycle."""

from __future__ import annotations

import importlib.util
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any

from PySide6.QtCore import QObject, Signal

from qtframework.plugins.base import Plugin, PluginMetadata, PluginState
from qtframework.utils.exceptions import PluginError, SecurityError
from qtframework.utils.logger import get_logger


if TYPE_CHECKING:
    from collections.abc import Callable

    from qtframework.core.application import Application

logger = get_logger(__name__)


class PluginManager(QObject):
    """Central registry that loads plugins and drives their lifecycle."""

    # Each signal carries the plugin ID; ``plugin_error`` additionally
    # carries the error message as its second argument.
    plugin_loaded = Signal(str)
    plugin_activated = Signal(str)
    plugin_deactivated = Signal(str)
    plugin_error = Signal(str, str)

    def __init__(self, application: Application | None = None) -> None:
        """Set up an empty plugin manager.

        Args:
            application: Application instance handed to plugins when they
                are loaded; may be omitted.
        """
        super().__init__()
        self._app = application
        # Loaded plugin instances, keyed by plugin ID.
        self._plugins: dict[str, Plugin] = {}
        # Directories searched when discovering or locating plugins.
        self._plugin_paths: list[Path] = []
        # Hook name -> list of (plugin ID, callback) pairs.
        self._hooks: dict[str, list[tuple[str, Callable]]] = {}
        # Plugin ID -> name of the module registered in sys.modules,
        # kept so the module can be removed again on unload.
        self._loaded_modules: dict[str, str] = {}
def add_plugin_path(self, path: Path | str) -> None:
    """Register a location to search for plugins.

    The path is silently ignored when it does not exist on disk or when it
    has already been registered.

    Args:
        path: Location to search for plugins, as a ``Path`` or a string.
    """
    candidate = Path(path)
    if not candidate.exists():
        return
    if candidate in self._plugin_paths:
        return
    self._plugin_paths.append(candidate)
    logger.info("Added plugin path: %s", candidate)
def discover_plugins(self) -> list[PluginMetadata]:
    """Discover available plugins.

    Every registered search path that is a directory is scanned one level
    deep; each sub-directory is passed to ``_load_plugin_metadata`` and the
    metadata objects it returns are collected.

    Returns:
        List of discovered plugin metadata, ordered by search path and then
        by sub-directory path.
    """
    discovered: list[PluginMetadata] = []

    for search_path in self._plugin_paths:
        if not search_path.is_dir():
            continue
        # iterdir() yields entries in arbitrary, OS-dependent order; sort so
        # repeated discovery runs report plugins in a stable order.
        for plugin_dir in sorted(search_path.iterdir()):
            if not plugin_dir.is_dir():
                continue
            metadata = self._load_plugin_metadata(plugin_dir)
            if metadata:
                discovered.append(metadata)

    # Lazy %-style arguments, matching the other log calls in this class.
    logger.info("Discovered %d plugins", len(discovered))
    return discovered
def _load_plugin_metadata(self, plugin_dir: Path) -> PluginMetadata | None: """Load plugin metadata from directory. Args: plugin_dir: Plugin directory Returns: Plugin metadata or None """ metadata_file = plugin_dir / "plugin.json" if metadata_file.exists(): import json try: with Path(metadata_file).open(encoding="utf-8") as f: data = json.load(f) return PluginMetadata(**data) except Exception as e: logger.exception("Failed to load metadata from %s: %s", metadata_file, e) return None
def load_plugin(self, plugin_id: str, plugin_path: Path | None = None) -> bool:
    """Load a plugin with security validation.

    When ``plugin_path`` is given it is security-validated and the plugin is
    loaded from it; otherwise the plugin is looked up by ID in the
    registered search paths. The instance is validated, registered, handed
    the application (if one was supplied to the manager), put in the LOADED
    state and initialized.

    Any failure leaves the manager as if the call had never been made: the
    plugin is removed from the registry again and the module tracked for
    it is dropped, so a later retry is not short-circuited by a broken
    "already loaded" entry.

    Args:
        plugin_id: Plugin ID
        plugin_path: Optional specific plugin path

    Returns:
        True if the plugin was loaded (or was already loaded)

    Raises:
        PluginError: If the ID is invalid or loading/initialization fails;
            unexpected exceptions are wrapped and chained as the cause
        SecurityError: If security validation of ``plugin_path`` fails
    """
    if not isinstance(plugin_id, str) or not plugin_id:
        raise PluginError("Plugin ID must be a non-empty string", plugin_id=plugin_id)

    if plugin_id in self._plugins:
        logger.warning("Plugin %s already loaded", plugin_id)
        return True

    def rollback() -> None:
        # plugin_id was not registered when we started (see the early
        # return above), so anything stored under it belongs to this call.
        self._plugins.pop(plugin_id, None)
        module_name = self._loaded_modules.pop(plugin_id, None)
        if module_name is not None:
            sys.modules.pop(module_name, None)

    try:
        if plugin_path:
            # Validate plugin path security
            if not self._validate_plugin_path_security(plugin_path):
                raise SecurityError(
                    f"Plugin path failed security validation: {plugin_path}",
                    security_context="plugin_loading",
                    attempted_action="load_plugin",
                )
            plugin = self._load_plugin_from_path(plugin_path)
        else:
            plugin = self._find_and_load_plugin(plugin_id)

        if not plugin:
            raise PluginError(
                f"Failed to load plugin {plugin_id}", plugin_id=plugin_id, operation="load"
            )

        # Validate plugin instance
        self._validate_plugin_instance(plugin, plugin_id)

        self._plugins[plugin_id] = plugin
        if self._app is not None:
            plugin.set_application(self._app)
        plugin.set_state(PluginState.LOADED)

        if not plugin.initialize():
            plugin.set_state(PluginState.ERROR)
            raise PluginError(
                f"Plugin {plugin_id} initialization failed",
                plugin_id=plugin_id,
                operation="initialize",
            )

        self.plugin_loaded.emit(plugin_id)
        logger.info("Plugin %s loaded successfully", plugin_id)
        return True

    except (PluginError, SecurityError) as e:
        rollback()
        self.plugin_error.emit(plugin_id, str(e))
        raise
    except Exception as e:
        rollback()
        error = PluginError(
            f"Unexpected error loading plugin {plugin_id}: {e}",
            plugin_id=plugin_id,
            operation="load",
        )
        self.plugin_error.emit(plugin_id, str(error))
        # Chain the original exception so its traceback is not lost.
        raise error from e
def _load_plugin_from_path(self, path: Path) -> Plugin | None: """Load plugin from specific path. Args: path: Plugin path Returns: Plugin instance or None """ main_file = path / "main.py" if not main_file.exists(): return None module_name = f"plugin_{path.name}" spec = importlib.util.spec_from_file_location(module_name, main_file) if spec and spec.loader: module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module # Register in sys.modules spec.loader.exec_module(module) if hasattr(module, "create_plugin"): plugin = module.create_plugin() if isinstance(plugin, Plugin): # Track the module for cleanup self._loaded_modules[plugin.metadata.id] = module_name return plugin return None def _find_and_load_plugin(self, plugin_id: str) -> Plugin | None: """Find and load plugin by ID. Args: plugin_id: Plugin ID Returns: Plugin instance or None """ for path in self._plugin_paths: plugin_dir = path / plugin_id if plugin_dir.exists(): return self._load_plugin_from_path(plugin_dir) return None
def activate_plugin(self, plugin_id: str) -> bool:
    """Activate a plugin.

    Failures are reported through the return value rather than by raising:
    an unknown plugin ID, an ``activate()`` call that returns a falsy value,
    and any exception raised during activation all yield ``False``. In the
    latter two cases the plugin is put in the ERROR state; for exceptions
    ``plugin_error`` is emitted as well.

    Args:
        plugin_id: Plugin ID

    Returns:
        True if the plugin is active after the call (including when it was
        already active), False otherwise
    """
    plugin = self._plugins.get(plugin_id)
    if not plugin:
        logger.error("Plugin %s not found", plugin_id)
        return False

    if plugin.state == PluginState.ACTIVE:
        logger.warning("Plugin %s already active", plugin_id)
        return True

    try:
        if plugin.activate():
            plugin.set_state(PluginState.ACTIVE)
            self.plugin_activated.emit(plugin_id)
            logger.info("Plugin %s activated", plugin_id)
            return True

        # The plugin declined to activate; leave a trace of why it ended up
        # in the ERROR state instead of failing silently.
        logger.error("Plugin %s reported activation failure", plugin_id)
        plugin.set_state(PluginState.ERROR)
        return False
    except Exception as e:
        logger.exception("Failed to activate plugin %s: %s", plugin_id, e)
        plugin.set_state(PluginState.ERROR)
        self.plugin_error.emit(plugin_id, str(e))
        return False
def deactivate_plugin(self, plugin_id: str) -> bool:
    """Take an active plugin back to the LOADED state.

    Args:
        plugin_id: ID of the plugin to deactivate.

    Returns:
        True when the plugin was deactivated or was not active to begin
        with; False when the ID is unknown, the plugin's ``deactivate()``
        returned a falsy value, or an exception occurred (in which case
        ``plugin_error`` is emitted).
    """
    target = self._plugins.get(plugin_id)
    if not target:
        logger.error("Plugin %s not found", plugin_id)
        return False

    if target.state != PluginState.ACTIVE:
        logger.warning("Plugin %s not active", plugin_id)
        return True

    try:
        if not target.deactivate():
            return False
        target.set_state(PluginState.LOADED)
        self.plugin_deactivated.emit(plugin_id)
        logger.info("Plugin %s deactivated", plugin_id)
        return True
    except Exception as e:
        logger.exception("Failed to deactivate plugin %s: %s", plugin_id, e)
        self.plugin_error.emit(plugin_id, str(e))
        return False
def unload_plugin(self, plugin_id: str) -> bool:
    """Unload a plugin.

    An active plugin is deactivated first. The plugin's ``cleanup()`` then
    runs while its module is still registered; only after it succeeds are
    the plugin's global hooks, its ``sys.modules`` entry and its registry
    entry removed. If ``cleanup()`` raises, nothing is removed, so the
    manager's bookkeeping stays consistent and the unload can be retried.

    Args:
        plugin_id: Plugin ID

    Returns:
        True if plugin unloaded successfully (or was not loaded at all),
        False if ``cleanup()`` raised
    """
    plugin = self._plugins.get(plugin_id)
    if not plugin:
        return True

    if plugin.state == PluginState.ACTIVE:
        self.deactivate_plugin(plugin_id)

    try:
        plugin.cleanup()
    except Exception as e:
        logger.exception("Failed to unload plugin %s: %s", plugin_id, e)
        return False

    # Drop hooks registered under this plugin ID so trigger_global_hook
    # no longer calls into code that has been unloaded.
    for hook_name in list(self._hooks):
        remaining = [entry for entry in self._hooks[hook_name] if entry[0] != plugin_id]
        if remaining:
            self._hooks[hook_name] = remaining
        else:
            del self._hooks[hook_name]

    # Clean up plugin module from sys.modules
    module_name = self._loaded_modules.pop(plugin_id, None)
    if module_name is not None and module_name in sys.modules:
        del sys.modules[module_name]
        logger.debug("Cleaned up module %s for plugin %s", module_name, plugin_id)

    del self._plugins[plugin_id]
    logger.info("Plugin %s unloaded", plugin_id)
    return True
def get_plugin(self, plugin_id: str) -> Plugin | None:
    """Look up a loaded plugin.

    Args:
        plugin_id: ID the plugin was loaded under.

    Returns:
        The plugin registered under ``plugin_id``, or ``None`` when no such
        plugin is loaded.
    """
    registry = self._plugins
    return registry[plugin_id] if plugin_id in registry else None
def get_all_plugins(self) -> dict[str, Plugin]:
    """Return every loaded plugin.

    Returns:
        A shallow copy of the ID-to-plugin mapping; changing the returned
        dictionary does not affect the manager.
    """
    return dict(self._plugins)
def get_active_plugins(self) -> list[Plugin]:
    """Collect the plugins whose state is ACTIVE.

    Returns:
        Active plugins, in the order they appear in the plugin registry.
    """
    active: list[Plugin] = []
    for candidate in self._plugins.values():
        if candidate.state == PluginState.ACTIVE:
            active.append(candidate)
    return active
def register_global_hook(self, hook_name: str, plugin_id: str, callback: Callable) -> None:
    """Attach a callback to a named global hook.

    Callbacks are stored per hook name in registration order, each paired
    with the ID of the plugin that registered it.

    Args:
        hook_name: Name of the hook to attach to.
        plugin_id: ID of the plugin registering the callback.
        callback: Callable to invoke when the hook is triggered.
    """
    self._hooks.setdefault(hook_name, []).append((plugin_id, callback))
def trigger_global_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> list[Any]:
    """Trigger a global hook.

    Every callback registered for ``hook_name`` at the moment of the call
    is invoked in registration order with the given arguments. A callback
    that raises is logged and skipped; the remaining callbacks still run.

    Args:
        hook_name: Hook name
        *args: Positional arguments passed to each callback
        **kwargs: Keyword arguments passed to each callback

    Returns:
        List of ``(plugin_id, result)`` tuples, one per callback that
        completed without raising; empty when the hook is unknown
    """
    results: list[Any] = []

    # Iterate over a snapshot: a callback may register another hook for the
    # same name, and appending to a list while iterating it would make the
    # new callback run within this same pass.
    for plugin_id, callback in list(self._hooks.get(hook_name, ())):
        try:
            result = callback(*args, **kwargs)
            results.append((plugin_id, result))
        except Exception as e:
            logger.exception("Hook error in %s.%s: %s", plugin_id, hook_name, e)

    return results
def _validate_plugin_path_security(self, path: Path) -> bool: """Validate plugin path for security concerns. Args: path: Plugin path to validate Returns: True if path passes security validation """ try: # Ensure path exists and is a directory if not path.exists() or not path.is_dir(): logger.warning("Plugin path does not exist or is not a directory: %s", path) return False # Check for restricted directories restricted_paths = [ Path.home(), Path("/"), Path("C:\\"), Path("/usr"), Path("/bin"), Path("/sys"), Path("C:\\Windows"), Path("C:\\Program Files"), ] for restricted in restricted_paths: try: if path.is_relative_to(restricted): logger.warning("Plugin path in restricted directory: %s", path) return False except (ValueError, TypeError): continue # Check plugin directory size (prevent loading huge plugins) total_size = sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) if total_size > 50 * 1024 * 1024: # 50MB limit logger.warning("Plugin directory too large: %s (%s bytes)", path, total_size) return False # Check for main.py file main_file = path / "main.py" if not main_file.exists(): logger.warning("Plugin missing main.py file: %s", path) return False # Basic security scan of main.py return self._validate_plugin_file_content(main_file) except Exception as e: logger.exception("Error validating plugin path security: %s", e) return False def _validate_plugin_file_content(self, file_path: Path) -> bool: """Validate plugin file content for security. 
Args: file_path: Path to plugin file Returns: True if file content is safe """ try: content = file_path.read_text(encoding="utf-8") # Check for suspicious imports/operations suspicious_patterns = [ "import os", "import sys", "import subprocess", "import shutil", "exec(", "eval(", "__import__", "open(", "file(", "urllib", "requests", "socket", "tempfile", ] for pattern in suspicious_patterns: if pattern in content: logger.warning( "Suspicious pattern '%s' found in plugin file: %s", pattern, file_path ) return False return True except Exception as e: logger.exception("Error reading plugin file for security validation: %s", e) return False def _validate_plugin_instance(self, plugin: Plugin, plugin_id: str) -> None: """Validate plugin instance. Args: plugin: Plugin instance to validate plugin_id: Plugin ID Raises: PluginError: If plugin validation fails """ # Check if plugin is actually a Plugin instance if not isinstance(plugin, Plugin): raise PluginError( f"Plugin {plugin_id} is not a valid Plugin instance", plugin_id=plugin_id ) # Check required methods required_methods = ["initialize", "activate", "deactivate", "cleanup"] for method in required_methods: if not hasattr(plugin, method) or not callable(getattr(plugin, method)): raise PluginError( f"Plugin {plugin_id} missing required method: {method}", plugin_id=plugin_id ) # Check plugin metadata if available if hasattr(plugin, "metadata"): metadata = plugin.metadata if metadata and hasattr(metadata, "id") and metadata.id != plugin_id: raise PluginError( f"Plugin metadata ID mismatch: expected {plugin_id}, got {metadata.id}", plugin_id=plugin_id, )