Hub
autogen.beta.network.hub.core.Hub #
Hub(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0)
In-process registry, dispatcher, channel state-machine, persistence root.
Construct with :meth:open for production (hydrates from disk and spawns sweepers); the sync __init__ is for tests that need fine-grained control.
Source code in autogen/beta/network/hub/core.py
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 | |
audit_log property #
Public access to the built-in audit log (a :class:HubListener).
Use this to call :meth:AuditLog.read_all from tooling or to attach a live subscriber via :meth:AuditLog.subscribe. Custom hub subclasses that want a different audit format can replace the instance via :meth:replace_audit_log.
open async classmethod #
open(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0, register_default_adapters=True)
Construct + hydrate from disk + start sweepers. Production entry point.
register_default_adapters=True (default) registers the built-in adapters (consulting@v1, conversation@v1, discussion@v1) and the built-in expectation evaluators / violation handlers (acks_within / reply_within / max_silence, audit / notify_channel / auto_close) so simple test setups don't need explicit registration calls.
Set expectation_sweep_interval=0 to disable the expectation sweeper entirely (tests usually do this to avoid background timer noise).
Source code in autogen/beta/network/hub/core.py
hydrate async #
Walk the store; rebuild caches. Idempotent.
Loads identities, channels, and tasks from disk. Active channel WALs are re-folded through their adapter so the _adapter_states cache is rebuilt deterministically.
Source code in autogen/beta/network/hub/core.py
start async #
Spawn the TTL + expectation + custom sweepers. Idempotent.
ttl_sweep_interval=0 disables the TTL sweeper; expectation_sweep_interval=0 disables the expectation sweeper. Custom sweepers attached via :meth:register_sweeper start here too (registered before start()) or immediately at registration (after start()).
Source code in autogen/beta/network/hub/core.py
close async #
Cancel sweepers + endpoint tasks; drain queues. Idempotent.
Source code in autogen/beta/network/hub/core.py
register_sweeper #
Attach a custom periodic worker.
fn is called every interval_seconds. Subclasses use this for protocol-specific background work (e.g. polling a chat platform's presence list, refreshing an auth token).
If Hub.start() has already run, the sweeper starts immediately. Otherwise it's queued and starts when start() runs.
Re-registering at the same name raises ValueError — use :meth:unregister_sweeper first if you mean to replace.
Sync vs. async: register_sweeper is synchronous because it only updates internal bookkeeping (and may call the underlying _IntervalSweeper.start which schedules a fire-and-forget task). :meth:unregister_sweeper is async because it awaits the sweeper's task cancellation to ensure clean shutdown.
Source code in autogen/beta/network/hub/core.py
unregister_sweeper async #
Stop and remove a custom sweeper. No-op if absent.
Async to mirror :meth:_IntervalSweeper.stop, which awaits cancellation of the running task. Custom sweepers registered before :meth:start still go through this path on :meth:close, so subclasses don't need to track them themselves.
Source code in autogen/beta/network/hub/core.py
register_adapter #
Register a ChannelAdapter keyed by (type, version).
Re-registering at the same key replaces the prior adapter; the old key's existing in-flight channels keep their snapshotted manifest for life.
Source code in autogen/beta/network/hub/core.py
register_listener #
Attach a :class:HubListener to receive state-transition events.
Listeners receive events in registration order. Each is wrapped in try/except so one buggy listener cannot stall dispatch — its exception is logged at ERROR and the next listener still runs.
Source code in autogen/beta/network/hub/core.py
unregister_listener #
Detach a previously-registered listener. No-op if absent.
report_turn_failure async #
Fan out an on_turn_failed event to every listener.
Public entry point so client-side notify handlers can route substantive-turn crashes through the observability surface without touching hub privates. AuditLog (the built-in listener) records the failure; tenant listeners react however they choose.
Source code in autogen/beta/network/hub/core.py
fire_task_event async #
Fan out an on_task_event to every listener.
Public entry point so :class:TaskMirror (and other tenant observers) can route task-side notifications through the hub's listener surface without touching _fan_out. kind is free-form — the built-in listener Protocol documents "started" / "progress" / "completed" / "failed" / "expired" / "cancelled" / "mirror_failed" as the recognised values, but tenants may emit additional kinds.
Source code in autogen/beta/network/hub/core.py
replace_audit_log #
Swap the built-in :class:AuditLog for a tenant-provided one.
Unregisters the prior audit log from the listener chain, registers the replacement as the first listener (preserving the convention that audit writes complete before tenant listeners observe the same event), and updates :attr:audit_log to point at it.
Source code in autogen/beta/network/hub/core.py
register_arbiter #
Replace the active :class:HubArbiter instance.
The default :class:RuleBasedArbiter is installed automatically and enforces per-agent :class:Rule (access + limits) — the same behavior the hub had inline before this seam existed. Tenants replace it to layer custom permission protocols (JWT scope, federation routing, etc.) on top of (or in place of) the rule-based defaults.
Only one arbiter is active at a time; calling this with a new instance replaces the prior arbiter outright.
Source code in autogen/beta/network/hub/core.py
register_remote_proxy #
Register a federation proxy keyed by proxy.scheme.
When the hub dispatches an envelope to a recipient whose passport has effective_kind == "remote_agent", it looks up the proxy by the recipient's auth.scheme and calls proxy.dispatch(envelope, recipient) instead of sending a NotifyFrame to a local endpoint. Re-registering at the same scheme replaces the prior proxy.
Source code in autogen/beta/network/hub/core.py
unregister_remote_proxy #
Remove the proxy registered for scheme and return it.
Returns None if no proxy was registered for scheme. The caller is responsible for awaiting proxy.close() — the hub leaves lifecycle decisions to whoever owns the proxy instance.
Source code in autogen/beta/network/hub/core.py
remote_proxy_for #
health #
Return an operational snapshot of hub state.
Cheap to compute (in-memory only). Wire to a /health endpoint or operational dashboard. The shape is intentionally small — operators want a handful of indicative numbers, not the full state.
Fields:
active_channels: number of channels in OPENED/PENDING state.registered_agents: total registered identities (agents + humans).pending_inbox_total: sum of per-recipient outstanding envelope counters (best-effort approximation).max_pending_inbox_depth: maximum per-recipient queue depth, orNonewhen nothing is queued. Indicative of the "stuck agent" case.registered_listeners: number of attached :class:HubListenerinstances (the built-in :class:AuditLogcounts).adapters_loaded: number of registered :class:ChannelAdapterinstances.
Source code in autogen/beta/network/hub/core.py
on_envelope_posted async #
on_envelope_rejected async #
on_dispatch_failed async #
on_channel_event async #
on_agent_event async #
on_expectation_fired async #
on_turn_failed async #
on_task_event async #
on_inbox_pressure async #
register_expectation_evaluator #
Register an evaluator keyed by evaluator.name.
Re-registering the same name replaces the prior evaluator.
Source code in autogen/beta/network/hub/core.py
register_violation_handler #
Register a violation handler keyed by handler.name.
Re-registering the same name replaces the prior handler.
Source code in autogen/beta/network/hub/core.py
register async #
Source code in autogen/beta/network/hub/core.py
unregister async #
Source code in autogen/beta/network/hub/core.py
name_for #
Resolve agent_id to its registered Passport.name.
Reads the in-memory passport directory. Returns default when the id is unknown (or agent_id itself if default is None), so callers can use this as a safe NameResolver for view projection without needing to handle the unregistered / unregistered-mid-turn case.
Source code in autogen/beta/network/hub/core.py
get_agent async #
Source code in autogen/beta/network/hub/core.py
get_resume async #
find_agent_id #
Resolve name to its registered agent_id, or None.
Non-raising peer to :meth:get_agent — callers that need to branch on "is this name registered?" without catching an exception use this directly. Returns None when name has no current registration.
Source code in autogen/beta/network/hub/core.py
name_to_id_map #
Snapshot of the name → agent_id directory.
Public read surface so callers that need reverse name resolution (e.g. WorkflowAdapter resolving handoff target names to ids) don't reach into the private index. Returns a copy so mutation can't corrupt the registry.
Source code in autogen/beta/network/hub/core.py
get_rule async #
Return the rule attached to agent_id.
Raises :class:NotFoundError if no rule is registered — the registration path stamps a default :class:Rule for every agent, so a missing entry indicates the agent itself is unregistered.
Source code in autogen/beta/network/hub/core.py
get_skill async #
Source code in autogen/beta/network/hub/core.py
list_agents async #
Enumerate registered participants, optionally filtered.
kind filters by Passport.kind ("agent" / "human" / "remote_agent"). None returns all kinds (current default behavior); passing "agent" also matches passports with kind=None since None is the back-compat alias.
Source code in autogen/beta/network/hub/core.py
set_resume async #
Source code in autogen/beta/network/hub/core.py
set_skill async #
Source code in autogen/beta/network/hub/core.py
set_rule async #
Source code in autogen/beta/network/hub/core.py
record_observation async #
Update Resume.observed[capability] from a terminal task event.
Called by TaskMirror when an owner's task ends with a capability tag set on its TaskSpec. Updates the capability index so the agent appears under that capability even if it wasn't in their original claimed_capabilities.
Outcome must be one of the terminal task states (COMPLETED / FAILED / EXPIRED); other states are ignored. latency_ms, when provided, replaces the prior p50_latency_ms (single-sample stand-in for a future reservoir).
task_id (when provided) is used to dedup: a single task contributing twice to Resume.observed.n (e.g. cascade EXPIRED + owner-emitted COMPLETED) is recorded only once.
Source code in autogen/beta/network/hub/core.py
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 | |
agents_with_capability #
Return agent_ids matching capability (claimed or observed).
create_channel async #
create_channel(*, creator_id, manifest_type, manifest_version=1, participants, required_acks=None, ttl=None, knobs=None, intent=None, labels=None)
Allocate channel_id, post invites, await acks, return metadata.
Posts EV_CHANNEL_INVITE to every invitee, awaits an EV_CHANNEL_INVITE_ACK from each (the handshake is all-or-nothing — any reject fails creation), transitions to ACTIVE, and broadcasts EV_CHANNEL_OPENED. Times out after invite_ack_timeout if the acks do not arrive.
Source code in autogen/beta/network/hub/core.py
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 | |
close_channel async #
get_channel async #
can_send #
True if the adapter would accept a substantive send from sender_id against the current state.
Wraps adapter.validate_send with a probe envelope so the default notify handler doesn't need to reach into private hub state to figure out whether it's the agent's turn.
Source code in autogen/beta/network/hub/core.py
default_view_policy #
Return the adapter-declared default view policy for this participant on this channel. Wraps adapter.default_view_policy so callers don't need adapter registry access.
Source code in autogen/beta/network/hub/core.py
adapter_for #
Return the adapter resolved from channel_id's manifest.
Public surface so callers (notably the default notify handler) don't need to reach into _adapter_for(type, version) or the _channels map directly.
Source code in autogen/beta/network/hub/core.py
adapter_state #
Return channel_id's current folded adapter state, or None if the channel has none cached.
Public surface so callers don't need to reach into _adapter_states.
Source code in autogen/beta/network/hub/core.py
list_channels async #
Source code in autogen/beta/network/hub/core.py
read_wal async #
Source code in autogen/beta/network/hub/core.py
observe_task async #
Register a task observed via the agent's stream.
Hub does not create, assign, or cancel — it stores TaskMetadata, persists it, and starts TTL accounting.
On first observation, enforces the owner's Rule.limits.max_concurrent_tasks cap (0 disables).
Source code in autogen/beta/network/hub/core.py
get_task async #
update_task async #
Update an observed task's lifecycle. Used by task_mirror.
Terminal-state transitions stamp completed_at. Idempotent — terminal-on-terminal is a no-op (further events ignored).
Source code in autogen/beta/network/hub/core.py
list_tasks async #
Source code in autogen/beta/network/hub/core.py
checkpoint_task async #
Persist an owner-supplied resume snapshot for task_id.
Writes a single JSON blob at tasks/{task_id}/checkpoint.json; last-write-wins, no history. The framework treats the payload as opaque — owners pick what to store and how to interpret it on resume. Pairs with :meth:read_task_checkpoint for the read side; the canonical entry point is the HubBackedCheckpointStore on the client.
Source code in autogen/beta/network/hub/core.py
read_task_checkpoint async #
Read the resume snapshot for task_id, or None if absent.
Returned dict is the value the owner most recently passed to :meth:checkpoint_task. Malformed JSON on disk surfaces as an exception — the framework does not silently swallow corruption.
Source code in autogen/beta/network/hub/core.py
expire_due async #
Walk active channels and tasks; expire ones past their TTL.
Cascades non-terminal tasks under closing channels (via :meth:_transition_channel).
Source code in autogen/beta/network/hub/core.py
post_envelope async #
Validate sender + adapter + WAL append + dispatch.
Per-channel lock makes validate_send / fold / on_accepted see a consistent state. Dispatch and post-accept transitions happen outside the lock so the broadcast of EV_CHANNEL_CLOSED does not deadlock on the same lock.
Access / limits decisions go through :attr:arbiter so federation / custom permission protocols can replace the default rule-based behavior without forking the hub. Hub fires :meth:HubListener.on_envelope_posted (success) or :meth:on_envelope_rejected (any pre-WAL failure) for every attempt.
Source code in autogen/beta/network/hub/core.py
attach_endpoint #
Source code in autogen/beta/network/hub/core.py
bind_endpoint #
Source code in autogen/beta/network/hub/core.py
pending_turns_for async #
Return turns in active channels where the protocol expects this agent.
Walks every active channel the agent participates in, asks the registered adapter via :meth:ChannelAdapter.expected_next, and returns a :class:PendingTurn per channel where the agent is named. Channels with no specific expected speaker (free-form conversations) or where another participant is expected are skipped. The triggering envelope's created_at is read from the WAL; if the trigger envelope cannot be located the current hub clock is used as a fallback.
Source code in autogen/beta/network/hub/core.py
inbox_cursor #
Last envelope_id agent_id has acked in channel_id.
Empty string when the agent has acked nothing in that channel (or is unknown). Read-only view of the delivery high-water mark a reconnect would replay past.
Source code in autogen/beta/network/hub/core.py
find_envelope_by_causation async #
Look up an envelope previously accepted under this causation key.
Handlers use this to short-circuit duplicate work after an at-least-once redelivery: when the same sender re-posts an envelope with the same causation_id (typical on retry), the prior accepted envelope is returned so the handler can skip the side effect. Returns None if no envelope is recorded for the key — either it was never accepted or its channel has already closed (terminal-channel pruning clears the index).