RAG Search¶
Система семантического поиска на базе pgvector и GPU-эмбеддингов. Заменяет подход "последние N сообщений" для кросс-диалогового контекста --- AI получает семантически релевантные фрагменты из истории переговоров вместо случайных последних.
Feature Flag
RAG_ENABLED=false по умолчанию. Когда выключен --- zero impact, система работает в legacy-режиме. Все RAG-компоненты проверяют этот флаг перед выполнением.
Архитектура¶
graph TB
subgraph Write["Запись эмбеддингов"]
MSG[Входящее сообщение] --> DM[DialogManager]
DM --> DB_MSG[(messages)]
DM -->|LPUSH| REDIS_Q[Redis: embedding_queue]
REDIS_Q -->|BRPOP| EW[Embedding Worker<br/>HOST, GPU]
EW -->|SentenceTransformer.encode| GPU[CUDA]
GPU --> PG[(message_embeddings<br/>pgvector)]
end
subgraph Read["Поиск при ответе"]
IN[Запрос AI] --> LC[_load_cross_context]
LC --> EMB[RAGEngine.embed_text]
EMB --> CROSS[search_relevant_messages<br/>этот клиент]
EMB --> FEW[search_success_patterns<br/>outcome=paid]
CROSS --> BUILD[_build_cross_context_rag]
FEW --> BUILD
BUILD --> PROMPT[AI промпт с RAG-контекстом]
end
Модели эмбеддингов¶
| Модель | Размерность | Устройство | Статус |
|---|---|---|---|
| Qwen/Qwen3-Embedding-8B | 4096 | GPU (CUDA) | Основная |
| Qwen/Qwen3-Embedding-0.6B | 1024 | GPU/CPU | Rollback |
| BAAI/bge-m3 | 1024 | GPU/CPU | Legacy rollback |
Voyage AI (voyage-multilingual-2) |
API | Cloud | Fallback |
Приоритет: локальная GPU-модель --> Voyage AI API --> RAG отключен.
Два режима поиска¶
Cross-dialog (этот клиент)¶
Поиск семантически похожих сообщений из прошлых бесед текущего клиента. Исключает текущую беседу.
SELECT content_text, 1 - (embedding <=> :query_vec) AS similarity
FROM message_embeddings
WHERE client_id = :client_id
AND conversation_id != :current_conv
AND 1 - (embedding <=> :query_vec) >= :min_sim
ORDER BY embedding <=> :query_vec
LIMIT :top_k
Few-shot (успешные переговоры)¶
Поиск релевантных фрагментов из бесед других клиентов с исходом paid --- примеры успешных переговоров.
SELECT content_text, 1 - (embedding <=> :query_vec) AS similarity
FROM message_embeddings
WHERE conversation_outcome = 'paid'
AND client_id != :current_client
AND 1 - (embedding <=> :query_vec) >= :min_sim
ORDER BY embedding <=> :query_vec
LIMIT :top_k
Оператор <=>
pgvector cosine distance. Similarity = 1 - distance. HNSW-индекс ускоряет поиск (m=16, ef_construction=64).
Embedding Worker¶
HOST-процесс (не Docker) --- GPU на хосте для вычисления эмбеддингов. Запускается как systemd-сервис.
graph LR
Q[Redis BRPOP<br/>embedding_queue] --> BATCH[Накопление batch<br/>до 64 текстов / 2с]
BATCH --> ENCODE[GPU encode<br/>SentenceTransformer]
ENCODE --> INSERT[INSERT INTO<br/>message_embeddings<br/>ON CONFLICT DO NOTHING]
INSERT --> Q
| Параметр | Значение | Описание |
|---|---|---|
QUEUE_NAME |
embedding_queue |
Redis очередь |
BATCH_TIMEOUT |
2 сек | Flush неполного batch |
WORKER_LOCK_KEY |
embedding:worker:singleton |
Redis lock (TTL 120s) |
| Batch size | до 64 | Настраивается через EMBEDDING_BATCH_SIZE |
Запуск:
# systemd (рекомендуется)
sudo systemctl start embedding-worker
# Вручную
python -m core.embedding_worker
Таблица message_embeddings¶
| Поле | Тип | Описание |
|---|---|---|
id |
SERIAL PK | Автоинкремент |
message_id |
FK --> messages | Nullable (для summaries) |
conversation_id |
FK --> conversations | NOT NULL, CASCADE |
client_id |
FK --> clients | NOT NULL, CASCADE |
source |
ENUM | message / conv_summary / key_moment |
content_text |
TEXT | Текст (до 2000 символов) |
embedding |
vector(4096) | pgvector вектор |
direction |
VARCHAR(20) | incoming / outgoing |
conversation_outcome |
VARCHAR(50) | Заполняется при close |
debtor_type |
VARCHAR(50) | Тип должника |
embedding_model |
VARCHAR(100) | Имя модели |
Индексы: B-tree на message_id, conversation_id, client_id, conversation_outcome, campaign_name + HNSW на embedding (vector_cosine_ops).
Token Budget¶
RAG не увеличивает общий токен-бюджет промпта --- он заменяет legacy cross-dialog контекст:
| Компонент | Legacy | С RAG |
|---|---|---|
| System prompt | ~15-20K chars | ~15-20K chars |
| Conversation history | ~5-10K chars | ~5-10K chars |
| Cross-dialog context | ~5-15K chars | до 3000 tokens |
Конфигурация¶
| Переменная | Default | Описание |
|---|---|---|
RAG_ENABLED |
false |
Включить RAG |
EMBEDDING_MODEL |
Qwen/Qwen3-Embedding-8B |
Модель |
EMBEDDING_DIMENSIONS |
4096 |
Размерность вектора |
EMBEDDING_DEVICE |
cpu |
cpu / cuda |
EMBEDDING_BATCH_SIZE |
64 |
Размер batch |
VOYAGE_API_KEY |
"" |
API fallback |
RAG_TOP_K_CROSS_DIALOG |
8 |
Top-K cross-dialog |
RAG_TOP_K_FEW_SHOT |
5 |
Top-K few-shot |
RAG_MAX_TOKEN_BUDGET |
3000 |
Лимит токенов RAG-контекста |
RAG_MIN_SIMILARITY |
0.35 |
Порог cosine similarity |
Safety Net (Celery)¶
Celery-задача embed_pending_messages запускается каждые 10 минут и находит сообщения без эмбеддингов:
SELECT messages LEFT JOIN message_embeddings
WHERE me.id IS NULL LIMIT 500
--> Redis LPUSH "embedding_queue"
Дополнительно: sync_embedding_metadata (каждые 30 мин) синхронизирует outcome, debtor_type, negotiation_stage в существующие эмбеддинги.
API Endpoints¶
| Endpoint | Метод | Описание |
|---|---|---|
/api/search/stats |
GET | Статистика RAG: кол-во эмбеддингов, размер очереди |
/api/search |
POST | Семантический поиск (query, top_k, фильтры) |
/api/rag/status |
GET | Статус зависимостей и модели |
/api/rag/graph |
GET | Payload для UI-графа |
/api/rag/reload-model |
POST | Перезагрузка модели (hot-reload) |
503 при выключенном RAG
Endpoints /api/search и /api/search/stats возвращают 503 если RAG_ENABLED=false.
Обработка ошибок¶
| Ситуация | Поведение |
|---|---|
| Модель не загрузилась | RAG де-факто выключен |
| Voyage API недоступен | embed пропускается |
Ошибка в _enqueue_embedding |
Логирование, safety net подхватит |
| RAG поиск упал | Fallback на legacy last-N |
| Worker не запущен | Сообщения накапливаются в Redis |
| Нет GPU | EMBEDDING_DEVICE=cpu или Voyage AI |
Ключевые файлы¶
| Файл | Назначение |
|---|---|
core/rag_engine.py |
RAGEngine --- embed, search, format |
core/embedding_worker.py |
HOST GPU worker |
db/models/message_embedding.py |
ORM-модель |
api/routes/search.py |
REST API поиска |
api/routes/rag.py |
Диагностика RAG |
core/rag_diagnostics.py |
Health-проверки |
scripts/backfill_embeddings.py |
Backfill исторических данных |