Best Practices¶
This guide provides recommendations for using minevent effectively in your projects.
Event Naming¶
Use Clear, Descriptive Names¶
Event names should clearly describe what happened or will happen:
# Good
manager.trigger_event("training_epoch_completed")
manager.trigger_event("model_checkpoint_saved")
manager.trigger_event("validation_started")
# Less clear
manager.trigger_event("event1")
manager.trigger_event("done")
manager.trigger_event("x")
Use Consistent Naming Conventions¶
Choose a naming convention and stick to it throughout your project:
# Option 1: Snake case with verb in past tense
"training_started"
"epoch_completed"
"validation_finished"
# Option 2: Namespace-style with present tense
"training.start"
"training.epoch.complete"
"validation.finish"
Use Hierarchical Event Names¶
For complex systems, use hierarchical naming to organize events:
# Training lifecycle
"training.start"
"training.epoch.start"
"training.batch.start"
"training.batch.end"
"training.epoch.end"
"training.end"
# Model lifecycle
"model.created"
"model.compiled"
"model.saved"
Event Handler Design¶
Keep Handlers Focused¶
Each handler should do one thing well:
# Good: Separate handlers for separate concerns
def log_metrics(metrics):
logger.info(f"Metrics: {metrics}")
def save_checkpoint(model, epoch):
torch.save(model.state_dict(), f"checkpoint_epoch_{epoch}.pt")
def update_progress_bar(current, total):
progress_bar.update(current / total)
# Less ideal: One handler doing multiple things
def do_everything(model, metrics, epoch, current, total):
logger.info(f"Metrics: {metrics}")
torch.save(model.state_dict(), f"checkpoint_epoch_{epoch}.pt")
progress_bar.update(current / total)
Handle Exceptions Gracefully¶
Protect your main logic from handler failures:
from minevent import EventHandler
def safe_handler(*args, **kwargs):
try:
# Your handler logic
process_data(*args, **kwargs)
except Exception as e:
logger.error(f"Handler failed: {e}")
# Optionally re-raise if critical
# raise
handler = EventHandler(safe_handler)
Use Type Hints¶
Make your handlers more maintainable with type hints:
from typing import Dict, Any
def log_metrics(metrics: Dict[str, float], epoch: int) -> None:
"""Log training metrics for the current epoch.
Args:
metrics: Dictionary of metric names to values
epoch: Current training epoch number
"""
logger.info(f"Epoch {epoch}: {metrics}")
handler = EventHandler(
log_metrics, handler_kwargs={"metrics": {"loss": 0.5}, "epoch": 1}
)
Event Manager Usage¶
Centralize Event Manager Creation¶
Create the event manager in one place and pass it where needed:
# Good: Create once, pass around
def main():
manager = EventManager()
setup_handlers(manager)
trainer = Trainer(manager)
trainer.train()
def setup_handlers(manager: EventManager):
manager.add_event_handler("epoch_end", EventHandler(save_checkpoint))
manager.add_event_handler("training_end", EventHandler(log_summary))
# Avoid: Global event manager
# MANAGER = EventManager() # Global state can be problematic
Check Before Adding Duplicate Handlers¶
Prevent duplicate handler registrations:
# Add handler only if not already present
handler = EventHandler(my_function)
if not manager.has_event_handler(handler, "my_event"):
manager.add_event_handler("my_event", handler)
Clean Up When Done¶
Reset or properly dispose of event managers when they're no longer needed:
try:
# Training loop
trainer.train(manager)
finally:
# Clean up
manager.reset()
Conditional Event Handling¶
Use Conditions for Periodic Operations¶
For operations that shouldn't happen every time:
from minevent import ConditionalEventHandler, PeriodicCondition
# Save checkpoint every 10 epochs
save_handler = ConditionalEventHandler(save_checkpoint, PeriodicCondition(freq=10))
manager.add_event_handler("epoch_end", save_handler)
# Log metrics every 100 batches
log_handler = ConditionalEventHandler(log_batch_metrics, PeriodicCondition(freq=100))
manager.add_event_handler("batch_end", log_handler)
Integration Patterns¶
ML Training Pipeline Pattern¶
class Trainer:
def __init__(self, model, manager: EventManager):
self.model = model
self.manager = manager
def train(self, epochs: int, train_loader, val_loader):
self.manager.trigger_event("training_start")
for epoch in range(epochs):
self.manager.trigger_event("epoch_start")
# Training
train_loss = self._train_epoch(train_loader)
self.manager.trigger_event("training_epoch_end")
# Validation
val_loss = self._validate(val_loader)
self.manager.trigger_event("validation_epoch_end")
self.manager.trigger_event("epoch_end")
self.manager.trigger_event("training_end")
def _train_epoch(self, train_loader):
for batch in train_loader:
self.manager.trigger_event("batch_start")
loss = self._process_batch(batch)
self.manager.trigger_event("batch_end")
return loss
Plugin System Pattern¶
class PluginManager:
"""Manage plugins using event system."""
def __init__(self):
self.event_manager = EventManager()
self.plugins = []
def register_plugin(self, plugin):
"""Register a plugin and its event handlers."""
self.plugins.append(plugin)
plugin.setup(self.event_manager)
def trigger(self, event: str, **kwargs):
"""Trigger event with data."""
# Store kwargs somewhere accessible to handlers
self.event_manager.trigger_event(event)
# Plugin implementation
class LoggingPlugin:
def setup(self, manager: EventManager):
manager.add_event_handler(
"model_trained", EventHandler(self.log_training_complete)
)
def log_training_complete(self):
print("Training completed!")
Callback System Pattern¶
class CallbackSystem:
"""Wrap minevent for a cleaner callback API."""
def __init__(self):
self._manager = EventManager()
def on(self, event: str, callback, condition=None):
"""Register a callback for an event."""
if condition:
handler = ConditionalEventHandler(callback, condition)
else:
handler = EventHandler(callback)
self._manager.add_event_handler(event, handler)
def emit(self, event: str):
"""Emit an event."""
self._manager.trigger_event(event)
# Usage
callbacks = CallbackSystem()
callbacks.on("training_complete", send_notification)
callbacks.on("checkpoint", save_model, condition=PeriodicCondition(freq=10))
Testing¶
Test Event Handlers Independently¶
def test_handler():
"""Test handler logic without event manager."""
# Arrange
handler = EventHandler(my_function, handler_args=("test",))
# Act
handler.handle()
# Assert - verify expected behavior
assert expected_result == actual_result
Mock Event Handlers in Tests¶
from unittest.mock import Mock
def test_event_system():
"""Test event system with mock handlers."""
manager = EventManager()
mock_handler = Mock()
manager.add_event_handler("test_event", EventHandler(mock_handler))
manager.trigger_event("test_event")
mock_handler.assert_called_once()
Test Event Flow¶
def test_training_events():
"""Test that training triggers expected events."""
manager = EventManager()
events_triggered = []
def track_event(event_name):
events_triggered.append(event_name)
manager.add_event_handler("start", EventHandler(track_event, ("start",)))
manager.add_event_handler("end", EventHandler(track_event, ("end",)))
# Simulate training
manager.trigger_event("start")
manager.trigger_event("end")
assert events_triggered == ["start", "end"]
Performance Considerations¶
Minimize Handler Count¶
Too many handlers can slow down event triggering:
# Good: One handler for related operations
def batch_operations(batch):
log_batch(batch)
update_metrics(batch)
check_conditions(batch)
manager.add_event_handler("batch_end", EventHandler(batch_operations))
# Less efficient: Multiple handlers for small operations
# manager.add_event_handler("batch_end", EventHandler(log_batch))
# manager.add_event_handler("batch_end", EventHandler(update_metrics))
# manager.add_event_handler("batch_end", EventHandler(check_conditions))
Use Conditional Handlers for Infrequent Operations¶
Don't execute expensive operations every time:
# Good: Only save checkpoint periodically
manager.add_event_handler(
"epoch_end", ConditionalEventHandler(save_checkpoint, PeriodicCondition(freq=10))
)
# Less efficient: Save every epoch when not needed
# manager.add_event_handler("epoch_end", EventHandler(save_checkpoint))
Profile Handler Performance¶
Identify slow handlers:
import time
def profiled_handler(*args, **kwargs):
start = time.time()
actual_handler(*args, **kwargs)
elapsed = time.time() - start
if elapsed > 0.1: # Log if takes more than 100ms
logger.warning(f"Handler took {elapsed:.2f}s")
Common Pitfalls¶
Avoid Modifying State During Event Triggering¶
# Problematic: Removing handlers during event triggering
def problematic_handler(manager, event, handler):
# This can cause issues if called during event triggering
manager.remove_event_handler(event, handler)
# Better: Flag for removal, clean up later
handlers_to_remove = []
def safe_handler(event, handler):
handlers_to_remove.append((event, handler))
# Clean up after event triggering is complete
for event, handler in handlers_to_remove:
manager.remove_event_handler(event, handler)
Don't Rely on Execution Order Across Events¶
# Problematic: Assuming event order
manager.trigger_event("event1")
manager.trigger_event("event2")
# Handlers for event1 and event2 may have side effects,
# but don't assume event1 handlers complete before event2 triggers
# Better: Use explicit ordering within the same event
manager.add_event_handler("event", EventHandler(step1))
manager.add_event_handler("event", EventHandler(step2)) # Runs after step1
Avoid Circular Event Triggering¶
# Problematic: Handler triggers the same event
def recursive_handler(manager):
manager.trigger_event("my_event") # This creates infinite recursion!
# Better: Use different events or add guards
def safe_handler(manager, depth=0):
if depth > 0:
return
# Do work
manager.trigger_event("next_event")