Agent Scheduling¶
The AgentSchedulerService is responsible for running agents on their configured schedules, caching their context data, and forwarding their alerts to the alert queue. It runs in a dedicated daemon thread with its own asyncio event loop, keeping agent execution off the main thread.
AgentSchedule Dataclass¶
Every agent declares its schedule via the schedule property, which returns an AgentSchedule:
from dataclasses import dataclass
@dataclass
class AgentSchedule:
interval_seconds: int # Minimum interval between runs
run_on_startup: bool = True # Whether to run immediately when scheduler starts
| Field | Type | Default | Description |
|---|---|---|---|
interval_seconds |
int |
(required) | Minimum seconds between runs. The scheduler checks every 10 seconds, so effective granularity is ~10 seconds. |
run_on_startup |
bool |
True |
If True, the agent runs immediately when the scheduler starts, before the periodic loop begins. |
Choosing an Interval¶
| Agent Type | Suggested Interval | Reasoning |
|---|---|---|
| Device state (Home Assistant) | 300s (5 min) | Devices change infrequently; 5 minutes balances freshness vs API load |
| Calendar events | 300s (5 min) | Events are relatively static; 5 minutes catches new/changed events |
| News/RSS feeds | 1800s (30 min) | RSS feeds update slowly; frequent polling wastes resources |
| Email checking | 300s (5 min) | Matches typical IMAP push notification latency |
| Token refresh | 3300s (55 min) | Refresh 5 minutes before a 60-minute token expires |
| System monitoring | 60s (1 min) | System stats change quickly; 1 minute gives reasonable resolution |
Scheduler Architecture¶
┌─────────────────────────────────────────────────┐
│ Main Thread │
│ │
│ main.py startup │
│ │ │
│ ├── initialize_agent_scheduler() │
│ │ │ │
│ │ ├── AgentDiscoveryService.get_all_agents()│
│ │ │ → {name: agent, ...} │
│ │ │ │
│ │ └── scheduler.start() │
│ │ │ │
│ │ └── Creates daemon thread ─────────┼──┐
│ │ │ │
│ ▼ │ │
│ Voice loop (reads context via │ │
│ get_aggregated_context()) │ │
│ │ │
└───────────────────────────────────────────────────┘ │
│
┌───────────────────────────────────────────────────┐ │
│ Scheduler Daemon Thread │◄─┘
│ │
│ asyncio event loop │
│ │ │
│ ├── _run_startup_agents() │
│ │ └── asyncio.gather(agent1.run(), │
│ │ agent2.run(), ...) │
│ │ │
│ └── _scheduler_loop() [every 10s] │
│ │ │
│ ├── For each agent: │
│ │ if (now - last_run) >= interval: │
│ │ await agent.run() │
│ │ cache context_data (with lock) │
│ │ collect alerts → AlertQueueService │
│ │ │
│ └── Wait 10s or stop signal │
│ │
└───────────────────────────────────────────────────┘
Key Design Decisions¶
Dedicated daemon thread with asyncio loop. The scheduler creates a new asyncio.AbstractEventLoop in a daemon thread. This allows agents to use async/await for network I/O without blocking the main thread. The daemon flag ensures the thread dies when the main process exits.
10-second check interval. The scheduler does not use per-agent timers. Instead, it wakes up every 10 seconds and checks which agents are due based on (now - last_run) >= interval_seconds. This keeps the implementation simple and avoids timer drift issues.
Startup agents run concurrently. Agents with run_on_startup=True are executed concurrently via asyncio.gather() during startup. This means a slow agent (e.g., one that takes 5 seconds to fetch from an API) does not delay other startup agents.
Thread-safe context access. The _context_cache dict is protected by a threading.Lock. The scheduler thread writes to it after each agent run, and the main thread reads from it during voice requests via get_aggregated_context().
Startup Sequence¶
When initialize_agent_scheduler() is called during main.py startup:
- Agent discovery:
AgentDiscoveryService.get_all_agents()scans theagents/directory forIJarvisAgentimplementations - Secret validation: Each agent's
validate_secrets()is called; agents with missing secrets are logged and skipped - Thread creation: A daemon thread is created with a new asyncio event loop
- Startup agents: All agents with
schedule.run_on_startup = Truerun concurrently viaasyncio.gather() - Periodic loop: The scheduler enters its 10-second check loop
# In main.py
from services.agent_scheduler_service import initialize_agent_scheduler
from services.alert_queue_service import get_alert_queue_service
scheduler = initialize_agent_scheduler()
scheduler.set_alert_queue(get_alert_queue_service())
Context Aggregation¶
After each successful agent run, the scheduler caches the agent's context data:
# Inside _run_agent_safe():
if agent.include_in_context:
context = agent.get_context_data()
with self._context_lock:
self._context_cache[agent.name] = context
The voice thread retrieves a snapshot of all cached context:
def get_aggregated_context(self) -> Dict[str, Dict[str, Any]]:
with self._context_lock:
return self._context_cache.copy()
This returns a dict mapping agent names to their context data:
{
"home_assistant": {
"light_controls": {...},
"device_controls": {...},
"floors": {...},
},
"system_monitor": {
"cpu_percent": 23.5,
"memory_percent": 67.2,
...
},
}
The voice thread places this into node_context["agents"] before sending to the Command Center.
Error Handling¶
The scheduler is designed to never crash from a single agent failure:
-
Agent exceptions are caught: If
run()raises an exception, the scheduler logs the error and caches an error state instead of context data: -
Alert collection is wrapped: If
get_alerts()raises, the error is logged but does not affect the context cache or other agents. -
The loop continues: After an error, the scheduler continues checking and running other agents on their normal schedules. The failed agent will be retried on its next interval.
Public API¶
start()¶
Start the scheduler. Discovers agents, creates the daemon thread, and begins the scheduling loop. Idempotent --- calling start() when already running logs a warning and returns.
stop()¶
Stop the scheduler gracefully. Signals the event loop to exit and waits up to 5 seconds for the daemon thread to finish.
run_agent_now(name: str) -> bool¶
Trigger an immediate run of a specific agent, regardless of its schedule. Returns True if the agent was found and the run was scheduled, False otherwise. The run is asynchronous --- it is submitted to the event loop and executes when the loop is free.
get_agent_status() -> Dict[str, Dict[str, Any]]¶
Get status information for all registered agents:
{
"home_assistant": {
"name": "home_assistant",
"description": "Pre-fetches Home Assistant device states",
"interval_seconds": 300,
"last_run": "2026-03-17T12:00:00+00:00",
"next_run": "2026-03-17T12:05:00+00:00",
"include_in_context": True,
},
"token_refresh": {
"name": "token_refresh",
"description": "Refreshes OAuth tokens before expiry",
"interval_seconds": 3300,
"last_run": None,
"next_run": "pending",
"include_in_context": False,
},
}
get_aggregated_context() -> Dict[str, Dict[str, Any]]¶
Get a thread-safe snapshot of all cached agent context data. This is the primary interface for the voice thread.
set_alert_queue(queue: AlertQueueService) -> None¶
Wire the alert queue so agent alerts are collected after each run. Must be called before start() if you want alert forwarding.
Singleton Access¶
The scheduler uses a singleton pattern with two accessors:
from services.agent_scheduler_service import (
get_agent_scheduler_service, # Get the singleton (lazy init)
initialize_agent_scheduler, # Get + start (call once at startup)
)
get_agent_scheduler_service() returns the singleton instance without starting it. initialize_agent_scheduler() returns the singleton and calls start(). Use the latter in main.py at startup; use the former everywhere else to access the running instance.
Source Files¶
| File | Description |
|---|---|
jarvis-node-setup/services/agent_scheduler_service.py |
AgentSchedulerService implementation |
jarvis-node-setup/core/ijarvis_agent.py |
AgentSchedule dataclass, IJarvisAgent ABC |
jarvis-node-setup/services/alert_queue_service.py |
AlertQueueService (wired via set_alert_queue) |
jarvis-node-setup/utils/agent_discovery_service.py |
Agent discovery (used by scheduler at startup) |