"""Redux-like state store implementation.
This module provides a centralized state management system based on Redux patterns,
featuring unidirectional data flow, pure reducers, and middleware support.
Example:
Basic store usage with reducer::
from qtframework.state import Store, Action, create_reducer
# Define initial state
initial_state = {"count": 0, "user": None}
# Create reducer
def counter_reducer(state, action):
if action.type == "INCREMENT":
return {**state, "count": state["count"] + 1}
elif action.type == "DECREMENT":
return {**state, "count": state["count"] - 1}
elif action.type == "SET_USER":
return {**state, "user": action.payload}
return state
# Create store
store = Store(reducer=counter_reducer, initial_state=initial_state)
# Subscribe to changes
def on_change(state):
print(f"Count: {state['count']}")
unsubscribe = store.subscribe(on_change)
# Dispatch actions
store.dispatch(Action(type="INCREMENT")) # Output: Count: 1
store.dispatch(Action(type="INCREMENT")) # Output: Count: 2
# Unsubscribe when done
unsubscribe()
With middleware::
from qtframework.state import logger_middleware, thunk_middleware
store = Store(
reducer=counter_reducer,
initial_state=initial_state,
middleware=[logger_middleware(), thunk_middleware()],
)
# Middleware processes actions before they reach the reducer
store.dispatch(Action(type="INCREMENT"))
# Logger middleware will log the action and state changes
Combining reducers::
from qtframework.state import combine_reducers
def user_reducer(state=None, action=None):
if action.type == "SET_USER":
return action.payload
elif action.type == "LOGOUT":
return None
return state
def items_reducer(state=None, action=None):
if state is None:
state = []
if action.type == "ADD_ITEM":
return [*state, action.payload]
elif action.type == "REMOVE_ITEM":
return [item for item in state if item["id"] != action.payload]
return state
root_reducer = combine_reducers({
"user": user_reducer,
"items": items_reducer,
"count": counter_reducer,
})
store = Store(
reducer=root_reducer, initial_state={"user": None, "items": [], "count": 0}
)
See Also:
- :class:`Action`: Action objects for dispatching
- :mod:`qtframework.state.middleware`: Middleware implementations
- :mod:`qtframework.state.reducers`: Reducer utilities
"""
from __future__ import annotations
import collections
import copy
import threading
from typing import TYPE_CHECKING, Any
from PySide6.QtCore import QObject, Signal
from qtframework.state.actions import Action
from qtframework.utils.logger import get_logger
if TYPE_CHECKING:
from qtframework.state.middleware import Middleware
from qtframework.state.reducers import Reducer
logger = get_logger(__name__)
[docs]
class Store(QObject):
"""Centralized state store with Redux-like pattern.
The Store holds the application state and provides methods to:
- Dispatch actions to modify state
- Subscribe to state changes
- Access current state (read-only)
- Navigate state history (time-travel debugging)
Signals:
state_changed(dict): Emitted when state changes
action_dispatched(dict): Emitted when action is dispatched
Example:
Creating and using a store::
store = Store(reducer=my_reducer, initial_state={"count": 0})
# Subscribe to changes
store.state_changed.connect(lambda state: print(state))
# Dispatch actions
store.dispatch(Action(type="INCREMENT"))
"""
state_changed = Signal(dict)
action_dispatched = Signal(dict)
def __init__(
self,
reducer: Reducer,
initial_state: dict[str, Any] | None = None,
middleware: list[Middleware] | None = None,
) -> None:
"""Initialize store.
Args:
reducer: Root reducer function that transforms state
initial_state: Initial state dictionary (default: empty dict)
middleware: List of middleware to process actions
Raises:
TypeError: If reducer is not callable or middleware entries are not callable
"""
super().__init__()
self._reducer = reducer
self._state = initial_state or {}
self._middleware = middleware or []
self._subscribers: list[collections.abc.Callable[[dict[str, Any]], None]] = []
self._lock = threading.RLock() # Reentrant lock for thread-safe operations
self._is_dispatching = False
self._history: list[dict[str, Any]] = []
self._history_index = -1
self._max_history = 100
if not callable(reducer):
raise TypeError("Reducer must be callable")
for mw in self._middleware:
if not callable(mw):
raise TypeError("Middleware entries must be callable")
# Initialize state
self._state = self._reducer(self._state, Action(type="@@INIT"))
self._save_to_history()
@property
def state(self) -> dict[str, Any]:
"""Get current state (read-only copy)."""
with self._lock:
return copy.deepcopy(self._state)
[docs]
def get_state(self) -> dict[str, Any]:
"""Get current state.
Returns:
Copy of current state
"""
return self.state
[docs]
def dispatch(self, action: Action | dict[str, Any]) -> Action:
"""Dispatch an action to update state.
Actions are processed through middleware (if any) before reaching
the reducer. The reducer creates a new state, which triggers
subscriber notifications.
Args:
action: Action to dispatch (Action object or dict with type/payload)
Returns:
Dispatched action (converted to Action if dict was provided)
Raises:
RuntimeError: If dispatch is called while already dispatching
(prevents nested dispatches which can cause race conditions)
TypeError: If action cannot be converted to Action
Example::
# Dispatch with Action object
store.dispatch(Action(type="INCREMENT"))
# Dispatch with dict
store.dispatch({"type": "SET_USER", "payload": {"id": 1}})
"""
if isinstance(action, dict):
action = Action(**action)
logger.debug(f"Dispatching action: {action.type}")
# Apply middleware
dispatch_func: collections.abc.Callable[[Action], Action] = self._dispatch_core
for middleware in reversed(self._middleware):
dispatch_func = middleware(self)(dispatch_func)
return dispatch_func(action)
def _dispatch_core(self, action: Action) -> Action:
"""Core dispatch logic.
Args:
action: Action to dispatch
Returns:
Dispatched action
"""
with self._lock:
if self._is_dispatching:
raise RuntimeError("Cannot dispatch while reducing")
self._is_dispatching = True
try:
old_state = copy.deepcopy(self._state)
self._state = self._reducer(self._state, action)
if self._state != old_state:
self._save_to_history()
self.state_changed.emit(self.state)
self._notify_subscribers()
self.action_dispatched.emit(action.to_dict())
finally:
self._is_dispatching = False
return action
[docs]
def subscribe(
self, callback: collections.abc.Callable[[dict[str, Any]], None]
) -> collections.abc.Callable[[], None]:
"""Subscribe to state changes.
Args:
callback: Callback function
Returns:
Unsubscribe function
"""
with self._lock:
self._subscribers.append(callback)
def unsubscribe() -> None:
"""Remove the subscriber callback."""
with self._lock:
if callback in self._subscribers:
self._subscribers.remove(callback)
return unsubscribe
def _notify_subscribers(self) -> None:
"""Notify all subscribers of state change."""
state = self.state
for subscriber in self._subscribers[:]:
try:
subscriber(state)
except Exception as e:
logger.exception("Subscriber error: %s", e)
[docs]
def replace_reducer(self, reducer: Reducer) -> None:
"""Replace the root reducer.
Args:
reducer: New reducer
"""
self._reducer = reducer
self.dispatch(Action(type="@@REPLACE"))
[docs]
def add_middleware(self, middleware: Middleware) -> None:
"""Add middleware to the store.
Args:
middleware: Middleware to add
"""
with self._lock:
self._middleware.append(middleware)
[docs]
def remove_middleware(self, middleware: Middleware) -> bool:
"""Remove middleware from the store.
Args:
middleware: Middleware to remove
Returns:
True if middleware was found and removed, False otherwise
"""
with self._lock:
try:
self._middleware.remove(middleware)
return True
except ValueError:
return False
def _save_to_history(self) -> None:
"""Save current state to history."""
# Remove any states after current index
self._history = self._history[: self._history_index + 1]
# Add current state
self._history.append(copy.deepcopy(self._state))
self._history_index += 1
# Limit history size
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history :]
self._history_index = len(self._history) - 1
[docs]
def undo(self) -> bool:
"""Undo last state change.
Returns:
True if undo successful
"""
with self._lock:
if self._history_index > 0:
self._history_index -= 1
self._state = copy.deepcopy(self._history[self._history_index])
self.state_changed.emit(self.state)
self._notify_subscribers()
return True
return False
[docs]
def redo(self) -> bool:
"""Redo previously undone state change.
Returns:
True if redo successful
"""
with self._lock:
if self._history_index < len(self._history) - 1:
self._history_index += 1
self._state = copy.deepcopy(self._history[self._history_index])
self.state_changed.emit(self.state)
self._notify_subscribers()
return True
return False
[docs]
def reset(self, state: dict[str, Any] | None = None) -> None:
"""Reset store to initial or provided state.
Args:
state: State to reset to
"""
with self._lock:
self._state = state or {}
self._state = self._reducer(self._state, Action(type="@@RESET"))
self._history = [copy.deepcopy(self._state)]
self._history_index = 0
self.state_changed.emit(self.state)
self._notify_subscribers()
[docs]
def get_history(self) -> list[dict[str, Any]]:
"""Get state history for time-travel debugging.
Returns:
Deep copy of all historical states (up to 100 states)
Note:
Returns a deep copy of history to prevent external modifications.
For large state objects, this may be memory-intensive. History is
automatically limited to the last 100 states (configurable via _max_history).
Example::
# Get and display history
history = store.get_history()
for i, state in enumerate(history):
print(f"State {i}: {state}")
"""
return copy.deepcopy(self._history)
[docs]
def select(self, selector: collections.abc.Callable[[dict[str, Any]], Any]) -> Any:
"""Select a value from state.
Args:
selector: Selector function
Returns:
Selected value
"""
with self._lock:
return selector(self._state)
[docs]
def select_path(self, path: str) -> Any:
"""Select value by path (e.g., 'user.profile.name').
Args:
path: Dot-separated path
Returns:
Value at path or None
"""
with self._lock:
keys = path.split(".")
value = self._state
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return None
return value