Skip to content

Архитектура

Система построена на принципах асинхронности, модульности и split-topology: данные и веб на удалённом сервере, воркеры и Telegram -- на локальном.

Split-Topology

graph TB
    subgraph "Remote Server — 2.59.170.200 (fin-boost.com)"
        NGINX["nginx<br>SSL/443"]
        API["api<br>FastAPI + React SPA<br>:8000"]
        DB["db<br>PostgreSQL 15 + pgvector<br>:5432"]
        REDIS["redis<br>Redis 7<br>:6379"]
        NGINX --> API
        API --> DB
        API --> REDIS
    end

    subgraph "Local Server — GPU, Docker"
        ORCH["dialog-orchestrator<br>Pyrogram + DialogManager + GoIP"]
        ADMIN["admin-bot<br>aiogram 3.x"]
        CW["celery-worker<br>concurrency=4"]
        CWW["celery-worker-warming<br>concurrency=2"]
        CB["celery-beat<br>Планировщик"]
        EMU["emulator-worker<br>HOST, systemd"]
        EMB["embedding-worker<br>HOST, systemd, GPU"]
    end

    ORCH -->|asyncpg / aioredis| DB
    ORCH -->|asyncpg / aioredis| REDIS
    CW --> DB
    CW --> REDIS
    ADMIN -->|"Redis pub/sub<br>admin:commands"| REDIS
    EMU -->|"Redis BLPOP/RPUSH"| REDIS
    EMB -->|"Redis BRPOP"| REDIS
    EMB --> DB

Единый Redis

Все сервисы используют один Redis (на remote-сервере). Локальные сервисы подключаются через mapped порты (2.59.170.200:6381). Это обеспечивает работу pub/sub между remote API и local orchestrator.

Структура каталогов

chat_bot/
├── dialog_orchestrator/  # Standalone entrypoint (main.py)
├── app/                  # Pydantic Settings (config.py)
├── core/                 # AI engine, tools, memory, dialog/, prompts/
│   ├── dialog/           # 9 суб-модулей DialogManager
│   ├── prompts/          # 28 секций, section_catalog.py
│   ├── ai_engine.py      # Anthropic SDK / CLI
│   ├── dialog_manager.py # Тонкий координатор (571 строк)
│   └── rag_engine.py     # pgvector semantic search
├── db/                   # SQLAlchemy 2.0 async: 28 моделей, Repository
├── telegram/             # Pyrogram pool, sender, warmer, GoIP, emulator
├── config/               # Anti-ban rules, constants
├── api/                  # FastAPI REST API (108 endpoints) + React SPA
├── web/                  # React SPA (Vite + React 18 + TypeScript)
├── admin_bot/            # aiogram 3.x admin bot
├── workers/              # Celery tasks (22 задачи)
├── scripts/              # CLI утилиты
└── alembic/              # DB миграции (async)

Ключевые паттерны

Async-First

Вся система построена на asyncio: Pyrogram (MTProto), asyncpg (PostgreSQL), aioredis. Celery-задачи запускают asyncio.run() для взаимодействия с async-кодом.

Session-per-Request

Repository(session) создаётся на каждый запрос. В API -- через FastAPI dependency, в DialogManager -- через async context manager.

Distributed Locks

DistributedLock (Redis) для TOCTOU-защиты:

  • client:{id} -- предотвращает двойной первый контакт
  • dialog:{account}:{user} -- debounce обработки сообщений

Атомарный выбор аккаунта

SELECT ... FROM tg_accounts
WHERE status = 'active' AND active_dialogs < max_dialogs
FOR UPDATE SKIP LOCKED

Предотвращает гонки при одновременном назначении аккаунтов.

DialogManager -- координатор

DialogManager (571 строк) -- тонкий координатор, связывающий 9 суб-модулей:

Модуль Класс Ответственность
circuit_breaker.py CircuitBreaker Трекинг ошибок, cooldown (5 ошибок = 10 мин)
debounce.py DebounceManager Adaptive debounce, typing state, Redis-буферы
incoming_pipeline.py IncomingPipeline Сохранение, буферизация, dispatch
dialog_processor.py DialogProcessor AI call, parse, tools, отправка
first_contact.py FirstContactHandler Исходящий первый контакт
followup_processor.py FollowupProcessor Планирование и выполнение follow-up
background_tasks.py BackgroundTaskManager Redis listeners, startup recovery
cross_context.py CrossContextLoader Кросс-диалоговый контекст + RAG
account_recovery.py AccountRecoveryManager Health checks, proxy rotation, auto-recovery

Поток данных: входящее сообщение

sequenceDiagram
    participant D as Должник
    participant TG as Telegram MTProto
    participant AP as AccountPool
    participant DM as DialogManager
    participant AI as AIEngine
    participant DB as PostgreSQL
    participant MS as MessageSender

    D->>TG: Сообщение
    TG->>AP: Pyrogram handler
    AP->>DM: handle_incoming()
    DM->>DB: Сохранение Message (incoming)
    DM->>DM: Debounce buffer
    DM->>AI: process_message()
    Note over AI: Загрузка memory + контекст<br>+ правила кампании<br>+ аналитика
    AI-->>DM: JSON response + actions
    DM->>DM: ToolExecutor (actions)
    DM->>MS: send_message()
    MS->>TG: Ответ с антибан-задержками
    DM->>DB: Сохранение Message (outgoing)

Поток данных: первый контакт

sequenceDiagram
    participant CB as Celery Beat
    participant DM as DialogManager
    participant DB as PostgreSQL
    participant AI as AIEngine
    participant TG as Telegram

    CB->>DM: initiate_new_contacts (каждые 10 мин, 9-19ч)
    DM->>DB: get_clients_for_contact()
    DM->>DB: get_available_account() [FOR UPDATE SKIP LOCKED]
    DM->>AI: generate_first_contact()
    Note over AI: parse_success=false<br>→ отмена отправки
    AI-->>DM: JSON response
    DM->>TG: ImportContacts + send_message
    DM->>DB: Создание Conversation
    DM->>DM: Engagement ping через 16-20 мин

8 инструментов AI-агента

Tool Назначение Влияние на аналитику
record_payment_promise Фиксация обещания оплаты negotiation_stage, discount_offered
update_client_reason Причина неоплаты debtor_motive, objection_type
schedule_followup Follow-up через N часов (min 4h) followup_number++
escalate_to_human Передача оператору negotiation_stage="closing"
send_payment_link Ссылка на оплату --
update_memory Обновление памяти клиента installment_offered/months
close_conversation Закрытие (paid/refused/unreachable) debtor_type, namespace tags
make_flash_call Короткий звонок 3-8 сек --

Модель данных

erDiagram
    Client ||--o{ Conversation : has
    Conversation ||--|| TgAccount : uses
    Conversation ||--o{ Message : contains
    Conversation ||--o{ AgentAction : logs
    Conversation ||--o{ MessageEmbedding : embeds
    Client ||--o{ MessageEmbedding : has
    Department ||--o{ Client : assigns
    Department ||--o{ TgAccount : assigns
    Proxy ||--o{ TgAccount : connects
    Company ||--o{ DocumentTemplate : owns
    Conversation ||--o| StrategyOutcome : tracks

28 ORM-моделей, 39 Alembic-миграций, PostgreSQL 15 + pgvector для RAG.

Inter-Service Communication

Канал Направление Назначение
Redis pub/sub admin:commands admin-bot --> orchestrator Тестовый контакт, отправка от оператора
Redis pub/sub config:*:reload API --> orchestrator Hot-reload departments, company, workflows
Redis queue emulator:requests orchestrator --> emulator-worker Регистрация через Android-эмулятор
Redis queue embedding_queue orchestrator --> embedding-worker GPU-embed сообщений для RAG
Celery (Redis broker) beat --> workers 22 периодические задачи

Resilience

  • Circuit Breaker: 5 последовательных ошибок на аккаунт = 10 мин cooldown
  • FloodWait cap: ожидание > 120с = FloodWaitExceeded (не блокирует async-цикл)
  • Auto-Recovery: disabled аккаунты восстанавливаются через SMS (30с) или эмулятор (5 мин)
  • Dead Proxy Restart: каждые 2 мин проверка прокси, мёртвые = ротация + рестарт
  • Connection Resilience: socket timeouts, keepalive, retry_on_timeout для Redis; exponential backoff для PostgreSQL