"""Configuration storage and access."""
from __future__ import annotations

import copy
from typing import TYPE_CHECKING, Any

from PySide6.QtCore import QObject, Signal

from qtframework.utils.logger import get_logger

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator

    from pydantic import BaseModel
# Module-level logger; Config._notify_watchers() uses it to report exceptions
# raised by watcher callbacks instead of letting them propagate.
logger = get_logger(__name__)
# Unique sentinel object: Config.has() passes it to Config.get() as the
# default and checks the result by identity to detect an absent key.
_MISSING = object()
# (Sphinx "[docs]" source-link marker: HTML extraction artifact, not code.)
class Config(QObject):
    """Configuration container with dot-notation access.

    Values live in a nested ``dict``; a key such as ``"window.size.width"``
    addresses ``data["window"]["size"]["width"]``.

    Single-key changes made through :meth:`set` and :meth:`delete` emit
    ``value_changed`` and invoke callbacks registered with :meth:`watch`.
    Wholesale changes (:meth:`merge`, :meth:`from_dict`, :meth:`clear`) emit
    ``config_reloaded`` only.

    A key whose stored value is ``None`` counts as *present*: :meth:`get`
    returns ``None`` for it (not the default), :meth:`has` returns ``True``
    and ``config[key]`` does not raise. This matches :meth:`keys` and
    :meth:`delete`, which have always treated such keys as existing.
    """

    # Emitted by set()/delete() with (dotted key, new value); the value is
    # ``None`` after a delete.
    value_changed = Signal(str, object)
    # Emitted after merge(), from_dict() and clear().
    config_reloaded = Signal()

    def __init__(self, data: dict[str, Any] | None = None) -> None:
        """Initialize config.

        Args:
            data: Initial configuration data. It is deep-copied, so later
                changes to the config never leak into the caller's dict
                (the same isolation ``from_dict``/``to_dict`` provide).
        """
        super().__init__()
        self._data: dict[str, Any] = copy.deepcopy(data) if data else {}
        # Maps a dotted key to the callbacks registered for it via watch().
        self._watchers: dict[str, list[Callable[[Any], None]]] = {}

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value.

        Args:
            key: Dot-separated key path
            default: Value returned when the path does not exist

        Returns:
            The stored value (which may be ``None``), or ``default`` when any
            segment of the path is missing or an intermediate value is not a
            dict. No exception is raised for empty or unusual keys; an empty
            segment is simply looked up as the key ``""``.
        """
        node: Any = self._data
        for part in key.split("."):
            if not isinstance(node, dict):
                return default
            # Sentinel lookup so a stored None is not mistaken for "missing".
            node = node.get(part, _MISSING)
            if node is _MISSING:
                return default
        return node

    def set(self, key: str, value: Any) -> None:
        """Set configuration value.

        Missing intermediate sections are created. An intermediate element
        that exists but is not a dict (for example ``None`` or a scalar) is
        replaced by a new empty section so the path can be written.

        ``value_changed`` is emitted and watchers are notified only when the
        key is new or its value actually changed.

        Args:
            key: Dot-separated key path
            value: Value to set
        """
        *parents, leaf = key.split(".")
        node = self._data
        for part in parents:
            child = node.get(part)
            if not isinstance(child, dict):
                child = node[part] = {}
            node = child

        # Sentinel distinguishes "key did not exist" from "old value was None".
        previous = node.get(leaf, _MISSING)
        node[leaf] = value
        if previous is _MISSING or previous != value:
            self.value_changed.emit(key, value)
            self._notify_watchers(key, value)

    def delete(self, key: str) -> bool:
        """Delete configuration value.

        On success ``value_changed`` is emitted with ``None`` and watchers
        are notified with ``None``.

        Args:
            key: Dot-separated key path

        Returns:
            True if the key existed and was removed; False when the path is
            missing or runs through a non-dict value.
        """
        *parents, leaf = key.split(".")
        node: Any = self._data
        for part in parents:
            if not isinstance(node, dict) or part not in node:
                return False
            node = node[part]
        # Guard the leaf container too: ``in`` on a str would be a substring
        # test and ``del`` on it would raise.
        if not isinstance(node, dict) or leaf not in node:
            return False

        del node[leaf]
        self.value_changed.emit(key, None)
        self._notify_watchers(key, None)
        return True

    def has(self, key: str) -> bool:
        """Check if configuration key exists.

        Args:
            key: Dot-separated key path

        Returns:
            True if the full path exists, even when its value is ``None``
        """
        return self.get(key, _MISSING) is not _MISSING

    def watch(self, key: str, callback: Callable[[Any], None]) -> Callable[[], None]:
        """Watch configuration value changes.

        The callback receives the new value when ``key`` itself changes, or
        the current value of ``key`` when any key nested below it changes.

        Args:
            key: Key to watch
            callback: Callback function

        Returns:
            Unwatch function; calling it more than once is harmless
        """
        self._watchers.setdefault(key, []).append(callback)

        def unwatch() -> None:
            """Remove the watcher callback."""
            callbacks = self._watchers.get(key)
            if callbacks and callback in callbacks:
                callbacks.remove(callback)
                if not callbacks:
                    # Do not keep empty lists around for keys nobody watches.
                    del self._watchers[key]

        return unwatch

    def _notify_watchers(self, key: str, value: Any) -> None:
        """Notify watchers of value change.

        Exceptions raised by a callback are logged and do not stop the
        remaining callbacks from running.

        Args:
            key: Changed key
            value: New value
        """
        # Direct watchers. Iterate over a copy: a callback may (un)watch.
        for callback in list(self._watchers.get(key, ())):
            try:
                callback(value)
            except Exception as e:
                logger.exception("Watcher error for %s: %s", key, e)

        # Parent watchers (watch entire sections), outermost section first.
        parts = key.split(".")
        for depth in range(1, len(parts)):
            parent_key = ".".join(parts[:depth])
            callbacks = self._watchers.get(parent_key)
            if not callbacks:
                continue
            parent_value = self.get(parent_key)
            for callback in list(callbacks):
                try:
                    callback(parent_value)
                except Exception as e:
                    logger.exception("Parent watcher error for %s: %s", parent_key, e)

    def merge(self, data: dict[str, Any], deep: bool = True) -> None:
        """Merge configuration data.

        Emits ``config_reloaded`` afterwards; per-key watchers are not
        notified.

        Args:
            data: Data to merge
            deep: If True, nested dicts are merged recursively and ``data``
                is copied. If False, top-level keys are overwritten via
                ``dict.update`` and nested values from ``data`` are stored
                by reference.
        """
        if deep:
            self._data = self._deep_merge(self._data, data)
        else:
            self._data.update(data)
        self.config_reloaded.emit()

    def _deep_merge(self, base: dict, update: dict) -> dict:
        """Deep merge dictionaries.

        Neither argument is modified.

        Args:
            base: Base dictionary
            update: Update dictionary; its values win on conflict

        Returns:
            New merged dictionary that shares no mutable state with the inputs
        """
        # Copy the base exactly once, then merge in place into that copy.
        result = copy.deepcopy(base)
        self._merge_into(result, update)
        return result

    @staticmethod
    def _merge_into(target: dict, update: dict) -> None:
        """Recursively merge ``update`` into ``target`` in place."""
        for key, value in update.items():
            existing = target.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                Config._merge_into(existing, value)
            else:
                target[key] = copy.deepcopy(value)

    def to_dict(self) -> dict[str, Any]:
        """Get configuration as dictionary.

        Returns:
            Deep copy of the configuration data
        """
        return copy.deepcopy(self._data)

    def from_dict(self, data: dict[str, Any]) -> None:
        """Load configuration from dictionary.

        Replaces all current data with a deep copy of ``data`` and emits
        ``config_reloaded``.

        Args:
            data: Configuration dictionary
        """
        self._data = copy.deepcopy(data)
        self.config_reloaded.emit()

    def clear(self) -> None:
        """Clear all configuration and emit ``config_reloaded``."""
        self._data.clear()
        self.config_reloaded.emit()

    def keys(self, prefix: str = "") -> list[str]:
        """Get all configuration keys.

        Both section keys and leaf keys are listed, each section followed by
        its children.

        Args:
            prefix: Key prefix filter. This is a plain string prefix, so
                ``"app"`` also matches ``"application"``.

        Returns:
            List of dot-separated keys
        """

        def walk(section: dict[str, Any], parent: str = "") -> Iterator[str]:
            """Yield every dotted path under ``section``, parents first."""
            for name, value in section.items():
                path = f"{parent}.{name}" if parent else name
                yield path
                if isinstance(value, dict):
                    yield from walk(value, path)

        paths = list(walk(self._data))
        if prefix:
            return [path for path in paths if path.startswith(prefix)]
        return paths

    def validate(self, schema: type[BaseModel]) -> bool:
        """Validate configuration against schema.

        Args:
            schema: Pydantic model schema; it is instantiated with the
                top-level configuration entries as keyword arguments

        Returns:
            True if the schema could be instantiated, False if it raised
        """
        try:
            schema(**self._data)
        except Exception as exc:
            # Broad on purpose: pydantic is imported for type checking only,
            # so its ValidationError class is not available here at runtime.
            logger.debug(
                "Configuration failed validation against %s: %s",
                getattr(schema, "__name__", schema),
                exc,
            )
            return False
        return True

    def __getitem__(self, key: str) -> Any:
        """Get item by key.

        Args:
            key: Configuration key

        Returns:
            Configuration value (may be ``None`` if that is what is stored)

        Raises:
            KeyError: If configuration key is not found
        """
        value = self.get(key, _MISSING)
        if value is _MISSING:
            raise KeyError(f"Configuration key not found: {key}")
        return value

    def __setitem__(self, key: str, value: Any) -> None:
        """Set item by key.

        Args:
            key: Configuration key
            value: Value to set
        """
        self.set(key, value)

    def __contains__(self, key: str) -> bool:
        """Check if key exists.

        Args:
            key: Configuration key

        Returns:
            True if exists
        """
        return self.has(key)