Skip to content

API Reference

Auto-generated from source docstrings.

Config

Pydantic configuration models for mattermind.

AgentConfig

Bases: BaseModel

Agent loop behaviour settings.

Source code in src/mattermind/config/models.py
class AgentConfig(BaseModel):
    """Agent loop behaviour settings.

    These values are read by the agent loop on every run; all of them have
    defaults, so the whole section may be omitted from the configuration.
    """

    # Upper bound on LLM round-trips (loop iterations) within a single run.
    max_iterations: int = 15
    # NOTE(review): presumably caps how many threads one question may fetch;
    # the code that consumes it is not shown here - confirm.
    max_threads_per_query: int = 20
    # Substituted into the system prompt as ``max_depth``.
    max_link_depth: int = 2
    # Cumulative token count at which the run pauses and asks the user
    # whether to continue.
    total_token_budget: int = 200_000
    # When true, the tool calls of one LLM response are awaited together;
    # otherwise they are awaited one after another.
    parallel_tool_calls: bool = True

AppConfig

Bases: BaseModel

Root application configuration.

Source code in src/mattermind/config/models.py
class AppConfig(BaseModel):
    """Root application configuration."""

    mattermost: MattermostConfig
    llm: LLMConfig
    agent: AgentConfig = Field(default_factory=AgentConfig)
    output: OutputConfig = Field(default_factory=OutputConfig)
    logging: LoggingConfig = Field(default_factory=LoggingConfig)

LLMConfig

Bases: BaseModel

LLM provider settings.

Source code in src/mattermind/config/models.py
class LLMConfig(BaseModel):
    """LLM provider settings.

    The textual representations (``repr``/``str``) never contain the full API
    key: at most its first five characters are shown, followed by ``***``.
    """

    base_url: str = "https://api.openai.com/v1"
    api_key: str
    model: str = "gpt-4o-mini"
    temperature: float = 0.2
    max_tokens_per_response: int = 4000
    request_timeout_seconds: int = 120
    verify_ssl: bool = True

    def __repr__(self) -> str:
        """Return a representation with the API key masked."""
        # Keys of five characters or fewer are hidden entirely.
        if len(self.api_key) > 5:
            shown_key = f"{self.api_key[:5]}***"
        else:
            shown_key = "***"

        rendered_fields = ", ".join(
            (
                f"base_url={self.base_url!r}",
                f"api_key={shown_key!r}",
                f"model={self.model!r}",
                f"temperature={self.temperature}",
                f"max_tokens_per_response={self.max_tokens_per_response}",
            )
        )
        return f"LLMConfig({rendered_fields})"

    def __str__(self) -> str:
        """Return the same masked text as ``__repr__``."""
        return repr(self)

LoggingConfig

Bases: BaseModel

Logging settings.

Source code in src/mattermind/config/models.py
class LoggingConfig(BaseModel):
    """Logging settings."""

    # Log level name, "INFO" unless overridden.
    # NOTE(review): presumably a standard ``logging`` level name - confirm
    # where this value is consumed.
    level: str = "INFO"

MattermostConfig

Bases: BaseModel

Mattermost connection settings.

Auth is either a personal access token (token) or a login/password pair. Exactly one of the two must be provided.

Source code in src/mattermind/config/models.py
class MattermostConfig(BaseModel):
    """Mattermost connection settings.

    Two authentication modes are supported: a personal access token
    (``token``), or a ``login`` together with a ``password``. The model is
    rejected when neither mode is fully configured and when both are.
    """

    url: str
    team: str | None = None
    timeout_seconds: int = 30
    rate_limit_rps: int = 10
    verify_ssl: bool = True

    # Token-based auth
    token: str | None = None

    # Login/password auth
    login: str | None = None
    password: str | None = None

    @model_validator(mode="after")
    def _validate_auth(self) -> MattermostConfig:
        """Ensure exactly one complete authentication mode is configured."""
        token_given = bool(self.token)
        # A login without a password (or vice versa) does not count.
        credentials_given = bool(self.login and self.password)

        if token_given and credentials_given:
            raise ValueError("Provide either 'token' or 'login'+'password', not both.")
        if not (token_given or credentials_given):
            raise ValueError(
                "Mattermost auth is not configured. Provide either 'token' or both 'login' and 'password'."
            )
        return self

    def __repr__(self) -> str:
        """Return a representation with the token/password masked."""
        if not self.token:
            credentials = f"login={self.login!r}, password=***"
        elif len(self.token) > 5:
            # Show only a short prefix of the token.
            credentials = f"token={self.token[:5]}***"
        else:
            credentials = "token=***"

        pieces = [f"url={self.url!r}", credentials]
        if self.team:
            pieces.append(f"team={self.team!r}")
        pieces.append(f"timeout_seconds={self.timeout_seconds}")
        pieces.append(f"rate_limit_rps={self.rate_limit_rps}")
        return f"MattermostConfig({', '.join(pieces)})"

    def __str__(self) -> str:
        """Return the same masked text as ``__repr__``."""
        return repr(self)

OutputConfig

Bases: BaseModel

Output rendering settings.

Source code in src/mattermind/config/models.py
class OutputConfig(BaseModel):
    """Output rendering settings.

    NOTE(review): the renderer that consumes these flags is not shown here;
    the per-field notes below are inferred from the field names - confirm.
    """

    # Presumably toggles the thread tree section of the output.
    show_thread_tree: bool = True
    # Presumably toggles the token usage summary.
    show_token_usage: bool = True
    # Presumably toggles the timing information.
    show_timings: bool = True
    # Expected values are listed in the trailing comment; the ``str`` type
    # itself does not restrict the value.
    format: str = "markdown"  # markdown | json | plain

Mattermost Client

Async Mattermost REST API v4 client with retries and rate limiting.

MattermostAPIError

Bases: Exception

Raised when the Mattermost API returns an error response.

Source code in src/mattermind/mattermost/client.py
class MattermostAPIError(Exception):
    """Raised when the Mattermost API returns an error response.

    Attributes:
        status_code: Status code passed by the raiser (the HTTP status of the
            failed response).
        message: The error detail on its own, without the formatted
            ``"Mattermost API error <code>: "`` prefix used for ``str(exc)``.
    """

    def __init__(self, status_code: int, message: str) -> None:
        self.status_code = status_code
        # Keep the raw detail so callers do not have to parse str(exc).
        self.message = message
        super().__init__(f"Mattermost API error {status_code}: {message}")

MattermostClient

Async HTTP client for Mattermost REST API v4.

Usage::

async with MattermostClient(config) as client:
    hits = await client.search_posts(team_id, "incident", None, 20)
Source code in src/mattermind/mattermost/client.py
class MattermostClient:
    """Async HTTP client for Mattermost REST API v4.

    Must be used as an async context manager: the underlying
    ``httpx.AsyncClient`` is created on enter and closed on exit. With
    login/password auth the session token is obtained on enter.

    Usage::

        async with MattermostClient(config) as client:
            hits = await client.search_posts(team_id, "incident", None, 20)
    """

    def __init__(self, config: MattermostConfig) -> None:
        """Store the settings and prepare request headers (no network I/O)."""
        self._config = config
        self._base_url = config.url.rstrip("/")
        self._headers: dict[str, str] = {"Content-Type": "application/json"}
        if config.token:
            self._headers["Authorization"] = f"Bearer {config.token}"
        # Caps how many requests may be in flight at the same time.
        self._semaphore = asyncio.Semaphore(config.rate_limit_rps)
        self._http: httpx.AsyncClient | None = None

    async def __aenter__(self) -> MattermostClient:
        """Open the HTTP client and, without a token, log in with credentials."""
        http = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._config.timeout_seconds,
            verify=self._config.verify_ssl,
        )
        self._http = http
        if not self._config.token:
            try:
                await self._login()
            except BaseException:
                # __aexit__ is not called when __aenter__ raises, so the
                # client must be closed here or its connections would leak.
                await http.aclose()
                self._http = None
                raise
        return self

    async def _login(self) -> None:
        """Authenticate with login/password and store the session token.

        Raises:
            MattermostAPIError: If the server answers with an error status, or
                the response carries no ``Token`` header.
            RuntimeError: If the HTTP client has not been opened yet.
        """
        http = self._client
        response = await http.post(
            "/api/v4/users/login",
            json={"login_id": self._config.login, "password": self._config.password},
        )
        if response.status_code >= 400:
            raise MattermostAPIError(response.status_code, self._error_detail(response))

        token = response.headers.get("Token")
        if not token:
            raise MattermostAPIError(0, "Login succeeded but no Token header returned.")

        # All later requests made through this client carry the session token.
        http.headers["Authorization"] = f"Bearer {token}"
        logger.debug("Authenticated via login/password, session token obtained.")

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the HTTP client if it is open."""
        if self._http is not None:
            await self._http.aclose()
            self._http = None

    @property
    def _client(self) -> httpx.AsyncClient:
        """Return the open HTTP client, or raise if the context is not entered."""
        if self._http is None:
            raise RuntimeError("MattermostClient must be used as an async context manager.")
        return self._http

    @staticmethod
    def _error_detail(response: httpx.Response) -> str:
        """Return the ``message`` of a JSON error body, else the raw body text."""
        try:
            return response.json().get("message", response.text)
        except Exception:
            # Body is not JSON (or not a JSON object): report it verbatim.
            return response.text

    async def _request(
        self,
        method: str,
        path: str,
        **kwargs: Any,
    ) -> Any:
        """Execute a rate-limited, retried HTTP request and return parsed JSON.

        Args:
            method: HTTP method name.
            path: Request path, resolved against the configured base URL.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The decoded JSON body, or ``{}`` for a 204 / empty response.

        Raises:
            MattermostAPIError: If the response status is 400 or above.
            RuntimeError: If the client is used outside its async context.
        """

        # Up to three attempts with exponential backoff; ``_should_retry``
        # (defined elsewhere in the module) decides which failures qualify.
        @retry(
            retry=retry_if_exception(_should_retry),
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=1, max=8),
            reraise=True,
        )
        async def _inner() -> Any:
            # The semaphore is held only for the request itself, not while
            # parsing the response or waiting between retries.
            async with self._semaphore:
                logger.debug("%s %s", method.upper(), path)
                response = await self._client.request(method, path, **kwargs)

            if response.status_code >= 400:
                raise MattermostAPIError(response.status_code, self._error_detail(response))

            if response.status_code == 204 or not response.content:
                return {}

            return response.json()

        return await _inner()

    # ------------------------------------------------------------------ #
    # Public API methods                                                   #
    # ------------------------------------------------------------------ #

    async def get_my_teams(self) -> list[Team]:
        """Return all teams the current user belongs to."""
        data: list[dict[str, Any]] = await self._request("GET", "/api/v4/users/me/teams")
        return [Team.model_validate(t) for t in data]

    async def validate_connection(self) -> bool:
        """Return True if ``/users/me`` can be fetched with the current auth.

        Any failure (network error, error status, client not opened) yields
        False instead of raising.
        """
        try:
            await self._request("GET", "/api/v4/users/me")
        except Exception:
            return False
        else:
            return True

    async def get_team_id(self, team_name_or_id: str) -> str:
        """Resolve a team name or ID to a team ID string.

        The value is first looked up as a team name; on a 404 it is retried as
        a team ID.

        Raises:
            MattermostAPIError: The error of the name lookup, when it is not a
                404 or when the ID lookup fails as well.
        """
        try:
            data: dict[str, Any] = await self._request("GET", f"/api/v4/teams/name/{team_name_or_id}")
            return str(data["id"])
        except MattermostAPIError as exc:
            if exc.status_code == 404:
                # Maybe it was already an ID — validate it
                try:
                    data = await self._request("GET", f"/api/v4/teams/{team_name_or_id}")
                    return str(data["id"])
                except MattermostAPIError:
                    # Fall through and report the original name-lookup error.
                    pass
            raise

    async def search_posts(
        self,
        team_id: str,
        query: str,
        channel_id: str | None = None,
        per_page: int = 20,
    ) -> list[SearchHit]:
        """Full-text search for posts within a team.

        Args:
            team_id: ID of the team to search in.
            query: Search terms, sent as ``terms`` (AND search).
            channel_id: Optional channel ID added to the request payload.
            per_page: Sent as ``per_page`` in the request payload.

        Returns:
            One ``SearchHit`` per post, in the order given by the server.
            ``channel_name`` falls back to the channel ID when the channel
            lookup fails.
        """
        payload: dict[str, Any] = {"terms": query, "is_or_search": False, "per_page": per_page}
        if channel_id:
            payload["channel_id"] = channel_id

        data: dict[str, Any] = await self._request(
            "POST",
            f"/api/v4/teams/{team_id}/posts/search",
            json=payload,
        )

        # ``or`` also covers a JSON null for either key.
        order: list[str] = data.get("order") or []
        posts_map: dict[str, Any] = data.get("posts") or {}
        ranked = [(pid, posts_map[pid]) for pid in order if pid in posts_map]

        # Fetch channel names in parallel (one request per unique channel id).
        # dict.fromkeys de-duplicates while keeping a stable order, and posts
        # without a channel id are skipped instead of raising KeyError.
        channel_ids = list(dict.fromkeys(raw["channel_id"] for _, raw in ranked if raw.get("channel_id")))
        channel_names: dict[str, str] = {}
        if channel_ids:
            results = await asyncio.gather(
                *(self.get_channel(cid) for cid in channel_ids),
                return_exceptions=True,
            )
            for cid, result in zip(channel_ids, results, strict=True):
                # A failed lookup degrades to showing the raw channel id.
                channel_names[cid] = result.get("name", cid) if isinstance(result, dict) else cid

        hits: list[SearchHit] = []
        for post_id, raw in ranked:
            cid = raw.get("channel_id") or ""
            hits.append(
                SearchHit(
                    post_id=post_id,
                    message=raw.get("message", ""),
                    channel_id=cid,
                    channel_name=channel_names.get(cid, cid),
                    user_id=raw.get("user_id", ""),
                )
            )

        return hits

    async def get_thread(self, post_id: str) -> Thread:
        """Fetch the complete thread containing post_id.

        Posts that fail validation are logged and skipped. The returned posts
        are sorted by ``create_at``.
        """
        data: dict[str, Any] = await self._request(
            "GET",
            f"/api/v4/posts/{post_id}/thread",
            params={"skipFetchThreads": "false", "collapsedThreads": "false"},
        )

        posts_map: dict[str, Any] = data.get("posts", {})
        posts: list[Post] = []
        for raw in posts_map.values():
            try:
                posts.append(Post.model_validate(raw))
            except Exception as exc:
                logger.warning("Failed to parse post %s: %s", raw.get("id"), exc)

        posts.sort(key=lambda p: p.create_at)

        # The root post is the one with no root_id; otherwise fall back to the
        # oldest post, or to the requested id when nothing could be parsed.
        root_id = next(
            (p.id for p in posts if not p.root_id),
            posts[0].id if posts else post_id,
        )

        return Thread(root_post_id=root_id, posts=posts)

    async def get_user(self, user_id: str) -> User:
        """Fetch a user by ID."""
        data: dict[str, Any] = await self._request("GET", f"/api/v4/users/{user_id}")
        return User.model_validate(data)

    async def get_channel(self, channel_id: str) -> dict[str, str]:
        """Fetch channel metadata by ID, with every value converted to ``str``."""
        data: dict[str, Any] = await self._request("GET", f"/api/v4/channels/{channel_id}")
        return {k: str(v) for k, v in data.items()}

get_channel(channel_id) async

Fetch channel metadata by ID.

Source code in src/mattermind/mattermost/client.py
async def get_channel(self, channel_id: str) -> dict[str, str]:
    """Look up a channel by ID and return its metadata with stringified values."""
    payload: dict[str, Any] = await self._request("GET", f"/api/v4/channels/{channel_id}")

    # Every value is passed through str(), so callers get a flat str -> str map.
    stringified: dict[str, str] = {}
    for field_name, value in payload.items():
        stringified[field_name] = str(value)
    return stringified

get_my_teams() async

Return all teams the current user belongs to.

Source code in src/mattermind/mattermost/client.py
async def get_my_teams(self) -> list[Team]:
    """List the teams of the authenticated user as validated ``Team`` models."""
    raw_teams: list[dict[str, Any]] = await self._request("GET", "/api/v4/users/me/teams")
    return list(map(Team.model_validate, raw_teams))

get_team_id(team_name_or_id) async

Resolve a team name or ID to a team ID string.

Source code in src/mattermind/mattermost/client.py
async def get_team_id(self, team_name_or_id: str) -> str:
    """Turn a team name or team ID into the team's ID.

    The value is tried as a team name first. Only when that lookup answers 404
    is it retried as an ID; if the retry fails too, the original name-lookup
    error is the one that propagates.
    """
    try:
        by_name: dict[str, Any] = await self._request("GET", f"/api/v4/teams/name/{team_name_or_id}")
    except MattermostAPIError as name_error:
        if name_error.status_code != 404:
            raise

        # Not a known name: it may already be an ID, so validate it as one.
        by_id = None
        try:
            by_id = await self._request("GET", f"/api/v4/teams/{team_name_or_id}")
        except MattermostAPIError:
            pass
        if by_id is None:
            raise
        return str(by_id["id"])
    else:
        return str(by_name["id"])

get_thread(post_id) async

Fetch the complete thread containing post_id.

Source code in src/mattermind/mattermost/client.py
async def get_thread(self, post_id: str) -> Thread:
    """Load the whole thread that ``post_id`` belongs to.

    Entries that cannot be validated as ``Post`` are logged and left out. The
    posts of the returned ``Thread`` are ordered by ``create_at``.
    """
    response: dict[str, Any] = await self._request(
        "GET",
        f"/api/v4/posts/{post_id}/thread",
        params={"skipFetchThreads": "false", "collapsedThreads": "false"},
    )

    parsed: list[Post] = []
    for entry in response.get("posts", {}).values():
        try:
            post = Post.model_validate(entry)
        except Exception as exc:
            logger.warning("Failed to parse post %s: %s", entry.get("id"), exc)
        else:
            parsed.append(post)

    chronological = sorted(parsed, key=lambda item: item.create_at)

    # Root selection: the first post without a root_id wins; failing that the
    # oldest post; failing that (nothing parsed) the id that was asked for.
    root_id = post_id
    if chronological:
        root_id = chronological[0].id
        for candidate in chronological:
            if not candidate.root_id:
                root_id = candidate.id
                break

    return Thread(root_post_id=root_id, posts=chronological)

get_user(user_id) async

Fetch a user by ID.

Source code in src/mattermind/mattermost/client.py
async def get_user(self, user_id: str) -> User:
    """Retrieve one user by ID and validate the payload as a ``User`` model."""
    return User.model_validate(await self._request("GET", f"/api/v4/users/{user_id}"))

search_posts(team_id, query, channel_id=None, per_page=20) async

Full-text search for posts within a team.

Source code in src/mattermind/mattermost/client.py
async def search_posts(
    self,
    team_id: str,
    query: str,
    channel_id: str | None = None,
    per_page: int = 20,
) -> list[SearchHit]:
    """Full-text search for posts within a team.

    Args:
        team_id: ID of the team to search in.
        query: Search terms, sent as ``terms`` (AND search).
        channel_id: Optional channel ID added to the request payload.
        per_page: Sent as ``per_page`` in the request payload.

    Returns:
        One ``SearchHit`` per post, in the order given by the server.
        ``channel_name`` falls back to the channel ID when the channel lookup
        fails.
    """
    payload: dict[str, Any] = {"terms": query, "is_or_search": False, "per_page": per_page}
    if channel_id:
        payload["channel_id"] = channel_id

    data: dict[str, Any] = await self._request(
        "POST",
        f"/api/v4/teams/{team_id}/posts/search",
        json=payload,
    )

    # ``or`` also covers a JSON null for either key.
    order: list[str] = data.get("order") or []
    posts_map: dict[str, Any] = data.get("posts") or {}
    ranked = [(pid, posts_map[pid]) for pid in order if pid in posts_map]

    # Fetch channel names in parallel (one request per unique channel id).
    # dict.fromkeys de-duplicates while keeping a stable order, and posts
    # without a channel id are skipped instead of raising KeyError.
    channel_ids = list(dict.fromkeys(raw["channel_id"] for _, raw in ranked if raw.get("channel_id")))
    channel_names: dict[str, str] = {}
    if channel_ids:
        results = await asyncio.gather(
            *(self.get_channel(cid) for cid in channel_ids),
            return_exceptions=True,
        )
        for cid, result in zip(channel_ids, results, strict=True):
            # A failed lookup degrades to showing the raw channel id.
            channel_names[cid] = result.get("name", cid) if isinstance(result, dict) else cid

    hits: list[SearchHit] = []
    for post_id, raw in ranked:
        cid = raw.get("channel_id") or ""
        hits.append(
            SearchHit(
                post_id=post_id,
                message=raw.get("message", ""),
                channel_id=cid,
                channel_name=channel_names.get(cid, cid),
                user_id=raw.get("user_id", ""),
            )
        )

    return hits

validate_connection() async

Return True if the token is valid and the server is reachable.

Source code in src/mattermind/mattermost/client.py
async def validate_connection(self) -> bool:
    """Report whether ``/api/v4/users/me`` can be fetched with the current auth.

    Every kind of failure is turned into ``False``; nothing is raised.
    """
    try:
        await self._request("GET", "/api/v4/users/me")
        return True
    except Exception:
        return False

Agent

Main agentic loop that drives tool calling and answer generation.

AgentLoop

Orchestrates the LLM + tool-calling loop to answer a user's question.

Source code in src/mattermind/agent/loop.py
class AgentLoop:
    """Orchestrates the LLM + tool-calling loop to answer a user's question."""

    def __init__(
        self,
        config: AppConfig,
        client: MattermostClient,
        console: Console,
        verbose: bool = False,
    ) -> None:
        """Store collaborators and build the LLM client.

        Args:
            config: Application configuration (LLM, agent and Mattermost parts
                are read by the loop).
            client: Mattermost client used to resolve the team and run tools.
            console: Console used for verbose tool-call output.
            verbose: When true, tool calls and truncated results are printed.
        """
        self._config = config
        self._mm_client = client
        self._console = console
        self._verbose = verbose

        # NOTE(review): this httpx client is never closed in the code shown
        # here - confirm whether the owner is expected to close it.
        self._llm = AsyncOpenAI(
            base_url=config.llm.base_url,
            api_key=config.llm.api_key,
            http_client=httpx.AsyncClient(
                verify=config.llm.verify_ssl,
                timeout=config.llm.request_timeout_seconds,
            ),
        )

    async def run(
        self,
        question: str,
        on_status: Callable[[str], None] | None = None,
    ) -> AskResult:
        """Run the agentic loop for a given question and return AskResult.

        Args:
            question: The user's question, sent as the first user message.
            on_status: Optional callback that receives progress strings.

        Returns:
            An ``AskResult`` with the answer text, usage counters, elapsed
            time and collected permalinks. ``incomplete`` is set when the run
            stopped on the token budget or on the iteration limit.

        Raises:
            ValueError: If no Mattermost team is configured.
        """
        start = time.monotonic()

        state = AgentState()

        # Resolve team_id once
        if on_status:
            on_status("Connecting to Mattermost...")
        if not self._config.mattermost.team:
            raise ValueError("Mattermost team is not set. Add 'team' to your config or use --team.")
        team_id = await self._mm_client.get_team_id(self._config.mattermost.team)

        system = SYSTEM_PROMPT.format(max_depth=self._config.agent.max_link_depth)

        messages: list[ChatCompletionMessageParam] = [
            {"role": "system", "content": system},
            {"role": "user", "content": question},
        ]
        typed_tools = cast(list[ChatCompletionToolParam], TOOL_DEFINITIONS)

        # Per-run copy of the budget: an extension granted by the user must
        # not leak into the shared config and affect later runs.
        token_budget = self._config.agent.total_token_budget

        answer = ""
        iteration = 0

        while iteration < self._config.agent.max_iterations:
            iteration += 1

            if on_status:
                on_status(f"Thinking... (iteration {iteration})")

            logger.debug("LLM call iteration %d", iteration)

            response = await self._llm.chat.completions.create(
                model=self._config.llm.model,
                messages=messages,
                tools=typed_tools,
                tool_choice="auto",
                temperature=self._config.llm.temperature,
                max_tokens=self._config.llm.max_tokens_per_response,
                timeout=self._config.llm.request_timeout_seconds,
            )

            # Track token usage
            if response.usage:
                state.token_usage = state.token_usage.add(
                    TokenUsage(
                        prompt_tokens=response.usage.prompt_tokens,
                        completion_tokens=response.usage.completion_tokens,
                        total_tokens=response.usage.total_tokens,
                    )
                )

            message = response.choices[0].message

            # Check budget
            if state.token_usage.total_tokens >= token_budget:
                logger.warning("Token budget exhausted at iteration %d", iteration)
                if on_status:
                    on_status(f"Token budget exhausted ({state.token_usage.total_tokens:,} tokens). Continue? [y/N] ")
                # input() blocks, so the prompt runs in the default executor.
                confirmed = await asyncio.get_running_loop().run_in_executor(None, self._ask_continue)
                if not confirmed:
                    state.incomplete = True
                    # Keep any text the model already produced in this response.
                    answer = message.content or ""
                    if on_status:
                        on_status("Stopped by user.")
                    break
                # New limit = tokens used so far + the previous limit.
                token_budget += state.token_usage.total_tokens
                logger.debug("Budget extended to %d", token_budget)
                # Fall through: the response above is already paid for, so it
                # is processed instead of being discarded and requested again.

            # Append assistant message to conversation
            messages.append(cast(ChatCompletionMessageParam, message.model_dump(exclude_none=True)))

            # Only handle function tool calls (ignore custom tool calls).
            # Duck-typed on purpose: anything exposing ``function`` and ``id``
            # is accepted, which also lets test doubles through.
            raw_tool_calls = message.tool_calls or []
            tool_calls: list[ChatCompletionMessageToolCall] = [
                tc  # type: ignore[misc]
                for tc in raw_tool_calls
                if hasattr(tc, "function") and hasattr(tc, "id")
            ]

            if not tool_calls:
                # LLM gave a final text answer (or only unsupported custom tools)
                answer = message.content or ""
                if on_status:
                    on_status("Answer ready.")
                break

            # --- Execute all tool calls (optionally in parallel) ---
            if on_status:
                tool_names = ", ".join(tc.function.name for tc in tool_calls)
                on_status(f"Calling tools: {tool_names}")

            if self._verbose:
                for tc in tool_calls:
                    self._console.print(f"[dim]Tool call: {tc.function.name}({tc.function.arguments})[/dim]")

            tool_results: list[str | BaseException]
            if self._config.agent.parallel_tool_calls:
                tool_results = list(
                    await asyncio.gather(
                        *(
                            self._run_tool(tc.id, tc.function.name, tc.function.arguments, team_id, state)
                            for tc in tool_calls
                        ),
                        return_exceptions=True,
                    )
                )
            else:
                # Coroutines are created one at a time so none is left
                # un-awaited, and a failure is recorded as that call's result
                # exactly as in the parallel branch.
                tool_results = []
                for tc in tool_calls:
                    try:
                        tool_results.append(
                            await self._run_tool(tc.id, tc.function.name, tc.function.arguments, team_id, state)
                        )
                    except Exception as exc:
                        tool_results.append(exc)

            # Append tool results to message history
            for tc, result in zip(tool_calls, tool_results, strict=True):
                result_str = (
                    orjson.dumps({"error": str(result)}).decode() if isinstance(result, BaseException) else str(result)
                )

                if self._verbose:
                    self._console.print(f"[dim]Tool result ({tc.function.name}): {result_str[:200]}[/dim]")

                tool_msg: ChatCompletionToolMessageParam = {
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result_str,
                }
                messages.append(tool_msg)

        else:
            # Iteration limit reached without a final answer
            state.incomplete = True
            if on_status:
                on_status("Iteration limit reached — stopping early.")
            # Use the last assistant content if any
            for msg in reversed(messages):
                if isinstance(msg, dict) and msg.get("role") == "assistant":
                    answer = str(msg.get("content") or "")
                    break

        elapsed = time.monotonic() - start

        # Collect unique permalinks from explored threads; dict.fromkeys
        # de-duplicates while keeping first-seen order.
        permalinks = list(dict.fromkeys(t["permalink"] for t in state.explored_threads if t.get("permalink")))

        return AskResult(
            answer=answer,
            threads_explored=state.threads_fetched,
            tool_calls_made=state.tool_calls_made,
            token_usage=state.token_usage,
            elapsed_seconds=elapsed,
            incomplete=state.incomplete,
            permalinks=permalinks,
        )

    def _ask_continue(self) -> bool:
        """Prompt the user synchronously (runs in a thread executor).

        Returns True only for an explicit yes; EOF or Ctrl-C count as "no".
        """
        try:
            answer = input("Continue beyond token budget? [y/N]: ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            return False
        else:
            return answer in ("y", "yes", "д", "да")

    async def _run_tool(
        self,
        _tool_call_id: str,
        tool_name: str,
        arguments_json: str,
        team_id: str,
        state: AgentState,
    ) -> str:
        """Parse arguments and delegate to execute_tool.

        Returns a JSON error object (as a string) instead of raising when the
        arguments are not valid JSON or are not a JSON object.
        """
        try:
            args: dict[str, Any] = orjson.loads(arguments_json)
        except orjson.JSONDecodeError as exc:
            return orjson.dumps({"error": f"Invalid tool arguments JSON: {exc}"}).decode()
        if not isinstance(args, dict):
            # Valid JSON but not an object (e.g. a list or a number).
            return orjson.dumps({"error": "Invalid tool arguments JSON: expected a JSON object"}).decode()

        return await execute_tool(
            tool_name=tool_name,
            tool_args=args,
            client=self._mm_client,
            team_id=team_id,
            state=state,
            config=self._config.agent,
            mm_url=self._config.mattermost.url,
            mm_team=self._config.mattermost.team or "",
        )

run(question, on_status=None) async

Run the agentic loop for a given question and return AskResult.

Source code in src/mattermind/agent/loop.py
async def run(
    self,
    question: str,
    on_status: Callable[[str], None] | None = None,
) -> AskResult:
    """Run the agentic loop for a given question and return AskResult.

    Args:
        question: The user's question, sent as the first user message.
        on_status: Optional callback that receives progress strings.

    Returns:
        An ``AskResult`` with the answer text, usage counters, elapsed time
        and collected permalinks. ``incomplete`` is set when the run stopped
        on the token budget or on the iteration limit.

    Raises:
        ValueError: If no Mattermost team is configured.
    """
    start = time.monotonic()

    state = AgentState()

    # Resolve team_id once
    if on_status:
        on_status("Connecting to Mattermost...")
    if not self._config.mattermost.team:
        raise ValueError("Mattermost team is not set. Add 'team' to your config or use --team.")
    team_id = await self._mm_client.get_team_id(self._config.mattermost.team)

    system = SYSTEM_PROMPT.format(max_depth=self._config.agent.max_link_depth)

    messages: list[ChatCompletionMessageParam] = [
        {"role": "system", "content": system},
        {"role": "user", "content": question},
    ]
    typed_tools = cast(list[ChatCompletionToolParam], TOOL_DEFINITIONS)

    # Per-run copy of the budget: an extension granted by the user must not
    # leak into the shared config and affect later runs.
    token_budget = self._config.agent.total_token_budget

    answer = ""
    iteration = 0

    while iteration < self._config.agent.max_iterations:
        iteration += 1

        if on_status:
            on_status(f"Thinking... (iteration {iteration})")

        logger.debug("LLM call iteration %d", iteration)

        response = await self._llm.chat.completions.create(
            model=self._config.llm.model,
            messages=messages,
            tools=typed_tools,
            tool_choice="auto",
            temperature=self._config.llm.temperature,
            max_tokens=self._config.llm.max_tokens_per_response,
            timeout=self._config.llm.request_timeout_seconds,
        )

        # Track token usage
        if response.usage:
            state.token_usage = state.token_usage.add(
                TokenUsage(
                    prompt_tokens=response.usage.prompt_tokens,
                    completion_tokens=response.usage.completion_tokens,
                    total_tokens=response.usage.total_tokens,
                )
            )

        message = response.choices[0].message

        # Check budget
        if state.token_usage.total_tokens >= token_budget:
            logger.warning("Token budget exhausted at iteration %d", iteration)
            if on_status:
                on_status(f"Token budget exhausted ({state.token_usage.total_tokens:,} tokens). Continue? [y/N] ")
            # input() blocks, so the prompt runs in the default executor.
            confirmed = await asyncio.get_running_loop().run_in_executor(None, self._ask_continue)
            if not confirmed:
                state.incomplete = True
                # Keep any text the model already produced in this response.
                answer = message.content or ""
                if on_status:
                    on_status("Stopped by user.")
                break
            # New limit = tokens used so far + the previous limit.
            token_budget += state.token_usage.total_tokens
            logger.debug("Budget extended to %d", token_budget)
            # Fall through: the response above is already paid for, so it is
            # processed instead of being discarded and requested again.

        # Append assistant message to conversation
        messages.append(cast(ChatCompletionMessageParam, message.model_dump(exclude_none=True)))

        # Only handle function tool calls (ignore custom tool calls).
        # Duck-typed on purpose: anything exposing ``function`` and ``id`` is
        # accepted, which also lets test doubles through.
        raw_tool_calls = message.tool_calls or []
        tool_calls: list[ChatCompletionMessageToolCall] = [
            tc  # type: ignore[misc]
            for tc in raw_tool_calls
            if hasattr(tc, "function") and hasattr(tc, "id")
        ]

        if not tool_calls:
            # LLM gave a final text answer (or only unsupported custom tools)
            answer = message.content or ""
            if on_status:
                on_status("Answer ready.")
            break

        # --- Execute all tool calls (optionally in parallel) ---
        if on_status:
            tool_names = ", ".join(tc.function.name for tc in tool_calls)
            on_status(f"Calling tools: {tool_names}")

        if self._verbose:
            for tc in tool_calls:
                self._console.print(f"[dim]Tool call: {tc.function.name}({tc.function.arguments})[/dim]")

        tool_results: list[str | BaseException]
        if self._config.agent.parallel_tool_calls:
            tool_results = list(
                await asyncio.gather(
                    *(
                        self._run_tool(tc.id, tc.function.name, tc.function.arguments, team_id, state)
                        for tc in tool_calls
                    ),
                    return_exceptions=True,
                )
            )
        else:
            # Coroutines are created one at a time so none is left un-awaited,
            # and a failure is recorded as that call's result exactly as in
            # the parallel branch.
            tool_results = []
            for tc in tool_calls:
                try:
                    tool_results.append(
                        await self._run_tool(tc.id, tc.function.name, tc.function.arguments, team_id, state)
                    )
                except Exception as exc:
                    tool_results.append(exc)

        # Append tool results to message history
        for tc, result in zip(tool_calls, tool_results, strict=True):
            result_str = (
                orjson.dumps({"error": str(result)}).decode() if isinstance(result, BaseException) else str(result)
            )

            if self._verbose:
                self._console.print(f"[dim]Tool result ({tc.function.name}): {result_str[:200]}[/dim]")

            tool_msg: ChatCompletionToolMessageParam = {
                "role": "tool",
                "tool_call_id": tc.id,
                "content": result_str,
            }
            messages.append(tool_msg)

    else:
        # Iteration limit reached without a final answer
        state.incomplete = True
        if on_status:
            on_status("Iteration limit reached — stopping early.")
        # Use the last assistant content if any
        for msg in reversed(messages):
            if isinstance(msg, dict) and msg.get("role") == "assistant":
                answer = str(msg.get("content") or "")
                break

    elapsed = time.monotonic() - start

    # Collect unique permalinks from explored threads; dict.fromkeys
    # de-duplicates while keeping first-seen order.
    permalinks = list(dict.fromkeys(t["permalink"] for t in state.explored_threads if t.get("permalink")))

    return AskResult(
        answer=answer,
        threads_explored=state.threads_fetched,
        tool_calls_made=state.tool_calls_made,
        token_usage=state.token_usage,
        elapsed_seconds=elapsed,
        incomplete=state.incomplete,
        permalinks=permalinks,
    )

Tool definitions and execution for the Mattermind agent.

execute_tool(tool_name, tool_args, client, team_id, state, config, mm_url, mm_team) async

Dispatch a tool call and return a JSON string result.

All errors are caught and returned as JSON error objects so the LLM can decide how to proceed rather than crashing the loop.

Source code in src/mattermind/agent/tools.py
async def execute_tool(
    tool_name: str,
    tool_args: dict[str, Any],
    client: MattermostClient,
    team_id: str,
    state: AgentState,
    config: AgentConfig,
    mm_url: str,
    mm_team: str,
) -> str:
    """Dispatch a tool call and return a JSON string result.

    All errors are caught and returned as JSON error objects so the LLM
    can decide how to proceed rather than crashing the loop.
    """
    state.tool_calls_made += 1

    try:
        if tool_name == "mm_search":
            return await _tool_search(tool_args, client, team_id)
        if tool_name == "mm_get_thread":
            return await _tool_get_thread(tool_args, client, state, config, mm_url, mm_team)
        if tool_name == "mm_resolve_permalink":
            return _tool_resolve_permalink(tool_args)
        if tool_name == "mm_get_user":
            return await _tool_get_user(tool_args, client)
    except Exception as exc:
        logger.warning("Tool %s raised: %s", tool_name, exc)
        return _dumps({"error": str(exc)})

    return _dumps({"error": f"Unknown tool: {tool_name}"})

parse_post_id_from_permalink(url)

Extract the post ID from a Mattermost permalink URL.

Returns the post_id string, or None if the URL is not a valid permalink.

Source code in src/mattermind/agent/tools.py
def parse_post_id_from_permalink(url: str) -> str | None:
    """Pull the post ID out of a Mattermost permalink URL.

    The URL is searched with the module-level ``_PERMALINK_RE`` pattern; the
    first capture group of the match is the post ID.

    Returns:
        The post ID, or ``None`` when the pattern does not match ``url``.
    """
    found = _PERMALINK_RE.search(url)
    return found.group(1) if found else None

Mutable agent state shared across the agentic loop iterations.

AgentState dataclass

Tracks all mutable state during an agent run.

Source code in src/mattermind/agent/state.py
@dataclass
class AgentState:
    """Tracks all mutable state during an agent run.

    Every field has a default, so ``AgentState()`` produces an empty state.
    The set, dict, list and token-usage fields are created through
    ``default_factory``, so each instance owns its own objects instead of
    sharing one default across instances.
    """

    visited_post_ids: set[str] = field(default_factory=set)
    """Post IDs of threads we have already fetched — prevents re-fetching."""

    link_depth: dict[str, int] = field(default_factory=dict)
    """Maps post_id -> depth at which it was discovered (0 = direct search hit)."""

    token_usage: TokenUsage = field(default_factory=TokenUsage)
    """Cumulative token usage across all LLM calls."""

    threads_fetched: int = 0
    """How many mm_get_thread calls succeeded."""

    tool_calls_made: int = 0
    """Total number of tool calls dispatched."""

    explored_threads: list[dict[str, str]] = field(default_factory=list)
    """Metadata for the "Threads explored" section. Each entry:
    {"post_id": ..., "channel": ..., "title": ..., "permalink": ...}
    """

    incomplete: bool = False
    """True if the run was terminated early (budget / iteration limit)."""

explored_threads = field(default_factory=list) class-attribute instance-attribute

Metadata for the "Threads explored" section. Each entry: {"post_id": ..., "channel": ..., "title": ..., "permalink": ...}

incomplete = False class-attribute instance-attribute

True if the run was terminated early (budget / iteration limit).

link_depth = field(default_factory=dict) class-attribute instance-attribute

Maps post_id -> depth at which it was discovered (0 = direct search hit).

threads_fetched = 0 class-attribute instance-attribute

How many mm_get_thread calls succeeded.

token_usage = field(default_factory=TokenUsage) class-attribute instance-attribute

Cumulative token usage across all LLM calls.

tool_calls_made = 0 class-attribute instance-attribute

Total number of tool calls dispatched.

visited_post_ids = field(default_factory=set) class-attribute instance-attribute

Post IDs of threads we have already fetched — prevents re-fetching.

UI

Rich rendering helpers for mattermind output.

render_answer(console, markdown_text, fmt='markdown')

Render the final LLM answer in the chosen format.

Source code in src/mattermind/ui/renderer.py
def render_answer(console: Console, markdown_text: str, fmt: str = "markdown") -> None:
    """Print the final LLM answer to ``console`` in the requested format.

    Args:
        console: Rich console to print to.
        markdown_text: The answer text, expected to be Markdown.
        fmt: ``"json"`` prints nothing (the caller emits the JSON itself via
            ``AskResult.model_dump_json()``), ``"plain"`` prints the text
            unchanged, and any other value renders the Markdown inside a
            titled panel.
    """
    if fmt == "json":
        return

    if fmt == "plain":
        renderable = markdown_text
    else:
        # Markdown is the fallback for every format that is not json/plain.
        renderable = Panel(
            Markdown(markdown_text),
            title="[primary]Answer[/primary]",
            border_style="primary",
            padding=(1, 2),
        )
    console.print(renderable)

render_banner(console)

Render ASCII art banner with a colour gradient.

Source code in src/mattermind/ui/renderer.py
def render_banner(console: Console) -> None:
    """Print the ASCII-art banner, one gradient colour per line, plus a tagline.

    Colours are taken from ``_GRADIENT`` by line index and wrap around when the
    banner has more lines than the gradient has entries.
    """
    banner = Text()
    for row, art_line in enumerate(_BANNER_LINES):
        colour = _GRADIENT[row % len(_GRADIENT)]
        banner.append(art_line + "\n", style=colour)

    console.print(banner)
    console.print("[muted]  Ask questions about your Mattermost workspace\n[/muted]")

render_query_panel(console, query)

Render a panel showing the user's question.

Source code in src/mattermind/ui/renderer.py
def render_query_panel(console: Console, query: str) -> None:
    """Print the user's question inside a compact, titled panel.

    The query is wrapped in a ``Text`` object, so it is shown literally rather
    than being parsed as console markup.
    """
    question = Text(query, style="bold white")
    panel = Panel(
        question,
        title="[primary]Question[/primary]",
        border_style="secondary",
        expand=False,
        padding=(0, 2),
    )
    console.print(panel)

render_status(console, message)

Print a muted status line (used outside of live context).

Source code in src/mattermind/ui/renderer.py
def render_status(console: Console, message: str) -> None:
    """Print ``message`` as an indented, muted status line.

    Intended for use outside of a live display context.
    """
    status_line = f"[muted]  {message}[/muted]"
    console.print(status_line)

render_summary(console, result, show_tokens=True, show_timings=True)

Render a compact summary panel with run statistics.

Source code in src/mattermind/ui/renderer.py
def render_summary(console: Console, result: AskResult, show_tokens: bool = True, show_timings: bool = True) -> None:
    """Print a small panel of statistics about a finished run.

    Args:
        console: Rich console to print to.
        result: Outcome of the run; supplies the counters, token usage,
            elapsed time and the ``incomplete`` flag.
        show_tokens: Include the prompt/completion/total token row.
        show_timings: Include the elapsed-time row.

    The thread and tool-call counts are always shown. A warning row is added
    when ``result.incomplete`` is set.
    """
    # Collect the (label, value) pairs first, in display order.
    rows: list[tuple[str, str]] = [
        ("Threads explored", str(result.threads_explored)),
        ("Tool calls", str(result.tool_calls_made)),
    ]

    if show_tokens:
        usage = result.token_usage
        rows.append(
            (
                "Tokens",
                f"prompt={usage.prompt_tokens}  "
                f"completion={usage.completion_tokens}  "
                f"total={usage.total_tokens}",
            )
        )

    if show_timings:
        rows.append(("Elapsed", f"{result.elapsed_seconds:.1f}s"))

    if result.incomplete:
        rows.append(("[warning]Status[/warning]", "[warning]incomplete (budget/limit reached)[/warning]"))

    # Two-column grid: right-aligned muted labels, then the values.
    stats = Table.grid(padding=(0, 2))
    stats.add_column(style="muted", justify="right")
    stats.add_column()
    for label, value in rows:
        stats.add_row(label, value)

    summary_panel = Panel(
        stats,
        title="[muted]Run summary[/muted]",
        border_style="muted",
        expand=False,
        padding=(0, 1),
    )
    console.print(summary_panel)

render_thread_tree(console, explored_threads)

Render a rich tree listing every explored thread with clickable links.

Source code in src/mattermind/ui/renderer.py
def render_thread_tree(
    console: Console,
    explored_threads: list[dict[str, str]],
) -> None:
    """Print a tree with one node per explored thread.

    Args:
        console: Rich console to print to.
        explored_threads: Thread metadata dicts. The ``"channel"``,
            ``"title"`` and ``"permalink"`` keys are read; each is optional.

    Nothing is printed when ``explored_threads`` is empty. Titles are cut to
    60 characters, and an empty title is shown as ``(no title)``. A thread
    with a permalink gets its title styled as a link to that URL.
    """
    if not explored_threads:
        return

    tree = Tree("[bold]Threads explored[/bold]", guide_style="muted")

    for entry in explored_threads:
        url = entry.get("permalink", "")
        channel_name = entry.get("channel", "unknown")
        heading = entry.get("title", "")[:60] or "(no title)"

        # Only pass a style when there is a URL to link to.
        link_kwargs = {"style": f"link {url}"} if url else {}

        node = Text()
        node.append(f"#{channel_name}  ", style="secondary")
        node.append(heading, **link_kwargs)
        tree.add(node)

    console.print(tree)

Beautiful error rendering via Rich panels.

error_panel(console, title, message, hint=None)

Render a red error panel with an optional hint line.

Source code in src/mattermind/ui/errors.py
def error_panel(
    console: Console,
    title: str,
    message: str,
    hint: str | None = None,
) -> None:
    """Print an error panel containing ``message`` and, optionally, a hint.

    Args:
        console: Rich console to print to.
        title: Panel title; rendered with the ``error`` style.
        message: Main error text, shown in bold white.
        hint: Optional advice. When given (and non-empty) it is appended
            after a blank line, prefixed with ``Hint: `` and shown in yellow.
    """
    content = Text()
    content.append(message, style="bold white")

    if hint:
        content.append("\n\n")
        content.append("Hint: ", style="yellow bold")
        content.append(hint, style="yellow")

    panel = Panel(
        content,
        title=f"[error] {title}[/error]",
        border_style="error",
        expand=False,
        padding=(1, 2),
    )
    console.print(panel)