Архитектура¶
Система построена на принципах асинхронности, модульности и split-topology: данные и веб на удалённом сервере, воркеры и Telegram -- на локальном.
Split-Topology¶
graph TB
subgraph "Remote Server — 2.59.170.200 (fin-boost.com)"
NGINX["nginx<br>SSL/443"]
API["api<br>FastAPI + React SPA<br>:8000"]
DB["db<br>PostgreSQL 15 + pgvector<br>:5432"]
REDIS["redis<br>Redis 7<br>:6379"]
NGINX --> API
API --> DB
API --> REDIS
end
subgraph "Local Server — GPU, Docker"
ORCH["dialog-orchestrator<br>Pyrogram + DialogManager + GoIP"]
ADMIN["admin-bot<br>aiogram 3.x"]
CW["celery-worker<br>concurrency=4"]
CWW["celery-worker-warming<br>concurrency=2"]
CB["celery-beat<br>Планировщик"]
EMU["emulator-worker<br>HOST, systemd"]
EMB["embedding-worker<br>HOST, systemd, GPU"]
end
ORCH -->|asyncpg / aioredis| DB
ORCH -->|asyncpg / aioredis| REDIS
CW --> DB
CW --> REDIS
ADMIN -->|"Redis pub/sub<br>admin:commands"| REDIS
EMU -->|"Redis BLPOP/RPUSH"| REDIS
EMB -->|"Redis BRPOP"| REDIS
EMB --> DB
Единый Redis
Все сервисы используют один Redis (на remote-сервере). Локальные сервисы подключаются через mapped порты (2.59.170.200:6381). Это обеспечивает работу pub/sub между remote API и local orchestrator.
Структура каталогов¶
chat_bot/
├── dialog_orchestrator/ # Standalone entrypoint (main.py)
├── app/ # Pydantic Settings (config.py)
├── core/ # AI engine, tools, memory, dialog/, prompts/
│ ├── dialog/ # 9 суб-модулей DialogManager
│ ├── prompts/ # 28 секций, section_catalog.py
│ ├── ai_engine.py # Anthropic SDK / CLI
│ ├── dialog_manager.py # Тонкий координатор (571 строк)
│ └── rag_engine.py # pgvector semantic search
├── db/ # SQLAlchemy 2.0 async: 28 моделей, Repository
├── telegram/ # Pyrogram pool, sender, warmer, GoIP, emulator
├── config/ # Anti-ban rules, constants
├── api/ # FastAPI REST API (108 endpoints) + React SPA
├── web/ # React SPA (Vite + React 18 + TypeScript)
├── admin_bot/ # aiogram 3.x admin bot
├── workers/ # Celery tasks (22 задачи)
├── scripts/ # CLI утилиты
└── alembic/ # DB миграции (async)
Ключевые паттерны¶
Async-First¶
Вся система построена на asyncio: Pyrogram (MTProto), asyncpg (PostgreSQL), aioredis. Celery-задачи запускают asyncio.run() для взаимодействия с async-кодом.
Session-per-Request¶
Repository(session) создаётся на каждый запрос. В API -- через FastAPI dependency, в DialogManager -- через async context manager.
Distributed Locks¶
DistributedLock (Redis) для TOCTOU-защиты:
client:{id}-- предотвращает двойной первый контактdialog:{account}:{user}-- debounce обработки сообщений
Атомарный выбор аккаунта¶
SELECT ... FROM tg_accounts
WHERE status = 'active' AND active_dialogs < max_dialogs
FOR UPDATE SKIP LOCKED
Предотвращает гонки при одновременном назначении аккаунтов.
DialogManager -- координатор¶
DialogManager (571 строк) -- тонкий координатор, связывающий 9 суб-модулей:
| Модуль | Класс | Ответственность |
|---|---|---|
circuit_breaker.py |
CircuitBreaker | Трекинг ошибок, cooldown (5 ошибок = 10 мин) |
debounce.py |
DebounceManager | Adaptive debounce, typing state, Redis-буферы |
incoming_pipeline.py |
IncomingPipeline | Сохранение, буферизация, dispatch |
dialog_processor.py |
DialogProcessor | AI call, parse, tools, отправка |
first_contact.py |
FirstContactHandler | Исходящий первый контакт |
followup_processor.py |
FollowupProcessor | Планирование и выполнение follow-up |
background_tasks.py |
BackgroundTaskManager | Redis listeners, startup recovery |
cross_context.py |
CrossContextLoader | Кросс-диалоговый контекст + RAG |
account_recovery.py |
AccountRecoveryManager | Health checks, proxy rotation, auto-recovery |
Поток данных: входящее сообщение¶
sequenceDiagram
participant D as Должник
participant TG as Telegram MTProto
participant AP as AccountPool
participant DM as DialogManager
participant AI as AIEngine
participant DB as PostgreSQL
participant MS as MessageSender
D->>TG: Сообщение
TG->>AP: Pyrogram handler
AP->>DM: handle_incoming()
DM->>DB: Сохранение Message (incoming)
DM->>DM: Debounce buffer
DM->>AI: process_message()
Note over AI: Загрузка memory + контекст<br>+ правила кампании<br>+ аналитика
AI-->>DM: JSON response + actions
DM->>DM: ToolExecutor (actions)
DM->>MS: send_message()
MS->>TG: Ответ с антибан-задержками
DM->>DB: Сохранение Message (outgoing)
Поток данных: первый контакт¶
sequenceDiagram
participant CB as Celery Beat
participant DM as DialogManager
participant DB as PostgreSQL
participant AI as AIEngine
participant TG as Telegram
CB->>DM: initiate_new_contacts (каждые 10 мин, 9-19ч)
DM->>DB: get_clients_for_contact()
DM->>DB: get_available_account() [FOR UPDATE SKIP LOCKED]
DM->>AI: generate_first_contact()
Note over AI: parse_success=false<br>→ отмена отправки
AI-->>DM: JSON response
DM->>TG: ImportContacts + send_message
DM->>DB: Создание Conversation
DM->>DM: Engagement ping через 16-20 мин
8 инструментов AI-агента¶
| Tool | Назначение | Влияние на аналитику |
|---|---|---|
record_payment_promise |
Фиксация обещания оплаты | negotiation_stage, discount_offered |
update_client_reason |
Причина неоплаты | debtor_motive, objection_type |
schedule_followup |
Follow-up через N часов (min 4h) | followup_number++ |
escalate_to_human |
Передача оператору | negotiation_stage="closing" |
send_payment_link |
Ссылка на оплату | -- |
update_memory |
Обновление памяти клиента | installment_offered/months |
close_conversation |
Закрытие (paid/refused/unreachable) | debtor_type, namespace tags |
make_flash_call |
Короткий звонок 3-8 сек | -- |
Модель данных¶
erDiagram
Client ||--o{ Conversation : has
Conversation ||--|| TgAccount : uses
Conversation ||--o{ Message : contains
Conversation ||--o{ AgentAction : logs
Conversation ||--o{ MessageEmbedding : embeds
Client ||--o{ MessageEmbedding : has
Department ||--o{ Client : assigns
Department ||--o{ TgAccount : assigns
Proxy ||--o{ TgAccount : connects
Company ||--o{ DocumentTemplate : owns
Conversation ||--o| StrategyOutcome : tracks
28 ORM-моделей, 39 Alembic-миграций, PostgreSQL 15 + pgvector для RAG.
Inter-Service Communication¶
| Канал | Направление | Назначение |
|---|---|---|
Redis pub/sub admin:commands |
admin-bot --> orchestrator | Тестовый контакт, отправка от оператора |
Redis pub/sub config:*:reload |
API --> orchestrator | Hot-reload departments, company, workflows |
Redis queue emulator:requests |
orchestrator --> emulator-worker | Регистрация через Android-эмулятор |
Redis queue embedding_queue |
orchestrator --> embedding-worker | GPU-embed сообщений для RAG |
| Celery (Redis broker) | beat --> workers | 22 периодические задачи |
Resilience¶
- Circuit Breaker: 5 последовательных ошибок на аккаунт = 10 мин cooldown
- FloodWait cap: ожидание > 120с =
FloodWaitExceeded(не блокирует async-цикл) - Auto-Recovery: disabled аккаунты восстанавливаются через SMS (30с) или эмулятор (5 мин)
- Dead Proxy Restart: каждые 2 мин проверка прокси, мёртвые = ротация + рестарт
- Connection Resilience: socket timeouts, keepalive, retry_on_timeout для Redis; exponential backoff для PostgreSQL