The Missing Manual for Signals: State Management for Python Developers

A practical guide to reactive state management with Signals in Python
For All Signals Developers: While this manual uses Python examples, the concepts apply universally to Signals in Angular, SolidJS, Vue, and beyond. Unlike most tutorials that focus on UI rendering, this explores the fundamental mental models of reactive state management that make Signals powerful across domains.
Introduction
I maintain reaktiv. When I demo it to Python teams, I get the same response: "Why do I need this? I can just call functions when things change."
Fair question. Python has excellent patterns for coordinating state changes. You can trigger updates manually, use the observer pattern, or set up event systems. Most Python applications handle state coordination just fine.
But some don't.
If you're building systems where state changes cascade through multiple components, where derived values need to stay synchronized, or where manual coordination is becoming a maintenance burden - Signals might solve real problems for you.
Frontend developers recognize the pattern immediately. They've dealt with forgetting to trigger updates when state changes, or having component state get out of sync. Signals solve the "forgot to update X when Y changed" class of bugs.
This manual shows you when that coordination problem is worth solving with reactive programming, and when it's not.
What You'll Learn
- The fundamental mental models behind reactive state management
- How to think declaratively about state relationships instead of imperatively
- Why "inversion of control" makes Signals powerful across domains beyond UI rendering
- Mental preparation for understanding Signals
Let's start with what breaks as state coordination scales.
Table of Contents
- The Problem with Traditional State Management
- What Are Signals, Really?
- The Mental Model Shift
- When Signals Matter (And When They Don't)
- Common Patterns and Anti-Patterns
- Understanding Fine-Grained Reactivity
- Effects: The Bridge to the Outside World
- Temporal Concerns: Time and Signals
- Real-World Scenarios
- Conclusion
- Learning from Frontend Innovations
- Additional Resources
- A Humble Thanks to the Hacker News Community
The Problem with Traditional State Management
As developers, we've all written variations of this code:
class OrderService:
    def __init__(self):
        self.orders = []
        self.total_revenue = 0.0
        self.avg_order_value = 0.0
        self.notification_service = NotificationService()
        self.analytics_service = AnalyticsService()

    def add_order(self, order):
        # Step 1: Update base data
        self.orders.append(order)
        # Step 2: Update derived values (order matters!)
        self.total_revenue += order.amount
        self.avg_order_value = self.total_revenue / len(self.orders)
        # Step 3: Send notifications (depends on derived values)
        self._send_notifications(order)
        # Step 4: Track analytics (also depends on derived values)
        self._track_analytics(order)

    def _send_notifications(self, order):
        # Notification logic depends on up-to-date averages
        if order.amount > self.avg_order_value * 2:
            self.notification_service.send_high_value_alert(order)
        if len(self.orders) % 100 == 0:
            self.notification_service.send_milestone_alert(len(self.orders))

    def _track_analytics(self, order):
        self.analytics_service.track_order(order)
        self.analytics_service.track_revenue_milestone(self.total_revenue)
        self.analytics_service.track_avg_order_value(self.avg_order_value)
This looks reasonable at first glance. But let's visualize the hidden complexity:
The Hidden Dependencies
The real problem isn't visible in the code - it's the implicit dependency graph:
These dependencies are implicit and manually maintained. The notifications depend on avg_order_value being calculated correctly, which depends on both total_revenue and the orders list length being accurate. Miss the order or forget to update one value? You'll send notifications based on stale data.
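To see how easily this goes wrong, here is a minimal sketch of the stale-data bug. ManualOrderStats and cancel_last_order are hypothetical names invented for this illustration; the class is a stripped-down stand-in for OrderService, with a second entry point that updates only part of the derived state.

```python
from dataclasses import dataclass, field


@dataclass
class ManualOrderStats:
    """Hand-maintained derived values (hypothetical stand-in for OrderService)."""
    amounts: list = field(default_factory=list)
    total_revenue: float = 0.0
    avg_order_value: float = 0.0

    def add_order(self, amount: float) -> None:
        self.amounts.append(amount)
        self.total_revenue += amount
        self.avg_order_value = self.total_revenue / len(self.amounts)

    def cancel_last_order(self) -> None:
        # A later entry point: it adjusts total_revenue
        # but forgets to recalculate avg_order_value.
        amount = self.amounts.pop()
        self.total_revenue -= amount


stats = ManualOrderStats()
stats.add_order(100.0)
stats.add_order(300.0)
stats.cancel_last_order()

true_average = stats.total_revenue / len(stats.amounts)
print(stats.avg_order_value)  # 200.0 (stale)
print(true_average)           # 100.0 (what it should be)
```

Nothing raises an error here. The average is simply wrong until some other code path happens to recalculate it.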
1. Tight Coupling Through Side Effects
Every time we add an order, we must remember to update:
- The orders list
- Total revenue
- Average order value (which depends on the first two)
- Notifications (which depend on the average)
- Analytics (which depend on multiple values)
Miss one update? Silent bugs. Add a new derived value? Modify every entry point.
2. Implicit Dependencies
The relationship between orders and derived state is buried in imperative code. New developers (or future you) must trace through method calls to understand what depends on what.
3. Inconsistent State Windows
Between the moment orders.append(order) executes and avg_order_value is recalculated, your system is in an inconsistent state. In concurrent environments, this creates race conditions.
4. Testing Complexity
Testing requires mocking all the side effects, or carefully orchestrating partial updates. Want to test just the average calculation? Good luck isolating it.
5. Performance Blind Spots
Every order addition triggers every derived calculation, even if only some values are actually needed. No easy way to optimize without restructuring.
What Are Signals, Really?
Signals aren't just "reactive variables." They're a dependency graph abstraction that inverts the control flow of state management.
Why "Inverted" Control Flow?
The control flow is "inverted" because Signals flip the direction of how updates propagate through your system:
Traditional Control Flow (Push-based): When data changes, you actively push updates outward:
Signals Control Flow (Pull-based): When data changes, the system automatically pulls updates as needed:
Traditional (You Control): Manual Push Updates
When data changes, you decide what to update. This works fine when you have a single entry point:
# Works well for a single function
def update_user(self, new_user):
    self.user = new_user
    self.user_display = format_user(self.user)      # You remember this
    self.user_permissions = calc_perms(self.user)   # You remember this
    self.send_notification(self.user)               # You remember this
But what happens when you need to modify user data from multiple places?
def update_user_email(self, new_email):
    self.user.email = new_email
    # Oops! Forgot to update user_display and user_permissions
    # They still show the old email

def update_user_role(self, new_role):
    self.user.role = new_role
    # Forgot to recalculate permissions and send notifications

def bulk_import_users(self, users):
    self.users.extend(users)
    # Do we call update_user_display for each?
    # Do we send 1000 notifications or batch them?
    # What's the right coordination here?
You are in control of:
- Remembering what depends on what
- Calling update functions in the right order
- Handling the timing of updates
- Making sure nothing gets forgotten
Signals (System Controls): Automatic Pull Updates
When data changes, the system decides what to update:
# System automatically pulls updates as needed
user = Signal(initial_user)
user_display = Computed(lambda: format_user(user())) # System tracks this dependency
user_permissions = Computed(lambda: calc_perms(user())) # System tracks this dependency
notify_effect = Effect(lambda: send_notification(user())) # System tracks this dependency
# You just change the data, system handles the rest
user.set(new_user) # System automatically updates everything that depends on user
The system is in control of:
- Tracking what depends on what (dependency graph)
- Triggering updates automatically and synchronously
- Handling update order (topological dependency resolution)
- Ensuring nothing gets missed
The control of "what to update when" shifts from you (manual, error-prone) to the system (automatic, reliable). This is the "inversion" - you're no longer responsible for managing the update flow.
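To make "the system tracks this dependency" concrete, here is a toy sketch of automatic dependency tracking. ToySignal and ToyComputed are hypothetical names for this illustration only. This is not how reaktiv is implemented, and it leaves out Effects, batching, cleanup of old dependencies, and thread safety. The one idea it shows: reading a value while a computation is running registers that computation as a dependent.

```python
class ToySignal:
    """Toy value holder (illustration only, not reaktiv's Signal)."""

    def __init__(self, value):
        self._value = value
        self._dependents = set()

    def __call__(self):
        # A read that happens while a ToyComputed is evaluating
        # records that computation as a dependent of this signal.
        if ToyComputed._active is not None:
            self._dependents.add(ToyComputed._active)
        return self._value

    def set(self, value):
        self._value = value
        for dependent in list(self._dependents):
            dependent.invalidate()


class ToyComputed:
    """Toy derived value: cached, recomputed lazily after invalidation."""

    _active = None  # the computation currently being evaluated, if any

    def __init__(self, fn):
        self._fn = fn
        self._dirty = True
        self._value = None
        self._dependents = set()

    def __call__(self):
        if ToyComputed._active is not None:
            self._dependents.add(ToyComputed._active)
        if self._dirty:
            previous, ToyComputed._active = ToyComputed._active, self
            try:
                self._value = self._fn()
            finally:
                ToyComputed._active = previous
            self._dirty = False
        return self._value

    def invalidate(self):
        if not self._dirty:
            self._dirty = True
            for dependent in list(self._dependents):
                dependent.invalidate()


price = ToySignal(10)
quantity = ToySignal(3)
subtotal = ToyComputed(lambda: price() * quantity())
total = ToyComputed(lambda: subtotal() + 5)  # flat shipping fee, assumed

print(total())  # 35
price.set(20)   # nobody calls "update subtotal" or "update total"
print(total())  # 65
```

Nowhere does the code state that total depends on price. That relationship was discovered by running the lambdas.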
The Dependency Graph Model
Instead of push-based updates (imperative):
# When X changes, manually update Y and Z
x = new_value
y = calculate_y(x)
z = calculate_z(x, y)
notify_observers(x, y, z)
Signals provide pull-based derivation (declarative):
# Define relationships once
x = Signal(initial_value)
y = Computed(lambda: calculate_y(x()))
z = Computed(lambda: calculate_z(x(), y()))
notify_effect = Effect(lambda: notify_observers(x(), y(), z()))
# Updates happen automatically
x.set(new_value) # y, z, and notifications update automatically
Important: Signals are value containers, not event streams. If you're thinking "this sounds like event listeners," there's a key difference. Signals hold current state and create a snapshot of your application at any point in time. When you call signal(), you get the current value - not a subscription to future events.
# Signal: value container (current state)
user_count = Signal(42)
print(user_count()) # 42 - current value, right now
# Event listener (JavaScript DOM API, shown for contrast): reacts to future events
button.addEventListener('click', handler) # waits for future clicks
This distinction matters. Signals create a state graph - a snapshot of how values relate to each other at any moment. Event listeners create reaction patterns - responses to things happening over time.
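The difference shows up most clearly with a late arrival. The sketch below uses two hypothetical toy classes, EventEmitter and ValueCell, written only for this comparison; neither is part of reaktiv. A listener that subscribes late has missed everything that came before, while a value container can always answer "what is the state right now?"

```python
class EventEmitter:
    """Minimal event source: listeners only see events emitted after they subscribe."""

    def __init__(self):
        self._listeners = []

    def subscribe(self, listener):
        self._listeners.append(listener)

    def emit(self, event):
        for listener in self._listeners:
            listener(event)


class ValueCell:
    """Minimal value container: always answers with the current state."""

    def __init__(self, value):
        self._value = value

    def __call__(self):
        return self._value

    def set(self, value):
        self._value = value


emitter = EventEmitter()
cell = ValueCell(0)

# Two updates happen before anyone is listening.
for count in (41, 42):
    emitter.emit(count)
    cell.set(count)

seen = []
emitter.subscribe(seen.append)  # late subscriber

print(seen)    # [] - the earlier events are gone
print(cell())  # 42 - the current state is still available
```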
The Three Primitives
[Diagram: the three primitives. Signal (holds a value, notifies on changes) feeds Computed (derives from others, caches its result) and Effect (performs side effects, runs when its dependencies change). A Computed can in turn feed other Computed values.]
Think of them as:
- Signal: A cell in a spreadsheet that holds a value
- Computed: A formula cell that derives from other cells (e.g., =A1+B1)
- Effect: A macro that runs when referenced cells change
The key insight: your entire application state becomes a live spreadsheet where changing one cell automatically updates all dependent cells.
Building Complex Systems: The Emerging DAG
In practice, you rarely create just one Signal. Production applications that benefit from Signals typically end up with dozens of interconnected reactive primitives forming a Directed Acyclic Graph (DAG) of dependencies.
This isn't accidental complexity - it's emergent architecture. Each Signal represents a boundary of concern, each Computed value captures a specific derivation rule, and each Effect handles a particular side effect. The DAG structure ensures:
1. No Circular Dependencies: The system prevents infinite update loops by design.
2. Predictable Update Order: Changes flow through the graph in topological order - dependencies always resolve before dependents.
3. Efficient Invalidation: Only the affected branches of the graph recompute when source data changes.
The Insight: The DAG isn't something you design upfront - it emerges from your domain model. Start with the natural boundaries in your problem space. User authentication state, configuration values, and external data feeds typically become your root Signals. Business logic rules become Computed values. Integration points with external systems become Effects.
# A typical enterprise application with multiple domains and cross-cutting concerns
class ApplicationState:
    def __init__(self):
        # Root signals (external inputs)
        self.user_session = Signal(None)
        self.feature_flags = Signal({})
        self.market_data = Signal({})
        self.system_config = Signal({})

        # Derived state (business logic)
        self.user_context = Computed(lambda: self._build_user_context())
        self.enabled_features = Computed(lambda: self._filter_features())
        self.trading_metrics = Computed(lambda: self._calculate_metrics())
        self.dashboard_config = Computed(lambda: self._build_dashboard())

        # Cross-cutting derived state
        self.audit_context = Computed(lambda: self._build_audit_context())
        self.performance_metrics = Computed(lambda: self._calculate_performance())

        # Effects (external synchronization)
        self._ui_sync = Effect(lambda: self._sync_ui())
        self._audit_log = Effect(lambda: self._log_audit_events())
        self._metrics_reporter = Effect(lambda: self._report_metrics())
        self._cache_manager = Effect(lambda: self._manage_cache())
The beauty is that changing user_session automatically propagates through exactly the right subset of the graph - no manual coordination required.
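"Topological order" and "the right subset of the graph" can be sketched with the standard library alone (graphlib, Python 3.9+). The node names below borrow from ApplicationState, but the edges are assumptions made up for this illustration, and affected_by is a hypothetical helper. A Signals library does this bookkeeping for you; the sketch only shows what the bookkeeping amounts to.

```python
from graphlib import TopologicalSorter

# node -> the nodes it reads from (assumed edges, for illustration only)
reads_from = {
    "user_context": {"user_session"},
    "enabled_features": {"feature_flags", "user_context"},
    "dashboard_config": {"enabled_features", "system_config"},
    "ui_sync_effect": {"dashboard_config"},
}

# Dependencies always come before the nodes that read them.
order = list(TopologicalSorter(reads_from).static_order())


def affected_by(source, graph):
    """Return every node that directly or transitively reads from source."""
    affected = set()
    changed = True
    while changed:
        changed = False
        for node, deps in graph.items():
            if node not in affected and (source in deps or deps & affected):
                affected.add(node)
                changed = True
    return affected


print(order)
print(affected_by("system_config", reads_from))
print(affected_by("user_session", reads_from))
```

Changing system_config touches only dashboard_config and ui_sync_effect, while changing user_session reaches all four derived nodes. TopologicalSorter also raises graphlib.CycleError when the graph contains a cycle, which is the same constraint the DAG structure enforces.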
State Snapshots vs Event Reactions
When you access a signal, you're asking: "What's the current state?" When you set up an event listener, you're saying: "Do this when something happens later."
Example: Order Processing with Signals
The Mental Model Shift
The hardest part about adopting Signals isn't the API - it's the mental model shift from imperative to declarative state management.
Before vs After: Visualization
Before: Imperative Thinking
"When this happens, do these things in this order."
def process_user_action(user_id, action):
    user = get_user(user_id)
    user.last_action = action
    user.last_active = datetime.now()
    update_user_stats(user)
    check_achievement_progress(user)
    update_leaderboard(user)
    send_activity_notification(user)
    log_user_activity(user, action)
After: Declarative Thinking
"These relationships always hold true."
# Define relationships once
user_action = Signal(None)
user_last_active = Computed(lambda: datetime.now() if user_action() else None)
user_stats = Computed(lambda: calculate_stats(user_action()))
achievements = Computed(lambda: check_achievements(user_stats()))
leaderboard_position = Computed(lambda: calculate_position(user_stats()))
# Effects for side effects
notify_effect = Effect(lambda: send_notification(user_stats()) if user_action() else None)
log_effect = Effect(lambda: log_activity(user_action()) if user_action() else None)
# Usage becomes simple
def process_user_action(user_id, action):
    user_action.set(action)  # Everything else happens automatically
Dependency Flow Visualization
When Signals Matter (And When They Don't)
Signals Shine When: Visual Patterns
Signals Are Overkill When:
- Transformations such as validate → enrich → save
- One-shot calculations such as calculate_tax(order)
- Pure request-response patterns such as HTTP GET /users/123
In each of these cases, use regular functions.
Common Patterns and Anti-Patterns
Core Principle: Computed Signals Must Remain Pure
Golden Rule: Computed signals should never have side effects. They represent pure transformations of state.
# ✅ Good: Pure computation
user_display = Computed(lambda: f"{user_name()} ({user_age()})")
# ❌ Bad: Side effect in computed
def bad_computed():
    result = expensive_calculation(user_data())
    log_computation_time(result)  # Side effect!
    return result
This separation ensures that your Signal graph remains predictable and testable. All side effects should be handled in Effects, not Computed values.
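One reason purity matters: a cached derivation runs only when it has to, so a side effect hidden inside it fires an unpredictable number of times. The sketch below uses functools.lru_cache as a stand-in for a cached derived value. It is an analogy, not a Computed, and impure_total is a hypothetical function for this illustration.

```python
from functools import lru_cache

log = []


@lru_cache(maxsize=None)
def impure_total(prices: tuple) -> int:
    log.append("computed")  # side effect hidden inside a cached derivation
    return sum(prices)


# Three reads of the same derived value...
for _ in range(3):
    impure_total((1, 2, 3))

print(log)  # ['computed'] - the side effect ran once, not three times
```

If that log line were an audit record or a metrics call, how often it fires would depend on caching behaviour rather than on anything visible in the code. Moving it into an Effect makes the timing explicit.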
Pattern: Configuration Cascades
# Good: Grouped configuration
from reaktiv import Signal, Computed
# Good: Single grouped signal
app_config = Signal({
    "database": {"host": "localhost", "port": 5432, "user": "app", "password": "secret"},
    "redis": {"host": "localhost", "port": 6379},
    "api": {"timeout": 30, "retries": 3},
})
# Derived configs
db_config = Computed(lambda: app_config().get("database", {}))
redis_config = Computed(lambda: app_config().get("redis", {}))
# Connection pools derived from configs
db_pool = Computed(lambda: create_db_pool(**db_config()))
redis_client = Computed(lambda: create_redis_client(**redis_config()))
# Bad: Overly granular signals
db_host = Signal("localhost")
db_port = Signal(5432)
db_user = Signal("app")
db_password = Signal("secret")
# This approach makes it harder to update related settings together
Pattern: Data Processing Pipelines
# Good: Clean separation of computation and effects
from reaktiv import Signal, Computed, Effect
# Data pipeline
raw_data = Signal([])
cleaned_data = Computed(lambda: [clean_item(item) for item in raw_data()])
aggregated_data = Computed(lambda: aggregate_by_category(cleaned_data()))
formatted_output = Computed(lambda: format_for_display(aggregated_data()))
# Good: Side effects in Effects only
cache_effect = Effect(lambda: cache_service.store("agg_data", formatted_output()))
# Bad: Side effects in Computed
def bad_computed_with_api_call():
    data = expensive_api_call()  # Side effect!
    return process_data(data)

# Better: Use separate Signal and Effect
api_trigger = Signal(False)

def api_effect():
    if api_trigger():
        data = expensive_api_call()
        processed = process_data(data)
        store_result(processed)

api_effect_instance = Effect(api_effect)
Pattern: Event Sourcing Integration
# Good: Signals as event processors
event_stream = Signal([])
current_state = Computed(lambda: reduce_events(event_stream()))
projections = {
    "user_stats": Computed(lambda: project_user_stats(event_stream())),
    "daily_summary": Computed(lambda: project_daily_summary(event_stream())),
}

# Append events, projections update automatically
def add_event(event):
    event_stream.update(lambda events: events + [event])
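The pattern above leaves reduce_events undefined. As a rough idea of what such a function could look like, here is a sketch built on functools.reduce. The event shapes ("deposit", "withdraw") and the balance state are assumptions invented for this example. What matters is that the reducer is pure: it returns a new state and never mutates the old one, which is exactly what a Computed needs.

```python
from functools import reduce


def apply_event(state, event):
    """Return a new state dict; never mutate the previous one."""
    if event["type"] == "deposit":
        return {**state, "balance": state["balance"] + event["amount"]}
    if event["type"] == "withdraw":
        return {**state, "balance": state["balance"] - event["amount"]}
    return state  # unknown event types leave the state unchanged


def reduce_events(events, initial=None):
    """Fold the full event list into the current state."""
    initial = {"balance": 0} if initial is None else initial
    return reduce(apply_event, events, initial)


events = [
    {"type": "deposit", "amount": 100},
    {"type": "withdraw", "amount": 30},
    {"type": "renamed"},
]

print(reduce_events(events))  # {'balance': 70}
```

Note that this replays the whole list on every recomputation. That is fine for a sketch, but long streams would usually call for snapshots or incremental folding.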
Understanding Fine-Grained Reactivity
Traditional state management has a cascade problem: change one value, trigger all dependent computations immediately. This wastes CPU on calculations you might never use.
Consider an e-commerce app where inventory changes could update: stock display, pricing, shipping estimates, recommendations, and analytics. Traditional systems recalculate everything. But if you're only showing "In Stock" vs "Out of Stock," why run expensive recommendation algorithms?
Fine-grained reactivity solves this with lazy evaluation. When state changes, the system marks dependent values as "potentially stale" but doesn't compute anything. Computation only happens when you actually access a value.
The system builds dependency graphs automatically by observing which signals your computed functions read. No manual declarations needed - just write your derivations and the dependencies are tracked transparently.
# With Signals: Only compute what you actually use
users = Signal([])
analytics = Computed(lambda: calculate_expensive_analytics(users()))
reports = Computed(lambda: generate_expensive_reports(users()))
user_count = Computed(lambda: len(users()))
# Add a user - no computations run yet
users.update(lambda u: u + [new_user])
# Only when you access a value does it compute
print(f"Total users: {user_count()}") # Only user_count computes
# analytics and reports stay untouched - zero CPU waste
This gives you three key performance benefits:
1. Surgical Updates
Only accessed values get computed. Change user data but only display the count? Only count calculation runs.
2. Lazy Evaluation
Expensive operations like ML inference or complex aggregations run only when their results are actually needed.
# Expensive ML computation - only runs when accessed
risk_score = Computed(lambda: run_ml_model(user_behavior(), market_data()))
# Adding behavior data doesn't trigger ML model
user_behavior.update(lambda b: b + [new_behavior])
# Model only runs when you actually need the score
if should_check_risk():
    score = risk_score()  # Now the ML model runs
3. Automatic Dependency Tracking
The system observes what your computations read and builds the dependency graph automatically.
# System automatically knows this depends on both signals
combined_score = Computed(
    lambda: calculate_score(user_metrics(), system_health())
)
# Change either dependency, combined_score knows to invalidate
user_metrics.set(new_metrics) # combined_score marked for recomputation
This is especially powerful in data-heavy applications where you might have dozens of derived metrics, but only a subset are needed for any given operation. Traditional approaches recalculate everything; fine-grained reactivity computes only what's actually accessed.
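The "mark stale now, compute on access" idea can be imitated with functools.cached_property, which makes the mechanics easy to inspect. UserMetrics is a hypothetical class for this sketch. Unlike Signals, it has to list by hand which caches to drop, which is precisely the bookkeeping that automatic dependency tracking removes.

```python
from functools import cached_property


class UserMetrics:
    def __init__(self, users):
        self._users = list(users)
        self.calls = {"count": 0, "analytics": 0}

    def set_users(self, users):
        self._users = list(users)
        # Mark derived values stale by dropping their caches.
        # Nothing is recomputed here.
        for name in ("user_count", "analytics"):
            self.__dict__.pop(name, None)

    @cached_property
    def user_count(self):
        self.calls["count"] += 1
        return len(self._users)

    @cached_property
    def analytics(self):
        # Stand-in for an expensive calculation.
        self.calls["analytics"] += 1
        return {"name_lengths": sorted(len(u) for u in self._users)}


metrics = UserMetrics(["ada"])
metrics.set_users(["ada", "grace"])

first = metrics.user_count   # computes
second = metrics.user_count  # served from cache

print(first, second)   # 2 2
print(metrics.calls)   # {'count': 1, 'analytics': 0}
```

The expensive analytics property never ran, because nothing asked for it.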
Effects: The Bridge to the Outside World
Effects represent the boundary between your reactive Signal graph and the outside world. Understanding this boundary is crucial for building maintainable reactive systems.
The Mental Model: Internal vs External State
The Signal graph represents your application's internal state - a pure, synchronous, deterministic system. Effects are the controlled escape hatches that allow this internal state to influence and synchronize with external systems.
Core Principles
1. Effects Are for Side Effects and External Synchronization
Effects bridge your Signal graph to the external world. They should:
- Trigger external operations (API calls, database writes, logging)
- Keep external systems in sync with your internal state
- Handle asynchronous operations
# ✅ Good: External synchronization
database_sync_effect = Effect(lambda: save_user_to_db(user_data()))
logging_effect = Effect(lambda: logger.info(f"User updated: {user_name()}"))
2. Be Cautious When Setting Signals from Effects
Important Consideration: While Effects are primarily designed for side effects and external synchronization, there are legitimate and common cases where you'll need to set Signals from Effects. Examples include updating state based on API responses, handling external events, or coordinating with async operations.
Key Consideration: When setting Signals from Effects, be mindful of circular dependencies. An Effect that sets a Signal it also reads can create infinite execution loops, so ensure proper guards or separate the reading and writing concerns.
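One common guard is an equality check on write: if the new value equals the current one, nobody is notified, so an effect that normalizes its own input settles after one extra run. The sketch below demonstrates this with ObservableCell, a hypothetical toy class written for this example. It is not reaktiv's API, and whether a particular library applies the same check by default is something to confirm in its documentation.

```python
class ObservableCell:
    """Toy cell that calls subscribers synchronously on every real change."""

    def __init__(self, value):
        self._value = value
        self._subscribers = []

    def get(self):
        return self._value

    def set(self, value):
        if value == self._value:
            return  # equality guard: writing the same value notifies nobody
        self._value = value
        for callback in list(self._subscribers):
            callback()

    def subscribe(self, callback):
        self._subscribers.append(callback)
        callback()  # run once immediately, like an effect's first run


username = ObservableCell("  Ada ")
runs = []


def normalize_username():
    runs.append(username.get())
    # Reads and writes the same cell. It terminates because the
    # second write is a no-op under the equality guard.
    username.set(username.get().strip().lower())


username.subscribe(normalize_username)

print(username.get())  # ada
print(runs)            # ['  Ada ', 'ada']
```

Without the guard in set(), the same callback would recurse until Python raised RecursionError. Splitting the raw input and the normalized value into two separate cells avoids the question entirely and is usually the cleaner design.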
Temporal Concerns: Time and Signals
One of the most common questions about Signals is: "How do I handle debouncing, throttling, or other time-based operations?" The answer reveals a fundamental design principle of Signals: they are synchronous state management by design.
The Synchronous Nature of Signals
Signals operate in a synchronous, immediate world. When you call signal.set(value), all dependent computations and effects run immediately before the function returns. This is intentional and powerful - it ensures your state is always consistent.
# This all happens synchronously
user_input = Signal("")
search_results = Computed(lambda: search_api(user_input()) if user_input() else [])
display_results = Effect(lambda: update_ui(search_results()))
# When this runs, everything updates immediately
user_input.set("python signals") # search_api() is called right now
Temporal Concerns Belong Outside the Signal Graph
Time-based operations like debouncing, throttling, and rate limiting are temporal concerns - they deal with when things happen, not what the current state is. These concerns should be handled before data enters your Signal graph.
Pattern: Debounced Search Input
Here's how to properly handle debounced search with Signals:
import asyncio
from reaktiv import Signal, Computed, Effect

class SearchInterface:
    def __init__(self):
        # Signal state - what IS the current search term
        self.search_term = Signal("")
        self.search_results = Computed(lambda: self._search(self.search_term()))
        # Effect to update UI
        self._ui_updater = Effect(lambda: self._update_search_ui())
        # Temporal state - outside the Signal graph
        self._debounce_task = None
        self._debounce_delay = 0.3  # 300ms

    def _search(self, term):
        if not term.strip():
            return []
        # This runs synchronously when search_term changes
        return search_api(term)

    def _update_search_ui(self):
        # This runs synchronously when search_results change
        display_search_results(self.search_results())

    # Temporal layer - handles WHEN to update the Signal
    def on_user_input(self, raw_input):
        """Called every time user types in search box"""
        # Cancel previous debounce
        if self._debounce_task:
            self._debounce_task.cancel()
        # Start new debounce timer (asyncio.create_task needs a running event loop)
        self._debounce_task = asyncio.create_task(
            self._debounced_update(raw_input)
        )

    async def _debounced_update(self, value):
        try:
            await asyncio.sleep(self._debounce_delay)
            # After delay, update the Signal (synchronously)
            self.search_term.set(value)
        except asyncio.CancelledError:
            pass  # Debounce was cancelled, ignore
Why This Separation Matters
1. Clear Responsibilities: Signals handle "what is the state?" Temporal logic handles "when should state change?"
2. Testability: You can test your Signal logic independently of timing concerns.
# Easy to test the Signal logic without timing
def test_search_logic():
    search = SearchInterface()
    search.search_term.set("python")
    assert len(search.search_results()) > 0

# Easy to test debouncing logic separately
async def test_debouncing():
    search = SearchInterface()
    search.on_user_input("p")
    search.on_user_input("py")      # Should cancel previous
    search.on_user_input("python")  # Should cancel previous
    await asyncio.sleep(0.4)        # Wait for debounce
    assert search.search_term() == "python"
3. Performance: The Signal graph only processes settled, final values - not every intermediate keystroke.
Anti-Pattern: Temporal Logic in Signals
Don't try to embed temporal logic directly in Signals:
# ❌ Bad: Mixing temporal concerns with Signal state
class BadSearchExample:
    def __init__(self):
        self.raw_input = Signal("")
        # This is wrong - debouncing inside a Computed
        self.debounced_search = Computed(lambda: self._debounce_input())

    def _debounce_input(self):
        # This breaks the synchronous nature of Signals
        time.sleep(0.3)  # Blocks everything!
        return search_api(self.raw_input())

# ❌ Bad: Async operations in Computed
class AlsoBadExample:
    def __init__(self):
        self.search_term = Signal("")
        # Computed functions must be synchronous
        self.results = Computed(lambda: asyncio.run(async_search(self.search_term())))
The Mental Model: Two Layers
Think of your reactive system as having two distinct layers:
[Diagram: two layers. In the temporal layer, incoming events (including network events and timers) pass through debouncing, throttling, and rate limiting. The output of that layer feeds the synchronous Signal layer, where Signals, Computed values, and Effects produce pure state, derivations, and side effects.]
Temporal Layer: Handles timing, buffering, rate limiting. Decides when to update state.
Signal Layer: Handles pure state transformations. Represents what the current state is.
Key Takeaways
- Signals are synchronous by design - this ensures consistency and predictability
- Temporal concerns belong outside the Signal graph - handle them before data enters Signals
- Separate "when" from "what" - timing logic and state logic have different responsibilities
- Test them independently - temporal and state logic can be tested separately
- Use the right tool for each job - async/await for timing, Signals for state
This separation keeps your code clean, testable, and performant while leveraging the strengths of both paradigms.
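Throttling follows the same split as the debounce example. Here is a small sketch where the temporal layer is a plain class that decides when a value is allowed through. Throttle is a hypothetical helper, not part of reaktiv, and the clock is injectable so the timing logic can be tested without sleeping. In a real system the sink would typically be some_signal.set.

```python
import time


class Throttle:
    """Forward at most one value per `interval` seconds; drop the rest."""

    def __init__(self, sink, interval, clock=time.monotonic):
        self._sink = sink          # e.g. some_signal.set
        self._interval = interval
        self._clock = clock
        self._last_forwarded = None

    def push(self, value):
        now = self._clock()
        if self._last_forwarded is None or now - self._last_forwarded >= self._interval:
            self._last_forwarded = now
            self._sink(value)
            return True
        return False  # dropped: too soon after the last forwarded value


# Drive the throttle with a fake clock instead of real time.
fake_now = [0.0]
received = []
throttle = Throttle(received.append, interval=1.0, clock=lambda: fake_now[0])

for t, reading in [(0.0, 10), (0.4, 11), (0.9, 12), (1.0, 13), (1.5, 14), (2.2, 15)]:
    fake_now[0] = t
    throttle.push(reading)

print(received)  # [10, 13, 15]
```

The state layer never learns that readings 11, 12, and 14 existed. It only sees values the temporal layer chose to let through.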
Real-World Scenarios
Scenario 1: Microservice Configuration Management
import os
from reaktiv import Signal, Computed, Effect

class ServiceConfig:
    def __init__(self):
        # Base configuration sources
        self.env_config = Signal(os.environ.copy())
        self.file_config = Signal(load_config_file())
        self.remote_config = Signal({})  # Updated via API calls

        # Merged configuration with precedence (later entries win)
        self.effective_config = Computed(lambda: {
            **self.file_config(),
            **self.remote_config(),
            **self.env_config()
        })

        # Service-specific configurations
        self.database_config = Computed(
            lambda: DatabaseConfig.from_dict(self.effective_config().get("database", {}))
        )
        self.redis_config = Computed(
            lambda: RedisConfig.from_dict(self.effective_config().get("redis", {}))
        )
        self.feature_flags = Computed(
            lambda: self.effective_config().get("features", {})
        )

        # Derived services
        self.db_pool = Computed(lambda: create_database_pool(self.database_config()))
        self.cache_client = Computed(lambda: create_redis_client(self.redis_config()))

        # Effects for configuration changes
        self._config_logger = Effect(
            lambda: logger.info(f"Config updated: {list(self.effective_config().keys())}")
        )
        self._metrics_updater = Effect(
            lambda: update_config_metrics(self.effective_config())
        )

    def update_remote_config(self, new_config):
        """Called by configuration service webhook"""
        self.remote_config.set(new_config)
        # Database pool, cache client, etc. automatically recreated
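The precedence rule in effective_config comes straight from dict unpacking: later sources overwrite earlier ones, key by key. A short worked example with made-up values (the keys and hosts below are illustrative assumptions):

```python
file_config = {"database": {"host": "db.internal", "port": 5432}, "log_level": "INFO"}
remote_config = {"log_level": "DEBUG", "features": {"beta": True}}
env_config = {"log_level": "WARNING"}

# Same merge order as effective_config: file < remote < env
effective = {**file_config, **remote_config, **env_config}

print(effective["log_level"])  # WARNING - env wins over remote and file
print(effective["features"])   # {'beta': True} - only remote defines it
print(effective["database"])   # {'host': 'db.internal', 'port': 5432}

# Caveat: the merge is shallow. A partial "database" entry from a
# higher-precedence source replaces the whole nested dict.
remote_override = {"database": {"host": "replica.internal"}}
shallow = {**file_config, **remote_override}

print(shallow["database"])     # {'host': 'replica.internal'} - port is gone
```

If nested sections need to be merged field by field, the merge step inside the Computed has to do that explicitly.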
Scenario 2: Real-Time Analytics Dashboard
# Real-Time Analytics Implementation
from reaktiv import Signal, Computed, Effect
import asyncio
import time

class AnalyticsDashboard:
    def __init__(self, websocket):
        # Data sources
        self.raw_events = Signal([])
        self.time_window = Signal(60)  # Last 60 seconds

        # Computed metrics
        # Caveat: time.time() is not a tracked dependency, so this cached
        # value only refreshes when time_window changes
        self.cutoff_time = Computed(
            lambda: time.time() - self.time_window()
        )
        self.recent_events = Computed(
            lambda: [e for e in self.raw_events()
                     if e["timestamp"] >= self.cutoff_time()]
        )
        self.event_counts = Computed(
            lambda: {
                "total": len(self.recent_events()),
                "by_type": self._count_by_type(self.recent_events())
            }
        )

        # Dashboard data
        self.dashboard_data = Computed(
            lambda: {
                "counts": self.event_counts(),
                "window": self.time_window(),
                "updated_at": time.time()
            }
        )

        async def dashboard_update():
            await self._send_dashboard_update(websocket, self.dashboard_data())

        # Effect to broadcast updates
        self._broadcaster = Effect(dashboard_update)

    def _count_by_type(self, events):
        result = {}
        for event in events:
            event_type = event.get("type", "unknown")
            result[event_type] = result.get(event_type, 0) + 1
        return result

    async def _send_dashboard_update(self, websocket, data):
        if websocket.open:
            await websocket.send_json(data)

    def add_event(self, event):
        self.raw_events.update(lambda events: events + [event])

    def change_time_window(self, seconds):
        self.time_window.set(seconds)
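A sliding window depends on the current time, and the wall clock is not something a dependency graph can observe on its own. One way to keep the derivation pure is to pass the time in as an explicit input, for example from a Signal that a timer updates periodically (that timer is an assumption, not shown here). The sketch below covers only the pure part; recent_events and summarize are hypothetical helper names, and the events are made up.

```python
from collections import Counter


def recent_events(events, now, window_seconds):
    """Keep events whose timestamp falls inside [now - window_seconds, now]."""
    cutoff = now - window_seconds
    return [e for e in events if cutoff <= e["timestamp"] <= now]


def summarize(events, now, window_seconds):
    """Pure summary of the window: same inputs always give the same output."""
    recent = recent_events(events, now, window_seconds)
    return {
        "total": len(recent),
        "by_type": dict(Counter(e.get("type", "unknown") for e in recent)),
    }


events = [
    {"timestamp": 100, "type": "click"},
    {"timestamp": 130, "type": "view"},
    {"timestamp": 170, "type": "click"},
    {"timestamp": 175},
]

print(summarize(events, now=180, window_seconds=60))  # 3 events in the window
print(summarize(events, now=240, window_seconds=60))  # window has moved past all of them
```

Because now is an argument, the same event list gives different answers as time advances, and a test can pin the time to any value it likes.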
Scenario 3: Distributed System Health Monitoring
# Health Monitoring Implementation
from reaktiv import Signal, Computed, Effect

class ClusterMonitor:
    def __init__(self, alert_service, load_balancer):
        # Raw status data
        self.node_statuses = Signal({})  # node_id -> status

        # Derived metrics
        self.healthy_nodes = Computed(
            lambda: [node_id for node_id, status in self.node_statuses().items()
                     if status["healthy"]]
        )
        self.cluster_capacity = Computed(
            lambda: sum(status["capacity"] for status in self.node_statuses().values()
                        if status["healthy"])
        )
        self.cluster_load = Computed(
            lambda: sum(status["current_load"] for status in self.node_statuses().values())
        )
        self.load_percentage = Computed(
            lambda: (self.cluster_load() / self.cluster_capacity() * 100)
            if self.cluster_capacity() > 0 else 100
        )

        # Effects for automated actions
        self._lb_updater = Effect(
            lambda: load_balancer.update_backends(self.healthy_nodes())
        )
        self._scaler = Effect(lambda: self._check_scaling_needs())

    def _check_scaling_needs(self):
        load_pct = self.load_percentage()
        if load_pct > 80:
            # Trigger scaling
            print(f"High load detected ({load_pct:.1f}%), initiating scale out")
        elif load_pct < 20:
            # Scale in
            print(f"Low load detected ({load_pct:.1f}%), initiating scale in")

    def update_node_status(self, node_id, status):
        self.node_statuses.update(lambda statuses: {
            **statuses,
            node_id: status
        })
Migration Guide
Migration Phases Visualization
[Diagram: migration phases. Phase 2: introduce Signals, add Computed values for derived state, replace side effects with Effects. Phase 3 (Remove Manual Coordination): declarative relationships, automatic updates, simplified API. Each step in one phase leads to the corresponding step in the next.]
Before and After Architecture
Conclusion
Signals represent a fundamental shift from imperative to declarative state management. They're not just "reactive variables" - they're a way to express complex state relationships that automatically maintain consistency.
The Signal Advantage
The key insight is that most state management bugs come from forgetting to update something when related state changes. Signals eliminate this entire class of bugs by making relationships explicit and automatic.
Start small: identify one area of your codebase where you manually coordinate state updates. Replace it with Signals, and experience the difference. Once you see how much cleaner and more reliable it makes your code, you'll start seeing Signal opportunities everywhere.
Remember: Signals are a tool, not a religion. Use them where they add value - complex derived state, cross-cutting concerns, real-time data flows. Skip them for simple, linear transformations.
Learning from Frontend Innovations
The Signals paradigm has gained significant traction in frontend development before crossing over to other domains. Much of this momentum can be credited to Ryan Carniato, creator of SolidJS and a principal architect at Netlify. Through his detailed blog posts, conference talks, and livestreams, Carniato has systematically demystified the inner workings of reactive systems, making these concepts accessible to a broader audience of developers.
What makes Carniato's contributions particularly valuable is his focus on the mental models behind reactivity, rather than just the implementation details. He has consistently emphasized how Signals create a coherent graph of dependencies that stays in sync automatically - the same fundamental principle that makes reaktiv powerful in Python applications.
His work demonstrates that Signals aren't merely a frontend pattern but a universal approach to dependency management that can solve coordination problems across language boundaries.
Additional Resources
If you're interested in deepening your understanding of Signals and reactive programming, these resources provide valuable insights:
Articles and Blogs
- "A Hands-on Introduction to Fine-Grained Reactivity" - Practical guide to Signals implementation
- "Reactivity in Depth" - Signals reactivity in Vue.js
Video Content
- "Rethinking reactivity with Angular Signals" - Introduction to Signals, a new reactivity model in Angular (Don't be intimidated by the terrible cypher game example...)
- "Ryan Carniato explains JavaScript Signals" - SolidJS creator Ryan Carniato talks Signals on the "JS Party" podcast
- "Rich Harris: The How and Why of Reactivity" - Comprehensive talk on reactive programming models
Python-Specific Resources
- reaktiv Documentation - Official documentation of the reaktiv Python implementation
Understanding how these patterns work across different languages can provide deeper insights into their universal applicability, regardless of whether you're working in a frontend, backend, or data processing context.
A Humble Thanks to the Hacker News Community
First, I'd like to sincerely thank everyone who took the time to read, upvote, and comment on Hacker News. I genuinely didn't expect it to reach the front page, and I'm grateful for the visibility and thoughtful discussion that followed.
The discussion of the article: https://news.ycombinator.com/item?id=44267705