Skip to content

API Reference

Core

Kernel

agent_os_kernel.kernel.Kernel

The Agent OS Kernel.

One API: submit(action_request) -> action_result Three components: Policy, Gate (this class), Log Three invariants: all access through Gate, default deny, no silent actions

Source code in src/agent_os_kernel/kernel.py
class Kernel:
    """The Agent OS Kernel.

    One API: submit(action_request) -> action_result
    Three components: Policy, Gate (this class), Log
    Three invariants: all access through Gate, default deny, no silent actions
    """

    def __init__(
        self,
        policy: str | Path | Policy,
        providers: dict[str, Provider] | list[Provider] | None = None,
        log_path: str | Path = "kernel.log",
    ) -> None:
        """Initialize the kernel.

        Args:
            policy: Path to YAML policy file, or a Policy instance.
            providers: Provider registry. Either a dict mapping action types
                to providers, or a list of providers (auto-registered by
                their declared action types).
            log_path: Path to the JSONL log file.
        """
        if isinstance(policy, Policy):
            self._policy = policy
        else:
            self._policy = load_policy(policy)

        self._providers: dict[str, Provider] = {}
        if providers is not None:
            if isinstance(providers, dict):
                self._providers = dict(providers)
            else:
                for provider in providers:
                    for action in provider.actions:
                        self._providers[action] = provider

        self._log = Log(log_path)
        self._log.open()

    def submit(self, request: ActionRequest) -> ActionResult:
        """Submit an action request through the Gate.

        Per v2 §4.2:
        1. Validate request format
        2. Match request against policy (default deny)
        3. Resolve provider for action
        4. Call provider.execute(request)
        5. Log result
        6. Return result

        Every path produces exactly one log record.
        """
        # 1. Validate
        if not request.validate():
            self._record(request, "INVALID")
            return ActionResult(status="ERROR", data=None, error="malformed request")

        # 2. Authorize
        if not self._policy.is_allowed(request):
            self._record(request, "DENIED")
            return ActionResult(status="DENIED", data=None, error="not permitted")

        # 3. Resolve provider
        provider = self._providers.get(request.action)
        if provider is None:
            self._record(request, "NO_PROVIDER")
            return ActionResult(status="ERROR", data=None, error="no provider")

        # 4. Execute
        start = time.monotonic()
        try:
            result = provider.execute(request)
            duration_ms = int((time.monotonic() - start) * 1000)
            self._record(request, "OK", duration_ms=duration_ms)
            return ActionResult(status="OK", data=result, error=None)
        except Exception as e:
            duration_ms = int((time.monotonic() - start) * 1000)
            self._record(request, "FAILED", error=str(e), duration_ms=duration_ms)
            return ActionResult(status="ERROR", data=None, error=str(e))

    def close(self) -> None:
        """Close the kernel and its log."""
        self._log.close()

    @property
    def policy(self) -> Policy:
        """Access the kernel's policy (read-only)."""
        return self._policy

    @property
    def log(self) -> Log:
        """Access the kernel's log (read-only)."""
        return self._log

    def _record(
        self,
        request: ActionRequest,
        status: str,
        error: str | None = None,
        duration_ms: int | None = None,
    ) -> None:
        """Write a single log record."""
        record = Record(
            timestamp=datetime.now(timezone.utc).isoformat(),
            action=request.action,
            target=request.target,
            status=status,
            error=error,
            duration_ms=duration_ms,
        )
        self._log.write(record)

    def __enter__(self) -> Kernel:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

log property

Access the kernel's log (read-only).

policy property

Access the kernel's policy (read-only).

__init__(policy, providers=None, log_path='kernel.log')

Initialize the kernel.

Parameters:

Name Type Description Default
policy str | Path | Policy

Path to YAML policy file, or a Policy instance.

required
providers dict[str, Provider] | list[Provider] | None

Provider registry. Either a dict mapping action types to providers, or a list of providers (auto-registered by their declared action types).

None
log_path str | Path

Path to the JSONL log file.

'kernel.log'
Source code in src/agent_os_kernel/kernel.py
def __init__(
    self,
    policy: str | Path | Policy,
    providers: dict[str, Provider] | list[Provider] | None = None,
    log_path: str | Path = "kernel.log",
) -> None:
    """Initialize the kernel.

    Args:
        policy: Path to YAML policy file, or a Policy instance.
        providers: Provider registry. Either a dict mapping action types
            to providers, or a list of providers (auto-registered by
            their declared action types).
        log_path: Path to the JSONL log file.
    """
    if isinstance(policy, Policy):
        self._policy = policy
    else:
        self._policy = load_policy(policy)

    self._providers: dict[str, Provider] = {}
    if providers is not None:
        if isinstance(providers, dict):
            self._providers = dict(providers)
        else:
            for provider in providers:
                for action in provider.actions:
                    self._providers[action] = provider

    self._log = Log(log_path)
    self._log.open()

close()

Close the kernel and its log.

Source code in src/agent_os_kernel/kernel.py
def close(self) -> None:
    """Close the kernel and its log."""
    self._log.close()

submit(request)

Submit an action request through the Gate.

Per v2 §4.2: 1. Validate request format 2. Match request against policy (default deny) 3. Resolve provider for action 4. Call provider.execute(request) 5. Log result 6. Return result

Every path produces exactly one log record.

Source code in src/agent_os_kernel/kernel.py
def submit(self, request: ActionRequest) -> ActionResult:
    """Submit an action request through the Gate.

    Per v2 §4.2:
    1. Validate request format
    2. Match request against policy (default deny)
    3. Resolve provider for action
    4. Call provider.execute(request)
    5. Log result
    6. Return result

    Every path produces exactly one log record.
    """
    # 1. Validate
    if not request.validate():
        self._record(request, "INVALID")
        return ActionResult(status="ERROR", data=None, error="malformed request")

    # 2. Authorize
    if not self._policy.is_allowed(request):
        self._record(request, "DENIED")
        return ActionResult(status="DENIED", data=None, error="not permitted")

    # 3. Resolve provider
    provider = self._providers.get(request.action)
    if provider is None:
        self._record(request, "NO_PROVIDER")
        return ActionResult(status="ERROR", data=None, error="no provider")

    # 4. Execute
    start = time.monotonic()
    try:
        result = provider.execute(request)
        duration_ms = int((time.monotonic() - start) * 1000)
        self._record(request, "OK", duration_ms=duration_ms)
        return ActionResult(status="OK", data=result, error=None)
    except Exception as e:
        duration_ms = int((time.monotonic() - start) * 1000)
        self._record(request, "FAILED", error=str(e), duration_ms=duration_ms)
        return ActionResult(status="ERROR", data=None, error=str(e))

ActionRequest

agent_os_kernel.models.ActionRequest dataclass

A request to perform a world-facing action.

Attributes:

Name Type Description
action str

Action type, e.g. "fs.read", "net.http".

target str

Resource target, e.g. "/workspace/data.csv".

params dict[str, Any]

Action-specific parameters.

Source code in src/agent_os_kernel/models.py
@dataclass
class ActionRequest:
    """A request to perform a world-facing action.

    Attributes:
        action: Action type, e.g. "fs.read", "net.http".
        target: Resource target, e.g. "/workspace/data.csv".
        params: Action-specific parameters.
    """

    action: str
    target: str
    params: dict[str, Any] = field(default_factory=dict)

    def validate(self) -> bool:
        """Check that the request has required fields and valid format."""
        return bool(self.action and self.target)

validate()

Check that the request has required fields and valid format.

Source code in src/agent_os_kernel/models.py
def validate(self) -> bool:
    """Check that the request has required fields and valid format."""
    return bool(self.action and self.target)

ActionResult

agent_os_kernel.models.ActionResult dataclass

The result of a submitted action.

Attributes:

Name Type Description
status str

One of "OK", "DENIED", "ERROR".

data Any

Provider return value, or None.

error str | None

Error message if status is not OK.

record_id str | None

Optional snapshot ID for reversible actions (v2.1). Set by ReversibleActionLayer after successful snapshot persistence. This is the caller-side correlation key — use it to rollback via layer.rollback(result.record_id).

Source code in src/agent_os_kernel/models.py
@dataclass
class ActionResult:
    """The result of a submitted action.

    Attributes:
        status: One of "OK", "DENIED", "ERROR".
        data: Provider return value, or None.
        error: Error message if status is not OK.
        record_id: Optional snapshot ID for reversible actions (v2.1).
            Set by ReversibleActionLayer after successful snapshot persistence.
            This is the caller-side correlation key — use it to rollback via
            ``layer.rollback(result.record_id)``.
    """

    status: str
    data: Any = None
    error: str | None = None
    record_id: str | None = None

Record

agent_os_kernel.models.Record dataclass

An append-only log entry produced by every Gate decision.

Attributes:

Name Type Description
timestamp str

ISO 8601 timestamp.

action str

Action type, e.g. "fs.read".

target str

Resource target.

status str

One of "INVALID", "DENIED", "NO_PROVIDER", "FAILED", "OK".

error str | None

Error message if status is not OK.

duration_ms int | None

Execution time if provider was called.

record_id str | None

Optional snapshot ID linking to reversible action (v2.1). Note: the kernel does not populate this field — it is always None in log entries written by the Gate. The v2.1 design includes it in the schema for forward-compatibility, but writing it would require modifying the kernel interface (contradicting v2.1's "kernel is unchanged" principle). Use ActionResult.record_id for caller-side snapshot correlation instead.

Source code in src/agent_os_kernel/models.py
@dataclass
class Record:
    """An append-only log entry produced by every Gate decision.

    Attributes:
        timestamp: ISO 8601 timestamp.
        action: Action type, e.g. "fs.read".
        target: Resource target.
        status: One of "INVALID", "DENIED", "NO_PROVIDER", "FAILED", "OK".
        error: Error message if status is not OK.
        duration_ms: Execution time if provider was called.
        record_id: Optional snapshot ID linking to reversible action (v2.1).
            Note: the kernel does not populate this field — it is always None
            in log entries written by the Gate. The v2.1 design includes it in
            the schema for forward-compatibility, but writing it would require
            modifying the kernel interface (contradicting v2.1's "kernel is
            unchanged" principle). Use ActionResult.record_id for caller-side
            snapshot correlation instead.
    """

    timestamp: str
    action: str
    target: str
    status: str
    error: str | None = None
    duration_ms: int | None = None
    record_id: str | None = None

Policy

Policy

agent_os_kernel.policy.Policy dataclass

A set of capability rules loaded from a YAML file.

Policy is a static allow-list. If no rule matches, the action is denied.

Source code in src/agent_os_kernel/policy.py
@dataclass
class Policy:
    """A set of capability rules loaded from a YAML file.

    Policy is a static allow-list. If no rule matches, the action is denied.
    """

    capabilities: list[CapabilityRule] = field(default_factory=list)

    def is_allowed(self, request: ActionRequest) -> bool:
        """Check if a request is permitted by any capability rule.

        Per v2 §3.3: iterate rules, check action + resource + constraint.
        Default deny if no rule matches.
        """
        for cap in self.capabilities:
            if (
                cap.action_matches(request.action)
                and cap.resource_matches(request.target)
                and cap.constraint_matches(request)
            ):
                return True
        return False

is_allowed(request)

Check if a request is permitted by any capability rule.

Per v2 §3.3: iterate rules, check action + resource + constraint. Default deny if no rule matches.

Source code in src/agent_os_kernel/policy.py
def is_allowed(self, request: ActionRequest) -> bool:
    """Check if a request is permitted by any capability rule.

    Per v2 §3.3: iterate rules, check action + resource + constraint.
    Default deny if no rule matches.
    """
    for cap in self.capabilities:
        if (
            cap.action_matches(request.action)
            and cap.resource_matches(request.target)
            and cap.constraint_matches(request)
        ):
            return True
    return False

CapabilityRule

agent_os_kernel.policy.CapabilityRule dataclass

A single capability rule from the policy file.

Attributes:

Name Type Description
action str

The action type being permitted, e.g. "fs.read".

resource str

The resource pattern being permitted, e.g. "/workspace/**".

constraint dict[str, Any] | None

Optional additional restrictions, e.g. {"method": "GET"}.

Source code in src/agent_os_kernel/policy.py
@dataclass
class CapabilityRule:
    """A single capability rule from the policy file.

    Attributes:
        action: The action type being permitted, e.g. "fs.read".
        resource: The resource pattern being permitted, e.g. "/workspace/**".
        constraint: Optional additional restrictions, e.g. {"method": "GET"}.
    """

    action: str
    resource: str
    constraint: dict[str, Any] | None = None

    def action_matches(self, action: str) -> bool:
        """Check if the requested action matches this rule's action type."""
        return self.action == action

    def resource_matches(self, target: str) -> bool:
        """Check if the requested target matches this rule's resource pattern.

        Uses glob-style matching: ** matches everything including path separators.
        """
        return fnmatch.fnmatch(target, self.resource)

    def constraint_matches(self, request: ActionRequest) -> bool:
        """Check if the request satisfies this rule's constraints."""
        if self.constraint is None:
            return True
        return all(request.params.get(key) == value for key, value in self.constraint.items())

action_matches(action)

Check if the requested action matches this rule's action type.

Source code in src/agent_os_kernel/policy.py
def action_matches(self, action: str) -> bool:
    """Check if the requested action matches this rule's action type."""
    return self.action == action

constraint_matches(request)

Check if the request satisfies this rule's constraints.

Source code in src/agent_os_kernel/policy.py
def constraint_matches(self, request: ActionRequest) -> bool:
    """Check if the request satisfies this rule's constraints."""
    if self.constraint is None:
        return True
    return all(request.params.get(key) == value for key, value in self.constraint.items())

resource_matches(target)

Check if the requested target matches this rule's resource pattern.

Uses glob-style matching: ** matches everything including path separators.

Source code in src/agent_os_kernel/policy.py
def resource_matches(self, target: str) -> bool:
    """Check if the requested target matches this rule's resource pattern.

    Uses glob-style matching: ** matches everything including path separators.
    """
    return fnmatch.fnmatch(target, self.resource)

load_policy

agent_os_kernel.policy.load_policy(policy_path)

Load a policy from a YAML file.

Expected format

capabilities: - action: fs.read resource: /workspace/ - action: fs.write resource: /workspace/output/

Parameters:

Name Type Description Default
policy_path str | Path

Path to the YAML policy file.

required

Returns:

Type Description
Policy

A Policy instance with the parsed capability rules.

Raises:

Type Description
FileNotFoundError

If the policy file does not exist.

ValueError

If the policy file is malformed.

Source code in src/agent_os_kernel/policy.py
def load_policy(policy_path: str | Path) -> Policy:
    """Load a policy from a YAML file.

    Expected format:
        capabilities:
          - action: fs.read
            resource: /workspace/**
          - action: fs.write
            resource: /workspace/output/**

    Args:
        policy_path: Path to the YAML policy file.

    Returns:
        A Policy instance with the parsed capability rules.

    Raises:
        FileNotFoundError: If the policy file does not exist.
        ValueError: If the policy file is malformed.
    """
    path = Path(policy_path)
    with path.open() as f:
        data = yaml.safe_load(f)

    if not isinstance(data, dict) or "capabilities" not in data:
        raise ValueError(f"Policy file must contain 'capabilities' key: {policy_path}")

    capabilities: list[CapabilityRule] = []
    for rule in data["capabilities"]:
        if not isinstance(rule, dict) or "action" not in rule or "resource" not in rule:
            raise ValueError(f"Each capability must have 'action' and 'resource': {rule}")
        capabilities.append(
            CapabilityRule(
                action=rule["action"],
                resource=rule["resource"],
                constraint=rule.get("constraint"),
            )
        )

    return Policy(capabilities=capabilities)

Log

agent_os_kernel.log.Log

Append-only JSONL log writer.

Writes one JSON object per line. Never modifies or deletes records.

Source code in src/agent_os_kernel/log.py
class Log:
    """Append-only JSONL log writer.

    Writes one JSON object per line. Never modifies or deletes records.
    """

    def __init__(self, log_path: str | Path) -> None:
        self._path = Path(log_path)
        self._file: IO[str] | None = None

    def open(self) -> None:
        """Open the log file for appending."""
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._file = self._path.open("a")

    def close(self) -> None:
        """Close the log file."""
        if self._file is not None:
            self._file.close()
            self._file = None

    def write(self, record: Record) -> None:
        """Append a single record to the log.

        Args:
            record: The Record to write.

        Raises:
            RuntimeError: If the log is not open.
        """
        if self._file is None:
            raise RuntimeError("Log is not open. Call open() first.")
        data = asdict(record)
        # Remove None values for compact output
        data = {k: v for k, v in data.items() if v is not None}
        self._file.write(json.dumps(data) + "\n")
        self._file.flush()

    def read_all(self) -> list[Record]:
        """Read all records from the log file.

        Returns:
            List of Record instances.
        """
        if not self._path.exists():
            return []
        records: list[Record] = []
        with self._path.open() as f:
            for line in f:
                line = line.strip()
                if line:
                    data = json.loads(line)
                    records.append(
                        Record(
                            timestamp=data["timestamp"],
                            action=data["action"],
                            target=data["target"],
                            status=data["status"],
                            error=data.get("error"),
                            duration_ms=data.get("duration_ms"),
                            record_id=data.get("record_id"),
                        )
                    )
        return records

    def __enter__(self) -> Log:
        self.open()
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

close()

Close the log file.

Source code in src/agent_os_kernel/log.py
def close(self) -> None:
    """Close the log file."""
    if self._file is not None:
        self._file.close()
        self._file = None

open()

Open the log file for appending.

Source code in src/agent_os_kernel/log.py
def open(self) -> None:
    """Open the log file for appending."""
    self._path.parent.mkdir(parents=True, exist_ok=True)
    self._file = self._path.open("a")

read_all()

Read all records from the log file.

Returns:

Type Description
list[Record]

List of Record instances.

Source code in src/agent_os_kernel/log.py
def read_all(self) -> list[Record]:
    """Read all records from the log file.

    Returns:
        List of Record instances.
    """
    if not self._path.exists():
        return []
    records: list[Record] = []
    with self._path.open() as f:
        for line in f:
            line = line.strip()
            if line:
                data = json.loads(line)
                records.append(
                    Record(
                        timestamp=data["timestamp"],
                        action=data["action"],
                        target=data["target"],
                        status=data["status"],
                        error=data.get("error"),
                        duration_ms=data.get("duration_ms"),
                        record_id=data.get("record_id"),
                    )
                )
    return records

write(record)

Append a single record to the log.

Parameters:

Name Type Description Default
record Record

The Record to write.

required

Raises:

Type Description
RuntimeError

If the log is not open.

Source code in src/agent_os_kernel/log.py
def write(self, record: Record) -> None:
    """Append a single record to the log.

    Args:
        record: The Record to write.

    Raises:
        RuntimeError: If the log is not open.
    """
    if self._file is None:
        raise RuntimeError("Log is not open. Call open() first.")
    data = asdict(record)
    # Remove None values for compact output
    data = {k: v for k, v in data.items() if v is not None}
    self._file.write(json.dumps(data) + "\n")
    self._file.flush()

Providers

Provider (Base)

agent_os_kernel.providers.base.Provider

Bases: ABC

Abstract base class for all providers.

A provider: - declares which action types it handles - receives an already-authorized request from the Gate - executes the real effect - returns a result or raises an exception

A provider does NOT: - check authorization (the Gate already did) - write to the Log (the Gate does this) - call other providers - interact with the agent loop

Source code in src/agent_os_kernel/providers/base.py
class Provider(ABC):
    """Abstract base class for all providers.

    A provider:
    - declares which action types it handles
    - receives an already-authorized request from the Gate
    - executes the real effect
    - returns a result or raises an exception

    A provider does NOT:
    - check authorization (the Gate already did)
    - write to the Log (the Gate does this)
    - call other providers
    - interact with the agent loop
    """

    @property
    @abstractmethod
    def actions(self) -> list[str]:
        """List of action types this provider handles."""
        ...

    @abstractmethod
    def execute(self, request: ActionRequest) -> Any:
        """Execute the action and return a result.

        Args:
            request: An already-authorized ActionRequest.

        Returns:
            The result of the action (provider-specific).

        Raises:
            Exception: If execution fails.
        """
        ...

actions abstractmethod property

List of action types this provider handles.

execute(request) abstractmethod

Execute the action and return a result.

Parameters:

Name Type Description Default
request ActionRequest

An already-authorized ActionRequest.

required

Returns:

Type Description
Any

The result of the action (provider-specific).

Raises:

Type Description
Exception

If execution fails.

Source code in src/agent_os_kernel/providers/base.py
@abstractmethod
def execute(self, request: ActionRequest) -> Any:
    """Execute the action and return a result.

    Args:
        request: An already-authorized ActionRequest.

    Returns:
        The result of the action (provider-specific).

    Raises:
        Exception: If execution fails.
    """
    ...

FilesystemProvider

agent_os_kernel.providers.filesystem.FilesystemProvider

Bases: Provider

Provider for filesystem operations.

Source code in src/agent_os_kernel/providers/filesystem.py
class FilesystemProvider(Provider):
    """Provider for filesystem operations."""

    @property
    def actions(self) -> list[str]:
        return ["fs.read", "fs.write", "fs.delete"]

    def execute(self, request: ActionRequest) -> Any:
        if request.action == "fs.read":
            return self._read(request)
        elif request.action == "fs.write":
            return self._write(request)
        elif request.action == "fs.delete":
            return self._delete(request)
        else:
            raise ValueError(f"Unknown action: {request.action}")

    def _read(self, request: ActionRequest) -> str:
        path = Path(request.target)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {request.target}")
        return path.read_text()

    def _write(self, request: ActionRequest) -> dict[str, Any]:
        path = Path(request.target)
        content = request.params.get("content", "")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content)
        return {"bytes_written": len(content)}

    def _delete(self, request: ActionRequest) -> dict[str, bool]:
        path = Path(request.target)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {request.target}")
        path.unlink()
        return {"deleted": True}

ProcessProvider

agent_os_kernel.providers.process.ProcessProvider

Bases: Provider

Provider for process execution.

Source code in src/agent_os_kernel/providers/process.py
class ProcessProvider(Provider):
    """Provider for process execution."""

    @property
    def actions(self) -> list[str]:
        return ["proc.exec"]

    def execute(self, request: ActionRequest) -> Any:
        command = request.target
        args = request.params.get("args", [])
        timeout = request.params.get("timeout", DEFAULT_TIMEOUT)
        cwd = request.params.get("cwd")

        cmd = [command, *args] if args else [command]

        result = subprocess.run(  # noqa: S603
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=cwd,
        )

        return {
            "returncode": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr,
        }

HttpProvider

agent_os_kernel.providers.http.HttpProvider

Bases: Provider

Provider for HTTP requests using urllib (no external dependencies).

Source code in src/agent_os_kernel/providers/http.py
class HttpProvider(Provider):
    """Provider for HTTP requests using urllib (no external dependencies)."""

    @property
    def actions(self) -> list[str]:
        return ["net.http"]

    def execute(self, request: ActionRequest) -> Any:
        url = request.target
        method = request.params.get("method", "GET").upper()
        headers = request.params.get("headers", {})
        body = request.params.get("body")
        timeout = request.params.get("timeout", DEFAULT_TIMEOUT)

        data = None
        if body is not None:
            if isinstance(body, dict | list):
                data = json_mod.dumps(body).encode("utf-8")
                headers.setdefault("Content-Type", "application/json")
            elif isinstance(body, str):
                data = body.encode("utf-8")
            else:
                data = body

        req = urllib.request.Request(url, data=data, headers=headers, method=method)

        try:
            with urllib.request.urlopen(req, timeout=timeout) as response:  # noqa: S310
                response_body = response.read().decode("utf-8")
                return {
                    "status_code": response.status,
                    "headers": dict(response.headers),
                    "body": response_body,
                }
        except urllib.error.HTTPError as e:
            return {
                "status_code": e.code,
                "headers": dict(e.headers) if e.headers else {},
                "body": e.read().decode("utf-8"),
            }

McpProvider

agent_os_kernel.providers.mcp.McpProvider

Bases: Provider

Provider for MCP tool calls.

The target format is "server_name/tool_name", e.g. "scholar/search". MCP servers are configured at initialization with their command and env.

Server config format

{ "server_name": { "command": ["npx", "-y", "@modelcontextprotocol/server-filesystem", "/path"], "env": {"KEY": "value"}, # optional } }

Source code in src/agent_os_kernel/providers/mcp.py
class McpProvider(Provider):
    """Provider for MCP tool calls.

    The target format is "server_name/tool_name", e.g. "scholar/search".
    MCP servers are configured at initialization with their command and env.

    Server config format:
        {
            "server_name": {
                "command": ["npx", "-y", "@modelcontextprotocol/server-filesystem", "/path"],
                "env": {"KEY": "value"},  # optional
            }
        }
    """

    def __init__(self, servers: dict[str, Any] | None = None) -> None:
        """Initialize with MCP server configurations.

        Args:
            servers: Mapping of server names to connection configs.
                Each config must have a "command" key (list of strings).
                Optional "env" key for environment variables.
        """
        self._server_configs = servers or {}
        self._clients: dict[str, McpClient] = {}

    @property
    def actions(self) -> list[str]:
        return ["mcp.call"]

    def execute(self, request: ActionRequest) -> Any:
        target = request.target
        if "/" not in target:
            raise ValueError(f"MCP target must be 'server/tool' format: {target}")

        server_name, tool_name = target.split("/", 1)

        if server_name not in self._server_configs:
            raise ValueError(f"Unknown MCP server: {server_name}")

        client = self._get_or_create_client(server_name)
        arguments = request.params.get("arguments", request.params)
        # Remove non-argument keys
        arguments = {k: v for k, v in arguments.items() if k != "arguments"}

        return client.call_tool(tool_name, arguments)

    def _get_or_create_client(self, server_name: str) -> McpClient:
        """Get an existing client or create and connect a new one."""
        if server_name in self._clients:
            return self._clients[server_name]

        config = self._server_configs[server_name]
        command = config.get("command")
        if not command:
            raise ValueError(f"MCP server '{server_name}' config missing 'command'")

        env = config.get("env")
        client = McpClient(command=command, env=env)
        client.connect()
        self._clients[server_name] = client
        return client

    def close(self) -> None:
        """Close all MCP server connections."""
        for client in self._clients.values():
            client.close()
        self._clients.clear()

__init__(servers=None)

Initialize with MCP server configurations.

Parameters:

Name Type Description Default
servers dict[str, Any] | None

Mapping of server names to connection configs. Each config must have a "command" key (list of strings). Optional "env" key for environment variables.

None
Source code in src/agent_os_kernel/providers/mcp.py
def __init__(self, servers: dict[str, Any] | None = None) -> None:
    """Initialize with MCP server configurations.

    Args:
        servers: Mapping of server names to connection configs.
            Each config must have a "command" key (list of strings).
            Optional "env" key for environment variables.
    """
    self._server_configs = servers or {}
    self._clients: dict[str, McpClient] = {}

close()

Close all MCP server connections.

Source code in src/agent_os_kernel/providers/mcp.py
def close(self) -> None:
    """Close all MCP server connections."""
    for client in self._clients.values():
        client.close()
    self._clients.clear()

Reversible Action Layer

ReversibleActionLayer

agent_os_kernel.reversible.ReversibleActionLayer

Wraps the kernel to provide snapshot-based rollback.

The layer coordinates snapshot capture, execution, and rollback. The kernel does not know this layer exists.

Source code in src/agent_os_kernel/reversible.py
class ReversibleActionLayer:
    """Wraps the kernel to provide snapshot-based rollback.

    The layer coordinates snapshot capture, execution, and rollback.
    The kernel does not know this layer exists.
    """

    def __init__(
        self,
        kernel: Kernel,
        strategies: list[SnapshotStrategy],
        store: SnapshotStore,
    ) -> None:
        self.kernel = kernel
        self.strategies = strategies
        self.store = store

    def submit(self, request: ActionRequest) -> ActionResult:
        """Submit an action, capturing a snapshot if the action is reversible.

        Per design §7.1-7.2: capture and persistence failures are best-effort.
        If either fails, the action proceeds without rollback capability.

        Note: record_id is set on the returned ActionResult, not on the
        kernel's log Record. See Record.record_id docstring for rationale.
        """
        # 1. Find a matching snapshot strategy
        strategy = self._find_strategy(request)

        # 2. Capture snapshot before execution (best-effort per §7.1)
        snapshot = None
        if strategy is not None:
            try:
                snapshot = strategy.capture(request)
            except Exception:
                logger.warning(
                    "Snapshot capture failed for %s:%s",
                    request.action,
                    request.target,
                    exc_info=True,
                )
                snapshot = None

        # 3. Execute through the kernel
        result = self.kernel.submit(request)

        # 4. If execution succeeded and we have a snapshot, persist it (best-effort per §7.2)
        if result.status == "OK" and snapshot is not None:
            record_id = self._generate_record_id()
            try:
                self.store.save(record_id, request, snapshot)
                result.record_id = record_id
            except Exception:
                logger.warning(
                    "Failed to save snapshot for %s:%s",
                    request.action,
                    request.target,
                    exc_info=True,
                )

        return result

    def rollback(self, record_id: str) -> ActionResult:
        """Roll back a previously executed action by its record ID."""
        # 1. Load the snapshot
        entry = self.store.load(record_id)
        if entry is None:
            return ActionResult(status="ERROR", data=None, error="no snapshot found")

        original_request, snapshot = entry

        # 2. Find the strategy that created this snapshot
        strategy = self._find_strategy(original_request)
        if strategy is None:
            return ActionResult(status="ERROR", data=None, error="no strategy for action type")

        # 3. Build the restore request
        restore_request = strategy.restore(original_request, snapshot)

        # 4. Submit the restore through the kernel (authorized + logged)
        result = self.kernel.submit(restore_request)

        # 5. Clean up the snapshot on success
        if result.status == "OK":
            self.store.delete(record_id)

        return result

    def _find_strategy(self, request: ActionRequest) -> SnapshotStrategy | None:
        for strategy in self.strategies:
            if strategy.supports(request):
                return strategy
        return None

    def _generate_record_id(self) -> str:
        return uuid4().hex

rollback(record_id)

Roll back a previously executed action by its record ID.

Source code in src/agent_os_kernel/reversible.py
def rollback(self, record_id: str) -> ActionResult:
    """Roll back a previously executed action by its record ID."""
    # 1. Load the snapshot
    entry = self.store.load(record_id)
    if entry is None:
        return ActionResult(status="ERROR", data=None, error="no snapshot found")

    original_request, snapshot = entry

    # 2. Find the strategy that created this snapshot
    strategy = self._find_strategy(original_request)
    if strategy is None:
        return ActionResult(status="ERROR", data=None, error="no strategy for action type")

    # 3. Build the restore request
    restore_request = strategy.restore(original_request, snapshot)

    # 4. Submit the restore through the kernel (authorized + logged)
    result = self.kernel.submit(restore_request)

    # 5. Clean up the snapshot on success
    if result.status == "OK":
        self.store.delete(record_id)

    return result

submit(request)

Submit an action, capturing a snapshot if the action is reversible.

Per design §7.1-7.2: capture and persistence failures are best-effort. If either fails, the action proceeds without rollback capability.

Note: record_id is set on the returned ActionResult, not on the kernel's log Record. See Record.record_id docstring for rationale.

Source code in src/agent_os_kernel/reversible.py
def submit(self, request: ActionRequest) -> ActionResult:
    """Submit an action, capturing a snapshot if the action is reversible.

    Per design §7.1-7.2: capture and persistence failures are best-effort.
    If either fails, the action proceeds without rollback capability.

    Note: record_id is set on the returned ActionResult, not on the
    kernel's log Record. See Record.record_id docstring for rationale.
    """
    # 1. Find a matching snapshot strategy
    strategy = self._find_strategy(request)

    # 2. Capture snapshot before execution (best-effort per §7.1)
    snapshot = None
    if strategy is not None:
        try:
            snapshot = strategy.capture(request)
        except Exception:
            logger.warning(
                "Snapshot capture failed for %s:%s",
                request.action,
                request.target,
                exc_info=True,
            )
            snapshot = None

    # 3. Execute through the kernel
    result = self.kernel.submit(request)

    # 4. If execution succeeded and we have a snapshot, persist it (best-effort per §7.2)
    if result.status == "OK" and snapshot is not None:
        record_id = self._generate_record_id()
        try:
            self.store.save(record_id, request, snapshot)
            result.record_id = record_id
        except Exception:
            logger.warning(
                "Failed to save snapshot for %s:%s",
                request.action,
                request.target,
                exc_info=True,
            )

    return result

SnapshotStrategy

agent_os_kernel.reversible.SnapshotStrategy

Bases: ABC

Captures pre-execution state for a specific action type.

Each provider that supports rollback defines its own strategy.

Source code in src/agent_os_kernel/reversible.py
class SnapshotStrategy(ABC):
    """Captures pre-execution state for a specific action type.

    Each provider that supports rollback defines its own strategy.
    """

    @abstractmethod
    def supports(self, request: ActionRequest) -> bool:
        """Whether this strategy can snapshot the given request."""
        ...

    @abstractmethod
    def capture(self, request: ActionRequest) -> Any:
        """Capture state before execution. Returns opaque snapshot data."""
        ...

    @abstractmethod
    def restore(self, request: ActionRequest, snapshot: Any) -> ActionRequest:
        """Build a restore ActionRequest from the snapshot.

        The returned ActionRequest will be submitted through the kernel
        like any other action — authorized and logged.
        """
        ...

capture(request) abstractmethod

Capture state before execution. Returns opaque snapshot data.

Source code in src/agent_os_kernel/reversible.py
@abstractmethod
def capture(self, request: ActionRequest) -> Any:
    """Capture state before execution. Returns opaque snapshot data."""
    ...

restore(request, snapshot) abstractmethod

Build a restore ActionRequest from the snapshot.

The returned ActionRequest will be submitted through the kernel like any other action — authorized and logged.

Source code in src/agent_os_kernel/reversible.py
@abstractmethod
def restore(self, request: ActionRequest, snapshot: Any) -> ActionRequest:
    """Build a restore ActionRequest from the snapshot.

    The returned ActionRequest will be submitted through the kernel
    like any other action — authorized and logged.
    """
    ...

supports(request) abstractmethod

Whether this strategy can snapshot the given request.

Source code in src/agent_os_kernel/reversible.py
@abstractmethod
def supports(self, request: ActionRequest) -> bool:
    """Whether this strategy can snapshot the given request."""
    ...

FsWriteSnapshotStrategy

agent_os_kernel.reversible.FsWriteSnapshotStrategy

Bases: SnapshotStrategy

Snapshot strategy for fs.write actions.

Captures file content before overwrite so it can be restored.

Source code in src/agent_os_kernel/reversible.py
class FsWriteSnapshotStrategy(SnapshotStrategy):
    """Snapshot strategy for fs.write actions.

    Captures file content before overwrite so it can be restored.
    """

    def supports(self, request: ActionRequest) -> bool:
        return request.action == "fs.write"

    def capture(self, request: ActionRequest) -> dict[str, Any]:
        path = Path(request.target)
        if path.exists():
            return {"existed": True, "content": path.read_text()}
        return {"existed": False}

    def restore(self, request: ActionRequest, snapshot: dict[str, Any]) -> ActionRequest:
        if snapshot["existed"]:
            return ActionRequest(
                action="fs.write",
                target=request.target,
                params={"content": snapshot["content"]},
            )
        else:
            return ActionRequest(
                action="fs.delete",
                target=request.target,
                params={},
            )

FsDeleteSnapshotStrategy

agent_os_kernel.reversible.FsDeleteSnapshotStrategy

Bases: SnapshotStrategy

Snapshot strategy for fs.delete actions.

Captures file content before deletion so it can be restored via fs.write. If the file did not exist before delete, capture returns None and no snapshot is persisted (the delete was a no-op, nothing to restore).

Source code in src/agent_os_kernel/reversible.py
class FsDeleteSnapshotStrategy(SnapshotStrategy):
    """Snapshot strategy for fs.delete actions.

    Captures file content before deletion so it can be restored via fs.write.
    If the file did not exist before delete, capture returns None and no
    snapshot is persisted (the delete was a no-op, nothing to restore).
    """

    def supports(self, request: ActionRequest) -> bool:
        return request.action == "fs.delete"

    def capture(self, request: ActionRequest) -> dict[str, Any] | None:
        path = Path(request.target)
        if path.exists():
            return {"content": path.read_text()}
        return None

    def restore(self, request: ActionRequest, snapshot: dict[str, Any]) -> ActionRequest:
        return ActionRequest(
            action="fs.write",
            target=request.target,
            params={"content": snapshot["content"]},
        )

SnapshotStore

agent_os_kernel.reversible.SnapshotStore

Persists snapshots indexed by record ID.

A simple file-based key-value store. Snapshots expire after TTL.

Source code in src/agent_os_kernel/reversible.py
class SnapshotStore:
    """Persists snapshots indexed by record ID.

    A simple file-based key-value store. Snapshots expire after TTL.
    """

    def __init__(self, store_dir: str | Path, ttl_seconds: int = 3600) -> None:
        self._store_dir = Path(store_dir)
        self._store_dir.mkdir(parents=True, exist_ok=True)
        self._ttl_seconds = ttl_seconds

    def save(self, record_id: str, request: ActionRequest, snapshot: Any) -> None:
        """Save a snapshot associated with a record ID."""
        from datetime import datetime, timezone

        now = datetime.now(timezone.utc)
        entry = {
            "record_id": record_id,
            "original_request": {
                "action": request.action,
                "target": request.target,
                "params": request.params,
            },
            "snapshot": snapshot,
            "created_at": now.isoformat(),
            "expires_at": (datetime.fromtimestamp(now.timestamp() + self._ttl_seconds, tz=timezone.utc)).isoformat(),
        }
        path = self._store_dir / f"{record_id}.json"
        path.write_text(json.dumps(entry))

    def load(self, record_id: str) -> tuple[ActionRequest, Any] | None:
        """Load a snapshot by record ID. Returns None if not found or expired."""
        from datetime import datetime, timezone

        path = self._store_dir / f"{record_id}.json"
        if not path.exists():
            return None

        entry = json.loads(path.read_text())

        # Check TTL via expires_at (preferred) or created_at (legacy)
        if "expires_at" in entry:
            expires_at = datetime.fromisoformat(entry["expires_at"])
            if datetime.now(timezone.utc) >= expires_at:
                path.unlink(missing_ok=True)
                return None
        elif "created_at" in entry:
            # Legacy format: created_at as float
            created = entry["created_at"]
            if isinstance(created, int | float) and time.time() - created > self._ttl_seconds:
                path.unlink(missing_ok=True)
                return None

        # Support both new ("original_request") and legacy ("request") key
        req_data = entry.get("original_request") or entry["request"]
        request = ActionRequest(
            action=req_data["action"],
            target=req_data["target"],
            params=req_data["params"],
        )
        return request, entry["snapshot"]

    def delete(self, record_id: str) -> None:
        """Remove a snapshot after successful rollback."""
        path = self._store_dir / f"{record_id}.json"
        path.unlink(missing_ok=True)

delete(record_id)

Remove a snapshot after successful rollback.

Source code in src/agent_os_kernel/reversible.py
def delete(self, record_id: str) -> None:
    """Remove a snapshot after successful rollback."""
    path = self._store_dir / f"{record_id}.json"
    path.unlink(missing_ok=True)

load(record_id)

Load a snapshot by record ID. Returns None if not found or expired.

Source code in src/agent_os_kernel/reversible.py
def load(self, record_id: str) -> tuple[ActionRequest, Any] | None:
    """Load a snapshot by record ID. Returns None if not found or expired."""
    from datetime import datetime, timezone

    path = self._store_dir / f"{record_id}.json"
    if not path.exists():
        return None

    entry = json.loads(path.read_text())

    # Check TTL via expires_at (preferred) or created_at (legacy)
    if "expires_at" in entry:
        expires_at = datetime.fromisoformat(entry["expires_at"])
        if datetime.now(timezone.utc) >= expires_at:
            path.unlink(missing_ok=True)
            return None
    elif "created_at" in entry:
        # Legacy format: created_at as float
        created = entry["created_at"]
        if isinstance(created, int | float) and time.time() - created > self._ttl_seconds:
            path.unlink(missing_ok=True)
            return None

    # Support both new ("original_request") and legacy ("request") key
    req_data = entry.get("original_request") or entry["request"]
    request = ActionRequest(
        action=req_data["action"],
        target=req_data["target"],
        params=req_data["params"],
    )
    return request, entry["snapshot"]

save(record_id, request, snapshot)

Save a snapshot associated with a record ID.

Source code in src/agent_os_kernel/reversible.py
def save(self, record_id: str, request: ActionRequest, snapshot: Any) -> None:
    """Save a snapshot associated with a record ID."""
    from datetime import datetime, timezone

    now = datetime.now(timezone.utc)
    entry = {
        "record_id": record_id,
        "original_request": {
            "action": request.action,
            "target": request.target,
            "params": request.params,
        },
        "snapshot": snapshot,
        "created_at": now.isoformat(),
        "expires_at": (datetime.fromtimestamp(now.timestamp() + self._ttl_seconds, tz=timezone.utc)).isoformat(),
    }
    path = self._store_dir / f"{record_id}.json"
    path.write_text(json.dumps(entry))

Agent Loop

ToolDef

agent_os_kernel.agent_loop.ToolDef dataclass

Declares a tool the LLM can call. Contains NO execution logic.

Execution is always handled by kernel.submit() -> provider.execute(). ToolDef only provides the metadata the LLM needs to generate tool calls and the mapping rules to convert tool calls into ActionRequests.

Attributes:

Name Type Description
name str

Tool name shown to the LLM.

description str

Tool description shown to the LLM.

parameters dict[str, Any]

JSON Schema for tool parameters.

action str

Kernel action type, e.g. "fs.read", "mcp.call".

target_from str | Callable[[dict[str, Any]], str]

How to extract the target for the ActionRequest. If a string, uses that parameter name from the tool args. If a callable, calls it with the args dict to produce the target.

Source code in src/agent_os_kernel/agent_loop.py
@dataclass
class ToolDef:
    """Declares a tool the LLM can call. Contains NO execution logic.

    Execution is always handled by kernel.submit() -> provider.execute().
    ToolDef only provides the metadata the LLM needs to generate tool calls
    and the mapping rules to convert tool calls into ActionRequests.

    Attributes:
        name: Tool name shown to the LLM.
        description: Tool description shown to the LLM.
        parameters: JSON Schema for tool parameters.
        action: Kernel action type, e.g. "fs.read", "mcp.call".
        target_from: How to extract the target for the ActionRequest.
            If a string, uses that parameter name from the tool args.
            If a callable, calls it with the args dict to produce the target.
    """

    name: str
    description: str
    parameters: dict[str, Any]
    action: str
    target_from: str | Callable[[dict[str, Any]], str] = field(default="target")

AgentLoop

agent_os_kernel.agent_loop.AgentLoop

LLM agent loop where kernel.submit() is the sole execution path.

Invariant: there is no code path that executes a tool without going through kernel.submit(). This is enforced structurally — ToolDefs contain no execution logic, and this class only calls kernel.submit().

Source code in src/agent_os_kernel/agent_loop.py
class AgentLoop:
    """LLM agent loop where kernel.submit() is the sole execution path.

    Invariant: there is no code path that executes a tool without going
    through kernel.submit(). This is enforced structurally — ToolDefs
    contain no execution logic, and this class only calls kernel.submit().
    """

    def __init__(
        self,
        kernel: Kernel,
        model: str,
        instructions: str = "",
        tools: list[ToolDef] | None = None,
        max_turns: int = 20,
        submit: SubmitFn | None = None,
    ) -> None:
        """Initialize the agent loop.

        Args:
            kernel: The Kernel instance for authorization and execution.
            model: LiteLLM model string, e.g. "gpt-4o", "anthropic/claude-sonnet-4-20250514".
            instructions: System prompt for the LLM.
            tools: List of ToolDefs available to the agent.
            max_turns: Maximum LLM call iterations before stopping.
            submit: Optional override for the submit callable. Defaults to
                kernel.submit(). Use this for ReversibleActionLayer integration.
        """
        self.kernel = kernel
        self.model = model
        self.instructions = instructions
        self.tools: dict[str, ToolDef] = {t.name: t for t in (tools or [])}
        self.max_turns = max_turns
        self._submit = submit or kernel.submit

    async def run(self, prompt: str) -> str:
        """Run the agent loop until completion or max_turns.

        Args:
            prompt: User input prompt.

        Returns:
            The agent's final text output.
        """
        messages: list[dict[str, Any]] = []
        if self.instructions:
            messages.append({"role": "system", "content": self.instructions})
        messages.append({"role": "user", "content": prompt})

        logger.info("agent_loop.start model=%s max_turns=%d tools=%d", self.model, self.max_turns, len(self.tools))

        for turn in range(self.max_turns):
            try:
                response = await litellm.acompletion(
                    model=self.model,
                    messages=messages,
                    tools=self._tool_schemas() if self.tools else None,
                    tool_choice="auto" if self.tools else None,
                )
            except Exception as exc:
                logger.error("agent_loop.llm_error model=%s turn=%d error=%s", self.model, turn + 1, exc)
                return f"[LLM error: {exc}]"

            choice = response.choices[0]
            message = choice.message

            # Terminal: LLM produced final text
            if choice.finish_reason == "stop":
                logger.info("agent_loop.done model=%s turns_used=%d", self.model, turn + 1)
                return message.content or ""

            # Tool calls: execute each through kernel
            if message.tool_calls:
                tool_names = [tc.function.name for tc in message.tool_calls]
                logger.info(
                    "agent_loop.tool_calls turn=%d count=%d tools=%s", turn + 1, len(message.tool_calls), tool_names
                )
                messages.append(message.model_dump())
                for tc in message.tool_calls:
                    result = self._execute_tool_call(tc)
                    messages.append(
                        {
                            "role": "tool",
                            "tool_call_id": tc.id,
                            "content": result,
                        }
                    )
                continue

            # Unexpected finish reason (e.g. length) — return whatever we have
            logger.warning("agent_loop.unexpected_finish turn=%d reason=%s", turn + 1, choice.finish_reason)
            return message.content or ""

        logger.warning("agent_loop.max_turns model=%s max_turns=%d", self.model, self.max_turns)
        return "[max turns reached]"

    def _execute_tool_call(self, tool_call: Any) -> str:
        """Convert a tool call to an ActionRequest and submit through kernel.

        THIS IS THE ONLY EXECUTION PATH. There is no else branch,
        no fallback, no direct function call. Every tool call becomes
        a kernel.submit() call.
        """
        func = tool_call.function
        tool_def = self.tools.get(func.name)

        if tool_def is None:
            return json.dumps({"error": f"unknown tool: {func.name}", "status": "ERROR"})

        args: dict[str, Any] = json.loads(func.arguments) if func.arguments else {}

        # Resolve target
        if isinstance(tool_def.target_from, str):
            target = str(args.get(tool_def.target_from, func.name))
        else:
            target = tool_def.target_from(args)

        # Submit through kernel — the ONLY execution path
        request = ActionRequest(action=tool_def.action, target=target, params=args)
        result = self._submit(request)

        return json.dumps(
            {"status": result.status, "data": result.data, "error": result.error},
            default=str,
        )

    def _tool_schemas(self) -> list[dict[str, Any]]:
        """Convert ToolDefs to LiteLLM/OpenAI tool format."""
        return [
            {
                "type": "function",
                "function": {
                    "name": td.name,
                    "description": td.description,
                    "parameters": td.parameters,
                },
            }
            for td in self.tools.values()
        ]

__init__(kernel, model, instructions='', tools=None, max_turns=20, submit=None)

Initialize the agent loop.

Parameters:

Name Type Description Default
kernel Kernel

The Kernel instance for authorization and execution.

required
model str

LiteLLM model string, e.g. "gpt-4o", "anthropic/claude-sonnet-4-20250514".

required
instructions str

System prompt for the LLM.

''
tools list[ToolDef] | None

List of ToolDefs available to the agent.

None
max_turns int

Maximum LLM call iterations before stopping.

20
submit SubmitFn | None

Optional override for the submit callable. Defaults to kernel.submit(). Use this for ReversibleActionLayer integration.

None
Source code in src/agent_os_kernel/agent_loop.py
def __init__(
    self,
    kernel: Kernel,
    model: str,
    instructions: str = "",
    tools: list[ToolDef] | None = None,
    max_turns: int = 20,
    submit: SubmitFn | None = None,
) -> None:
    """Initialize the agent loop.

    Args:
        kernel: The Kernel instance for authorization and execution.
        model: LiteLLM model string, e.g. "gpt-4o", "anthropic/claude-sonnet-4-20250514".
        instructions: System prompt for the LLM.
        tools: List of ToolDefs available to the agent.
        max_turns: Maximum LLM call iterations before stopping.
        submit: Optional override for the submit callable. Defaults to
            kernel.submit(). Use this for ReversibleActionLayer integration.
    """
    self.kernel = kernel
    self.model = model
    self.instructions = instructions
    self.tools: dict[str, ToolDef] = {t.name: t for t in (tools or [])}
    self.max_turns = max_turns
    self._submit = submit or kernel.submit

run(prompt) async

Run the agent loop until completion or max_turns.

Parameters:

Name Type Description Default
prompt str

User input prompt.

required

Returns:

Type Description
str

The agent's final text output.

Source code in src/agent_os_kernel/agent_loop.py
async def run(self, prompt: str) -> str:
    """Run the agent loop until completion or max_turns.

    Args:
        prompt: User input prompt.

    Returns:
        The agent's final text output.
    """
    messages: list[dict[str, Any]] = []
    if self.instructions:
        messages.append({"role": "system", "content": self.instructions})
    messages.append({"role": "user", "content": prompt})

    logger.info("agent_loop.start model=%s max_turns=%d tools=%d", self.model, self.max_turns, len(self.tools))

    for turn in range(self.max_turns):
        try:
            response = await litellm.acompletion(
                model=self.model,
                messages=messages,
                tools=self._tool_schemas() if self.tools else None,
                tool_choice="auto" if self.tools else None,
            )
        except Exception as exc:
            logger.error("agent_loop.llm_error model=%s turn=%d error=%s", self.model, turn + 1, exc)
            return f"[LLM error: {exc}]"

        choice = response.choices[0]
        message = choice.message

        # Terminal: LLM produced final text
        if choice.finish_reason == "stop":
            logger.info("agent_loop.done model=%s turns_used=%d", self.model, turn + 1)
            return message.content or ""

        # Tool calls: execute each through kernel
        if message.tool_calls:
            tool_names = [tc.function.name for tc in message.tool_calls]
            logger.info(
                "agent_loop.tool_calls turn=%d count=%d tools=%s", turn + 1, len(message.tool_calls), tool_names
            )
            messages.append(message.model_dump())
            for tc in message.tool_calls:
                result = self._execute_tool_call(tc)
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result,
                    }
                )
            continue

        # Unexpected finish reason (e.g. length) — return whatever we have
        logger.warning("agent_loop.unexpected_finish turn=%d reason=%s", turn + 1, choice.finish_reason)
        return message.content or ""

    logger.warning("agent_loop.max_turns model=%s max_turns=%d", self.model, self.max_turns)
    return "[max turns reached]"

run_agent_loop

agent_os_kernel.agent_loop.run_agent_loop(kernel, model, prompt, *, instructions='', tools=None, max_turns=20, submit=None) async

Convenience function: create an AgentLoop and run it.

Parameters:

Name Type Description Default
kernel Kernel

The Kernel instance.

required
model str

LiteLLM model string.

required
prompt str

User input prompt.

required
instructions str

System prompt for the LLM.

''
tools list[ToolDef] | None

List of ToolDefs.

None
max_turns int

Maximum LLM call iterations.

20
submit SubmitFn | None

Optional override for the submit callable. Defaults to kernel.submit(). Use this for ReversibleActionLayer integration.

None

Returns:

Type Description
str

The agent's final text output.

Source code in src/agent_os_kernel/agent_loop.py
async def run_agent_loop(
    kernel: Kernel,
    model: str,
    prompt: str,
    *,
    instructions: str = "",
    tools: list[ToolDef] | None = None,
    max_turns: int = 20,
    submit: SubmitFn | None = None,
) -> str:
    """Convenience function: create an AgentLoop and run it.

    Args:
        kernel: The Kernel instance.
        model: LiteLLM model string.
        prompt: User input prompt.
        instructions: System prompt for the LLM.
        tools: List of ToolDefs.
        max_turns: Maximum LLM call iterations.
        submit: Optional override for the submit callable. Defaults to
            kernel.submit(). Use this for ReversibleActionLayer integration.

    Returns:
        The agent's final text output.
    """
    loop = AgentLoop(
        kernel=kernel,
        model=model,
        instructions=instructions,
        tools=tools,
        max_turns=max_turns,
        submit=submit,
    )
    return await loop.run(prompt)

SubmitFn

agent_os_kernel.models.SubmitFn = Callable[['ActionRequest'], 'ActionResult'] module-attribute

CLI

The kernel provides a CLI entry point:

python -m agent_os_kernel <command> [options]

Commands

Command Description
submit Submit an action through the kernel
log Display kernel log entries
validate-policy Validate a policy YAML file
version Print version information

Examples

# Validate a policy file
python -m agent_os_kernel validate-policy --policy configs/example_policy.yaml

# Submit an action
python -m agent_os_kernel submit --policy policy.yaml --action fs.read --target /workspace/data.txt

# View log entries
python -m agent_os_kernel log --log-path kernel.log --status OK --limit 10

# Print version
python -m agent_os_kernel version