Сообщения и доставка

Очередь команд

Мы сериализуем входящие запуски автоответов (для всех каналов) через небольшую внутрипроцессную очередь, чтобы предотвратить конфликты между несколькими запусками агента, сохраняя при этом безопасный параллелизм между сессиями.

Зачем

  • Запуски автоответов могут быть ресурсоемкими (вызовы LLM) и могут конфликтовать, когда несколько входящих сообщений поступают почти одновременно.
  • Сериализация позволяет избежать конкуренции за общие ресурсы (файлы сессий, логи, stdin CLI) и снижает вероятность срабатывания ограничений скорости (rate limits) на стороне вышестоящих сервисов.

Как это работает

  • Очередь FIFO, учитывающая "полосы" (lane-aware), обрабатывает каждую полосу с настраиваемым ограничением параллелизма (по умолчанию 1 для ненастроенных полос; для основной полосы main по умолчанию 4, для subagent — 8).
  • runEmbeddedPiAgent ставит в очередь по ключу сессии (полоса session:<key>), чтобы гарантировать только один активный запуск на сессию.
  • Затем каждый запуск сессии ставится в очередь в глобальную полосу (по умолчанию main), поэтому общий параллелизм ограничен параметром agents.defaults.maxConcurrent.
  • При включенном подробном логировании (verbose logging) поставленные в очередь запуски выводят короткое уведомление, если они ждали более ~2 секунд перед началом.
  • Индикаторы набора текста (typing indicators) по-прежнему срабатывают немедленно при постановке в очередь (когда это поддерживается каналом), поэтому пользовательский опыт не меняется, пока мы ждем своей очереди.

Режимы очереди (для каждого канала)

Входящие сообщения могут направлять текущий запуск, ждать следующего хода или делать и то, и другое:

  • steer: немедленно внедрить в текущий запуск (отменяет ожидающие вызовы инструментов после следующей границы инструмента). Если потоковая обработка (streaming) не активна, переходит в режим followup.
  • followup: поставить в очередь для следующего хода агента после завершения текущего запуска.
  • collect: объединить все сообщения в очереди в один следующий ход (по умолчанию). Если сообщения предназначены для разных каналов/веток, они обрабатываются отдельно, чтобы сохранить маршрутизацию.
  • steer-backlog (также steer+backlog): направить (steer) сейчас и сохранить сообщение для следующего хода.
  • interrupt (устаревший): прервать активный запуск для этой сессии, затем запустить новейшее сообщение.
  • queue (устаревший алиас): то же, что и steer.

Steer-backlog означает, что вы можете получить ответ после направленного запуска, поэтому на поверхностях с потоковой передачей могут появляться дубликаты. Предпочитайте collect/steer, если хотите один ответ на каждое входящее сообщение. Отправьте /queue collect как отдельную команду (для сессии) или установите messages.queue.byChannel.discord: "collect". Значения по умолчанию (если не задано в конфигурации):

  • Все поверхности → collect

Настройте глобально или для каждого канала через messages.queue:

{
  messages: {
    queue: {
      mode: "collect",
      debounceMs: 1000,
      cap: 20,
      drop: "summarize",
      byChannel: { discord: "collect" },
    },
  },
}

Параметры очереди

Параметры применяются к режимам followup, collect и steer-backlog (а также к steer, когда он переходит в режим followup):

  • debounceMs: ждать периода без сообщений перед началом следующего хода (предотвращает "продолжить, продолжить").
  • cap: максимальное количество сообщений в очереди на сессию.
  • drop: политика обработки переполнения (old, new, summarize).

Summarize сохраняет короткий список в виде маркированного списка отброшенных сообщений и внедряет его как синтетический промпт для следующего хода. Значения по умолчанию: debounceMs: 1000, cap: 20, drop: summarize.

Переопределения для конкретной сессии

  • Отправьте /queue <mode> как отдельную команду, чтобы сохранить режим для текущей сессии.
  • Параметры можно комбинировать: /queue collect debounce:2s cap:25 drop:summarize
  • /queue default или /queue reset очищает переопределение для сессии.

Область применения и гарантии

  • Применяется к запускам агентов автоответов во всех входящих каналах, использующих конвейер ответов шлюза (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, веб-чат и т.д.).
  • Полоса по умолчанию (main) является общей для всего процесса для входящих сообщений и основных heartbeat-задач; установите agents.defaults.maxConcurrent, чтобы разрешить параллельное выполнение нескольких сессий.
  • Могут существовать дополнительные полосы (например, cron, subagent), чтобы фоновые задачи могли выполняться параллельно, не блокируя входящие ответы.
  • Полосы для каждой сессии гарантируют, что только один запуск агента работает с данной сессией в любой момент времени.
  • Нет внешних зависимостей или фоновых потоков-воркеров; чистый TypeScript + promises.

Устранение неполадок

  • Если команды, кажется, зависли, включите подробное логирование и ищите строки "queued for …ms", чтобы подтвердить, что очередь обрабатывается.
  • Если вам нужна информация о глубине очереди, включите подробное логирование и следите за строками о времени в очереди.

Политика повторов