привилегия При первом заказе - ЗВУК премиум класса со скидкой 50%
Узнайте стоимость онлайн трансляции — за 4 шага
Выберите что будем транслировать?
Дальше
Осталось 4 шагов из 4
Узнайте стоимость онлайн трансляции — за 4 шага
Выберите место трансляции?
Дальше
Осталось 3 шагов из 4
Узнайте стоимость онлайн трансляции — за 4 шага
Куда хотите транслировать?
Дальше
Осталось 3 шагов из 4
Узнайте стоимость онлайн трансляции — за 4 шага
Укажите способ связи:
SRT-хаб — Telegram-бот для управления SRT потоками

SRT-хаб: Telegram-бот для управления SRT потоками

Полная инструкция по установке и коду: Ubuntu 24 + Python + aiogram + ffmpeg + SRT

1. Архитектура и концепция

Бот управляет SRT-потоками на сервере (VPS) через ffmpeg. Все соединения фактически организованы так:

  • Входящий поток (input):
    • ffmpeg принимает SRT-поток в режиме listener на сервере (порт 5000–5020);
    • выводит транспортный поток в локальный udp://127.0.0.1:internal_port.
  • Исходящий поток (output):
    • ffmpeg читает из локального udp://127.0.0.1:internal_port;
    • отдаёт SRT-поток в режиме listener на сервере (порт 7000–7100).

Клиенты на площадках / студиях подключаются к srt://ru.prostudio.net:порт в режиме caller. Сервер выступает как SRT-хаб с белым IP/доменом.

Лимит: одновременно не более 20 входящих потоков в статусе running. Исходящих потоков — по свободным портам 7000–7100.

2. Структура проекта

/opt/srt-bot/
  venv/                      # виртуальное окружение Python
  app/
    __init__.py
    main.py                  # точка входа бота
    config.py                # загрузка настроек из .env
    bot/
      __init__.py
      messages.py            # все текстовые сообщения и подписи кнопок
      keyboards.py           # reply/inline клавиатуры
      handlers.py            # все обработчики aiogram
    core/
      __init__.py
      models.py              # dataclass-модели входящих/исходящих потоков
      storage.py             # загрузка/сохранение состояния (JSON)
      ffmpeg_manager.py      # запуск/остановка ffmpeg, управление PID
      analyzer.py            # анализ логов ffmpeg (битрейт, разрешение, длительность)
  data/
    state.json               # состояние потоков (создаётся автоматически)
  logs/                      # логи ffmpeg (создаётся автоматически)
  .env                       # конфигурация (токен, домен, порты, пути)
  requirements.txt
/etc/systemd/system/srt-bot.service  # systemd unit

3. Шаги установки на Ubuntu 24

3.1. Создание каталога и установка системных пакетов

sudo mkdir -p /opt/srt-bot
sudo chown "$(whoami)":"$(whoami)" /opt/srt-bot
cd /opt/srt-bot

sudo apt update
sudo apt install -y python3 python3-venv python3-pip ffmpeg

3.2. Виртуальное окружение и Python-зависимости

cd /opt/srt-bot

python3 -m venv venv
./venv/bin/pip install --upgrade pip

Создай файл requirements.txt:

aiogram>=3.0.0,<4.0.0
python-dotenv>=1.0.0

Установи зависимости:

./venv/bin/pip install -r requirements.txt

3.3. Базовая структура каталогов

cd /opt/srt-bot

mkdir -p app/bot app/core data logs
touch app/__init__.py app/bot/__init__.py app/core/__init__.py

4. Конфигурация: файл .env

Создай файл /opt/srt-bot/.env:

TELEGRAM_BOT_TOKEN=YOUR_TELEGRAM_BOT_TOKEN_HERE

# Публичный домен или IP сервера
SERVER_PUBLIC_IP=ru.prostudio.net

# Telegram user_id администратора (будет получать уведомления о действиях)
ADMIN_USER_ID=54660228

# Диапазон портов для входящих потоков
INCOMING_PORT_START=5000
INCOMING_PORT_END=5020

# Диапазон портов для исходящих потоков
OUTGOING_PORT_START=7000
OUTGOING_PORT_END=7100

# Максимальное количество одновременно running входящих потоков
MAX_INCOMING_STREAMS=20

# Пути к файлам состояния и логам
STATE_FILE=/opt/srt-bot/data/state.json
LOGS_DIR=/opt/srt-bot/logs

Замените YOUR_TELEGRAM_BOT_TOKEN_HERE на реальный токен бота, при необходимости поправьте ADMIN_USER_ID и домен.

5. Код приложения (Python)

5.1. app/config.py

Назначение: загрузка настроек из .env в объект Settings.
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Tuple, Optional

from dotenv import load_dotenv


@dataclass
class Settings:
    bot_token: str
    admin_user_id: Optional[int]
    server_public_ip: str
    incoming_port_range: Tuple[int, int]
    outgoing_port_range: Tuple[int, int]
    max_incoming_streams: int
    state_file: str
    logs_dir: str


def load_settings() -> Settings:
    base_dir = Path(__file__).resolve().parent.parent
    env_path = base_dir / ".env"

    if env_path.exists():
        load_dotenv(env_path)

    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN is not set in .env")

    admin_raw = os.getenv("ADMIN_USER_ID")
    admin_user_id = int(admin_raw) if admin_raw else None

    server_public_ip = os.getenv("SERVER_PUBLIC_IP", "ru.prostudio.net")

    incoming_start = int(os.getenv("INCOMING_PORT_START", "5000"))
    incoming_end = int(os.getenv("INCOMING_PORT_END", "5020"))

    outgoing_start = int(os.getenv("OUTGOING_PORT_START", "7000"))
    outgoing_end = int(os.getenv("OUTGOING_PORT_END", "7100"))

    max_incoming = int(os.getenv("MAX_INCOMING_STREAMS", "20"))

    state_file = os.getenv("STATE_FILE", str(base_dir / "data" / "state.json"))
    logs_dir = os.getenv("LOGS_DIR", str(base_dir / "logs"))

    return Settings(
        bot_token=bot_token,
        admin_user_id=admin_user_id,
        server_public_ip=server_public_ip,
        incoming_port_range=(incoming_start, incoming_end),
        outgoing_port_range=(outgoing_start, outgoing_end),
        max_incoming_streams=max_incoming,
        state_file=state_file,
        logs_dir=logs_dir,
    )

5.2. app/main.py

Назначение: точка входа; поднимает aiogram-бота, регистрирует middleware с settings.
import asyncio
import logging

from aiogram import Bot, Dispatcher, BaseMiddleware
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import TelegramObject

from .config import load_settings, Settings
from .bot.handlers import router as bot_router


class SettingsMiddleware(BaseMiddleware):
    """
    Прокидывает объект Settings в каждый хэндлер как параметр `settings: Settings`.
    """

    def __init__(self, settings: Settings):
        super().__init__()
        self.settings = settings

    async def __call__(
        self,
        handler,
        event: TelegramObject,
        data: dict,
    ):
        data["settings"] = self.settings
        return await handler(event, data)


async def main():
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] [%(levelname)s] %(name)s: %(message)s",
    )

    settings = load_settings()

    bot = Bot(
        token=settings.bot_token,
        default=DefaultBotProperties(parse_mode="HTML"),
    )

    dp = Dispatcher(storage=MemoryStorage())
    dp.include_router(bot_router)

    # Подключаем middleware, чтобы в хэндлеры приходил `settings`
    dp.update.middleware.register(SettingsMiddleware(settings))

    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())

5.3. app/bot/messages.py

Назначение: все текстовые сообщения бота и подписи кнопок.
# Все текстовые сообщения и подписи кнопок бота

START_MESSAGE = (
    "Привет!\n"
    "Это SRT-хаб — твой инструмент для быстрой пересылки сигнала.\n\n"
    "Создай входящий поток, добавь исходящий — и сигнал пойдёт через сервер ru.prostudio.net."
)

MAIN_MENU_PROMPT = "Выберите действие в меню ниже."


# --- Подписи кнопок главного меню ---

BTN_CREATE_INCOMING = "⬆️ add input"
BTN_ADD_OUTGOING = "⬇️ add output"
BTN_LIST_MY_STREAMS = "Меню потоков"

# Старые константы (на будущее, сейчас напрямую не используются)
BTN_STOP_OR_DELETE = "Остановить / удалить поток"
BTN_STREAM_INFO = "Информация о потоке"


# --- Тексты для создания входящего потока ---

ASK_INCOMING_NAME = "Введите название входящего потока (любой текст):"

ASK_PASSPHRASE_NEEDED = (
    "Нужно ли защитить входящий поток паролем (passphrase)?\n"
    "Ответьте 'да' или 'нет'."
)

ASK_PASSPHRASE_IN = (
    "Введите passphrase для входящего потока (минимум 10 символов):"
)


# --- Тексты для создания исходящего потока ---

ASK_SELECT_INCOMING_FOR_OUTGOING = (
    "Выберите входящий поток, от которого нужно сделать исходящий:"
)

ASK_PASSPHRASE_OUT_NEEDED = (
    "Нужно ли защитить исходящий поток паролем (passphrase)?\n"
    "Ответьте 'да' или 'нет'."
)

ASK_PASSPHRASE_OUT = (
    "Введите passphrase для исходящего потока (минимум 10 символов):"
)


# --- Общие сообщения и ошибки ---

INVALID_INPUT = "Некорректный ввод. Попробуйте ещё раз."
PASSPHRASE_TOO_SHORT = "Passphrase должен быть не короче 10 символов. Попробуйте ещё раз."

NO_STREAMS_YET = "У вас пока нет активных потоков."
NO_INCOMING_SELECTED = "Сначала создайте хотя бы один входящий поток."

INCOMING_LIMIT_REACHED = (
    "Достигнуто максимальное количество входящих потоков (20).\n"
    "Сначала остановите и удалите один из существующих потоков."
)

INCOMING_CREATE_ERROR = "Ошибка при создании входящего потока. Проверьте логи ffmpeg на сервере."
OUTGOING_CREATE_ERROR = "Ошибка при создании исходящего потока. Проверьте логи ffmpeg на сервере."


# --- Успешное создание потоков ---

INCOMING_CREATED_OK = "Входящий поток создан и запущен.\n\n{details}"
OUTGOING_CREATED_OK = "Исходящий поток создан и запущен.\n\n{details}"


# --- Информация и удаление ---

INFO_HEADER = "Информация о потоке:\n\n{details}"
DELETE_OK = "Поток удалён"

5.4. app/bot/keyboards.py

Назначение: главное меню, клавиатура Да/Нет и список потоков.
from aiogram.types import (
    ReplyKeyboardMarkup,
    KeyboardButton,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)

from . import messages


def main_menu_keyboard() -> ReplyKeyboardMarkup:
    """
    Главное меню:
    [⬆️ add input] [⬇️ add output]
    [Меню потоков]
    """
    kb = ReplyKeyboardMarkup(
        keyboard=[
            [
                KeyboardButton(text=messages.BTN_CREATE_INCOMING),
                KeyboardButton(text=messages.BTN_ADD_OUTGOING),
            ],
            [
                KeyboardButton(text=messages.BTN_LIST_MY_STREAMS),
            ],
        ],
        resize_keyboard=True,
    )
    return kb


def yes_no_keyboard() -> ReplyKeyboardMarkup:
    """
    Клавиатура 'Да' / 'Нет' для вопросов про passphrase.
    """
    kb = ReplyKeyboardMarkup(
        keyboard=[
            [
                KeyboardButton(text="Да"),
                KeyboardButton(text="Нет"),
            ]
        ],
        resize_keyboard=True,
        one_time_keyboard=True,
    )
    return kb


def incoming_list_inline_keyboard(
    streams,
    current_user_id: int,
    admin_user_id: int | None,
) -> InlineKeyboardMarkup:
    """
    Инлайн-клавиатура со списком входящих потоков.
    Используется и для 'Меню потоков', и для выбора входящего
    при создании исходящего.
    """
    rows = []
    for s in streams:
        label = f"{s.name} ({s.local_port_in})"
        # Для админа показываем владельца
        if admin_user_id and current_user_id == admin_user_id and s.user_id != admin_user_id:
            label += f" [user {s.user_id}]"

        rows.append(
            [
                InlineKeyboardButton(
                    text=label,
                    callback_data=f"incoming:{s.id}",
                )
            ]
        )

    return InlineKeyboardMarkup(inline_keyboard=rows)

5.5. app/core/models.py

Назначение: описание входящих и исходящих потоков (dataclass).
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class StreamStatus(str, Enum):
    RUNNING = "running"
    STOPPED = "stopped"


@dataclass
class OutgoingStream:
    id: str
    user_id: int
    local_port_out: int
    remote_host_out: str
    remote_port_out: int
    passphrase_out: Optional[str]
    latency_out: int
    status: StreamStatus = StreamStatus.STOPPED
    pid: Optional[int] = None
    log_path: Optional[str] = None
    start_time: Optional[float] = None
    stop_time: Optional[float] = None


@dataclass
class IncomingStream:
    id: str
    user_id: int
    name: str
    local_port_in: int
    internal_port: int
    remote_host_in: str
    remote_port_in: int
    passphrase_in: Optional[str]
    latency_in: int
    status: StreamStatus = StreamStatus.STOPPED
    pid: Optional[int] = None
    log_path: Optional[str] = None
    start_time: Optional[float] = None
    stop_time: Optional[float] = None
    outgoing_streams: List[OutgoingStream] = field(default_factory=list)


def create_incoming_stream(
    user_id: int,
    name: str,
    local_port_in: int,
    remote_host_in: str,
    remote_port_in: int,
    passphrase_in: Optional[str],
    latency: int,
) -> IncomingStream:
    """
    Создаёт объект IncomingStream.
    internal_port берём как local_port_in + 1000 (5000 -> 6000 и т.п.).
    """
    internal_port = local_port_in + 1000
    return IncomingStream(
        id=str(uuid.uuid4()),
        user_id=user_id,
        name=name,
        local_port_in=local_port_in,
        internal_port=internal_port,
        remote_host_in=remote_host_in,
        remote_port_in=remote_port_in,
        passphrase_in=passphrase_in,
        latency_in=latency,
        status=StreamStatus.STOPPED,
        pid=None,
        log_path=None,
        start_time=None,
        stop_time=None,
        outgoing_streams=[],
    )


def create_outgoing_stream(
    user_id: int,
    local_port_out: int,
    remote_host_out: str,
    remote_port_out: int,
    passphrase_out: Optional[str],
    latency: int,
) -> OutgoingStream:
    return OutgoingStream(
        id=str(uuid.uuid4()),
        user_id=user_id,
        local_port_out=local_port_out,
        remote_host_out=remote_host_out,
        remote_port_out=remote_port_out,
        passphrase_out=passphrase_out,
        latency_out=latency,
        status=StreamStatus.STOPPED,
        pid=None,
        log_path=None,
        start_time=None,
        stop_time=None,
    )

5.6. app/core/storage.py

Назначение: состояние потоков в JSON + лимит 20 входящих.
import json
import os
from dataclasses import dataclass, field
from typing import List, Optional

from .models import IncomingStream, OutgoingStream, StreamStatus
from ..config import Settings


@dataclass
class AppState:
    incoming_streams: List[IncomingStream] = field(default_factory=list)


def _outgoing_from_dict(d: dict) -> OutgoingStream:
    return OutgoingStream(
        id=d["id"],
        user_id=d["user_id"],
        local_port_out=d["local_port_out"],
        remote_host_out=d["remote_host_out"],
        remote_port_out=d["remote_port_out"],
        passphrase_out=d.get("passphrase_out"),
        latency_out=d.get("latency_out", 200),
        status=StreamStatus(d.get("status", "stopped")),
        pid=d.get("pid"),
        log_path=d.get("log_path"),
        start_time=d.get("start_time"),
        stop_time=d.get("stop_time"),
    )


def _incoming_from_dict(d: dict) -> IncomingStream:
    outgoing_raw = d.get("outgoing_streams", [])
    outgoing_streams = [_outgoing_from_dict(o) for o in outgoing_raw]

    return IncomingStream(
        id=d["id"],
        user_id=d["user_id"],
        name=d["name"],
        local_port_in=d["local_port_in"],
        internal_port=d["internal_port"],
        remote_host_in=d["remote_host_in"],
        remote_port_in=d["remote_port_in"],
        passphrase_in=d.get("passphrase_in"),
        latency_in=d.get("latency_in", 200),
        status=StreamStatus(d.get("status", "stopped")),
        pid=d.get("pid"),
        log_path=d.get("log_path"),
        start_time=d.get("start_time"),
        stop_time=d.get("stop_time"),
        outgoing_streams=outgoing_streams,
    )


def _outgoing_to_dict(o: OutgoingStream) -> dict:
    return {
        "id": o.id,
        "user_id": o.user_id,
        "local_port_out": o.local_port_out,
        "remote_host_out": o.remote_host_out,
        "remote_port_out": o.remote_port_out,
        "passphrase_out": o.passphrase_out,
        "latency_out": o.latency_out,
        "status": o.status.value,
        "pid": o.pid,
        "log_path": o.log_path,
        "start_time": o.start_time,
        "stop_time": o.stop_time,
    }


def _incoming_to_dict(s: IncomingStream) -> dict:
    return {
        "id": s.id,
        "user_id": s.user_id,
        "name": s.name,
        "local_port_in": s.local_port_in,
        "internal_port": s.internal_port,
        "remote_host_in": s.remote_host_in,
        "remote_port_in": s.remote_port_in,
        "passphrase_in": s.passphrase_in,
        "latency_in": s.latency_in,
        "status": s.status.value,
        "pid": s.pid,
        "log_path": s.log_path,
        "start_time": s.start_time,
        "stop_time": s.stop_time,
        "outgoing_streams": [_outgoing_to_dict(o) for o in s.outgoing_streams],
    }


def load_state(settings: Settings) -> AppState:
    path = settings.state_file
    if not os.path.exists(path):
        return AppState()

    try:
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except Exception:
        return AppState()

    incoming_raw = raw.get("incoming_streams", [])
    incoming_streams = []
    for d in incoming_raw:
        try:
            incoming_streams.append(_incoming_from_dict(d))
        except Exception:
            continue

    return AppState(incoming_streams=incoming_streams)


def save_state(state: AppState, settings: Settings) -> None:
    path = settings.state_file
    os.makedirs(os.path.dirname(path), exist_ok=True)

    raw = {
        "incoming_streams": [_incoming_to_dict(s) for s in state.incoming_streams]
    }

    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(raw, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def get_user_incoming_streams(state: AppState, user_id: int) -> list[IncomingStream]:
    return [s for s in state.incoming_streams if s.user_id == user_id]


def get_incoming_stream_by_id(
    state: AppState, stream_id: str
) -> Optional[IncomingStream]:
    for s in state.incoming_streams:
        if s.id == stream_id:
            return s
    return None


def count_running_incoming_streams(state: AppState) -> int:
    return sum(1 for s in state.incoming_streams if s.status == StreamStatus.RUNNING)


def allocate_incoming_port(state: AppState, settings: Settings) -> Optional[int]:
    if count_running_incoming_streams(state) >= settings.max_incoming_streams:
        return None

    start_port, end_port = settings.incoming_port_range

    used_ports = {
        s.local_port_in
        for s in state.incoming_streams
        if s.status == StreamStatus.RUNNING
    }

    for port in range(start_port, end_port + 1):
        if port not in used_ports:
            return port

    return None


def allocate_outgoing_port(state: AppState, settings: Settings) -> Optional[int]:
    start_port, end_port = settings.outgoing_port_range

    used_ports = set()
    for inc in state.incoming_streams:
        for out in inc.outgoing_streams:
            if out.status == StreamStatus.RUNNING:
                used_ports.add(out.local_port_out)

    for port in range(start_port, end_port + 1):
        if port not in used_ports:
            return port

    return None

5.7. app/core/ffmpeg_manager.py

Назначение: запуск и остановка ffmpeg для входящих и исходящих потоков.
import os
import time
import subprocess
import signal
from typing import Optional

from .models import IncomingStream, OutgoingStream, StreamStatus
from .storage import AppState, save_state
from ..config import Settings


def _ensure_logs_dir(settings: Settings) -> None:
    os.makedirs(settings.logs_dir, exist_ok=True)


def start_incoming_ffmpeg(
    stream: IncomingStream,
    settings: Settings,
    state: AppState,
) -> None:
    """
    ВХОДЯЩИЙ поток:
    srt://0.0.0.0:<local_port_in>?mode=listener -> udp://127.0.0.1:<internal_port>
    """
    _ensure_logs_dir(settings)

    log_path = os.path.join(settings.logs_dir, f"in_{stream.id}.log")

    srt_url_in = (
        f"srt://{stream.remote_host_in}:{stream.remote_port_in}"
        f"?mode=listener&transtype=live&latency={stream.latency_in}"
    )
    if stream.passphrase_in:
        srt_url_in += f"&passphrase={stream.passphrase_in}"

    output_url = f"udp://127.0.0.1:{stream.internal_port}"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-stats",
        "-loglevel",
        "info",
        "-fflags",
        "+genpts",
        "-i",
        srt_url_in,
        "-c",
        "copy",
        "-f",
        "mpegts",
        output_url,
    ]

    log_file = open(log_path, "a", encoding="utf-8", errors="ignore")

    proc = subprocess.Popen(
        cmd,
        stdout=log_file,
        stderr=log_file,
        stdin=subprocess.DEVNULL,
    )

    stream.pid = proc.pid
    stream.log_path = log_path
    stream.status = StreamStatus.RUNNING
    stream.start_time = time.time()
    stream.stop_time = None

    save_state(state, settings)


def start_outgoing_ffmpeg(
    incoming: IncomingStream,
    outgoing: OutgoingStream,
    settings: Settings,
    state: AppState,
) -> None:
    """
    ИСХОДЯЩИЙ поток:
    udp://127.0.0.1:<internal_port> -> srt://0.0.0.0:<local_port_out>?mode=listener
    """
    _ensure_logs_dir(settings)

    log_path = os.path.join(settings.logs_dir, f"out_{outgoing.id}.log")

    input_url = f"udp://127.0.0.1:{incoming.internal_port}"

    srt_url_out = (
        f"srt://{outgoing.remote_host_out}:{outgoing.remote_port_out}"
        f"?mode=listener&transtype=live&latency={outgoing.latency_out}"
    )
    if outgoing.passphrase_out:
        srt_url_out += f"&passphrase={outgoing.passphrase_out}"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-stats",
        "-loglevel",
        "info",
        "-fflags",
        "+genpts",
        "-i",
        input_url,
        "-c",
        "copy",
        "-f",
        "mpegts",
        srt_url_out,
    ]

    log_file = open(log_path, "a", encoding="utf-8", errors="ignore")

    proc = subprocess.Popen(
        cmd,
        stdout=log_file,
        stderr=log_file,
        stdin=subprocess.DEVNULL,
    )

    outgoing.pid = proc.pid
    outgoing.log_path = log_path
    outgoing.status = StreamStatus.RUNNING
    outgoing.start_time = time.time()
    outgoing.stop_time = None

    save_state(state, settings)


def _kill_pid(pid: Optional[int]) -> None:
    if not pid:
        return
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        return
    except Exception:
        return


def stop_incoming_stream(
    incoming: IncomingStream,
    settings: Settings,
    state: AppState,
) -> None:
    """
    Останавливает входящий поток и все связанные исходящие.
    """
    for out in incoming.outgoing_streams:
        if out.pid:
            _kill_pid(out.pid)
        out.status = StreamStatus.STOPPED
        out.stop_time = time.time()

    if incoming.pid:
        _kill_pid(incoming.pid)
    incoming.status = StreamStatus.STOPPED
    incoming.stop_time = time.time()

    save_state(state, settings)

5.8. app/core/analyzer.py

Назначение: достаёт из логов ffmpeg битрейт, разрешение и длительность.
import os
import re
import time
from typing import Dict, Optional, List


def _parse_single_log(lines: List[str]) -> Dict[str, str]:
    res: Dict[str, str] = {}

    if not lines:
        return res

    # Разрешение (ищем сверху)
    for line in lines:
        m = re.search(r"(\\d{3,5})x(\\d{3,5})", line)
        if m:
            w, h = m.group(1), m.group(2)
            res["resolution"] = f"{w}x{h}"
            break

    # Битрейт (ищем снизу)
    for line in reversed(lines):
        m = re.search(r"bitrate=\\s*([\\d\\.]+kbits/s)", line)
        if m:
            res["avg_bitrate"] = m.group(1)
            break

    # Dropped frames (если ffmpeg пишет drop=)
    for line in reversed(lines):
        m = re.search(r"drop=\\s*(\\d+)", line)
        if m:
            res["dropped_frames"] = m.group(1)
            break

    return res


def parse_ffmpeg_logs(log_paths: List[str]) -> Dict[str, str]:
    result: Dict[str, str] = {}

    last_bitrate: Optional[str] = None
    first_resolution: Optional[str] = None

    for path in log_paths:
        if not path or not os.path.exists(path):
            continue

        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                lines = f.readlines()
        except Exception:
            continue

        partial = _parse_single_log(lines)

        if not first_resolution and "resolution" in partial:
            first_resolution = partial["resolution"]

        if "avg_bitrate" in partial:
            last_bitrate = partial["avg_bitrate"]

    if first_resolution:
        result["resolution"] = first_resolution
    if last_bitrate:
        result["avg_bitrate"] = last_bitrate

    return result


def parse_ffmpeg_log(log_path: Optional[str]) -> Dict[str, str]:
    if not log_path:
        return {}
    return parse_ffmpeg_logs([log_path])


def parse_duration(start_time: Optional[float], stop_time: Optional[float]) -> str:
    if not start_time:
        return "нет данных"

    end = stop_time or time.time()
    seconds = int(end - start_time)
    if seconds < 0:
        return "нет данных"

    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    secs = seconds % 60

    parts = []
    if hours:
        parts.append(f"{hours} ч")
    if minutes:
        parts.append(f"{minutes} мин")
    if not parts:
        parts.append(f"{secs} с")

    return " ".join(parts)

5.9. app/bot/handlers.py

Назначение: все сценарии бота (создание входящих/исходящих, меню потоков, инфо, удаление).
from aiogram import Router, F
from aiogram.filters import CommandStart
from aiogram.types import (
    Message,
    CallbackQuery,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)
from aiogram.fsm.state import StatesGroup, State
    from aiogram.fsm.context import FSMContext

from ..config import Settings
from ..core import storage
from ..core.models import (
    create_incoming_stream,
    create_outgoing_stream,
    StreamStatus,
)
from ..core.ffmpeg_manager import (
    start_incoming_ffmpeg,
    start_outgoing_ffmpeg,
    stop_incoming_stream,
)
from ..core.analyzer import (
    parse_ffmpeg_logs,
    parse_duration,
)
from . import messages
from .keyboards import (
    main_menu_keyboard,
    incoming_list_inline_keyboard,
    yes_no_keyboard,
)

router = Router()


class IncomingCreation(StatesGroup):
    waiting_for_name = State()
    waiting_for_passphrase_needed = State()
    waiting_for_passphrase = State()


class OutgoingCreation(StatesGroup):
    waiting_for_incoming_selection = State()
    waiting_for_passphrase_needed = State()
    waiting_for_passphrase = State()


class ManageFlow(StatesGroup):
    waiting_for_incoming_selection = State()


async def notify_admin(bot, settings: Settings, text: str):
    if settings.admin_user_id:
        try:
            await bot.send_message(settings.admin_user_id, text)
        except Exception:
            pass


@router.message(CommandStart())
async def cmd_start(message: Message, state: FSMContext, settings: Settings):
    bot = message.bot
    chat_id = message.chat.id
    current_id = message.message_id

    for msg_id in range(current_id - 1, max(current_id - 50, 0), -1):
        try:
            await bot.delete_message(chat_id, msg_id)
        except Exception:
            continue

    await state.clear()
    await message.answer(
        messages.START_MESSAGE,
        reply_markup=main_menu_keyboard(),
    )
    await message.answer(messages.MAIN_MENU_PROMPT, reply_markup=main_menu_keyboard())


@router.message(F.text == messages.BTN_CREATE_INCOMING)
async def handle_create_incoming(message: Message, state: FSMContext):
    await state.set_state(IncomingCreation.waiting_for_name)
    await message.answer(messages.ASK_INCOMING_NAME, reply_markup=main_menu_keyboard())


@router.message(F.text == messages.BTN_LIST_MY_STREAMS)
async def handle_stream_menu(message: Message, state: FSMContext, settings: Settings):
    state_obj = storage.load_state(settings)
    user_id = message.from_user.id

    if settings.admin_user_id and user_id == settings.admin_user_id:
        streams_all = state_obj.incoming_streams
    else:
        streams_all = storage.get_user_incoming_streams(state_obj, user_id)

    user_streams = [s for s in streams_all if s.status == StreamStatus.RUNNING]

    if not user_streams:
        await message.answer(
            "Нет активных потоков.",
            reply_markup=main_menu_keyboard(),
        )
        return

    kb = incoming_list_inline_keyboard(
        user_streams,
        current_user_id=user_id,
        admin_user_id=settings.admin_user_id,
    )
    await state.set_state(ManageFlow.waiting_for_incoming_selection)
    await message.answer("Выберите поток для управления:", reply_markup=kb)


@router.message(F.text == messages.BTN_ADD_OUTGOING)
async def handle_add_outgoing(message: Message, state: FSMContext, settings: Settings):
    state_obj = storage.load_state(settings)
    user_id = message.from_user.id

    if settings.admin_user_id and user_id == settings.admin_user_id:
        streams_all = state_obj.incoming_streams
    else:
        streams_all = storage.get_user_incoming_streams(state_obj, user_id)

    user_streams = [s for s in streams_all if s.status == StreamStatus.RUNNING]

    if not user_streams:
        await message.answer(
            "Нет активных входящих потоков.",
            reply_markup=main_menu_keyboard(),
        )
        return

    await state.set_state(OutgoingCreation.waiting_for_incoming_selection)
    kb = incoming_list_inline_keyboard(
        user_streams,
        current_user_id=user_id,
        admin_user_id=settings.admin_user_id,
    )
    await message.answer(messages.ASK_SELECT_INCOMING_FOR_OUTGOING, reply_markup=kb)


@router.message(IncomingCreation.waiting_for_name)
async def incoming_name(message: Message, state: FSMContext):
    await state.update_data(name=message.text.strip())
    await state.set_state(IncomingCreation.waiting_for_passphrase_needed)
    await message.answer(
        messages.ASK_PASSPHRASE_NEEDED,
        reply_markup=yes_no_keyboard(),
    )


@router.message(IncomingCreation.waiting_for_passphrase_needed)
async def incoming_passphrase_needed(
    message: Message, state: FSMContext, settings: Settings
):
    answer = (message.text or "").strip().lower()

    if answer in ("да", "yes", "y"):
        await state.set_state(IncomingCreation.waiting_for_passphrase)
        await message.answer(
            messages.ASK_PASSPHRASE_IN,
            reply_markup=main_menu_keyboard(),
        )
        return

    if answer in ("нет", "no", "n"):
        data = await state.get_data()
        await finalize_incoming_creation(
            message, state, settings, data, passphrase=None
        )
        return

    await message.answer(
        messages.INVALID_INPUT,
        reply_markup=yes_no_keyboard(),
    )


@router.message(IncomingCreation.waiting_for_passphrase)
async def incoming_passphrase(
    message: Message, state: FSMContext, settings: Settings
):
    passphrase = (message.text or "").strip()
    if len(passphrase) < 10:
        await message.answer(messages.PASSPHRASE_TOO_SHORT)
        return

    data = await state.get_data()
    await finalize_incoming_creation(
        message, state, settings, data, passphrase=passphrase
    )


async def finalize_incoming_creation(
    message: Message,
    state: FSMContext,
    settings: Settings,
    data: dict,
    passphrase: str | None,
):
    try:
        state_obj = storage.load_state(settings)

        if (
            storage.count_running_incoming_streams(state_obj)
            >= settings.max_incoming_streams
        ):
            await state.clear()
            await message.answer(
                messages.INCOMING_LIMIT_REACHED,
                reply_markup=main_menu_keyboard(),
            )
            return

        port = storage.allocate_incoming_port(state_obj, settings)
        if port is None:
            await state.clear()
            await message.answer(
                "Нет свободных входящих портов в заданном диапазоне.",
                reply_markup=main_menu_keyboard(),
            )
            return

        stream = create_incoming_stream(
            user_id=message.from_user.id,
            name=data["name"],
            local_port_in=port,
            remote_host_in="0.0.0.0",
            remote_port_in=port,
            passphrase_in=passphrase,
            latency=200,
        )

        state_obj.incoming_streams.append(stream)

        try:
            start_incoming_ffmpeg(stream, settings, state_obj)
        except Exception as e_ff:
            await state.clear()
            await message.answer(
                f"{messages.INCOMING_CREATE_ERROR}\n\nДетали: {e_ff}",
                reply_markup=main_menu_keyboard(),
            )
            return

        storage.save_state(state_obj, settings)
        await state.clear()

        base_url = f"srt://{settings.server_public_ip}:{stream.local_port_in}"
        if passphrase:
            srt_url = f"{base_url}?passphrase={passphrase}"
        else:
            srt_url = base_url

        details = (
            f"Название: {stream.name}\n"
            f"{srt_url}\n"
            f"Статус: {stream.status.value}"
        )
        await message.answer(
            messages.INCOMING_CREATED_OK.format(details=details),
            reply_markup=main_menu_keyboard(),
        )

        user = message.from_user
        admin_text = (
            "Новый ВХОДЯЩИЙ поток\n"
            f"Пользователь: {user.id} (@{user.username or 'нет username'})\n"
            f"Название: {stream.name}\n"
            f"Порт: {stream.local_port_in}\n"
            f"URL: {srt_url}"
        )
        await notify_admin(message.bot, settings, admin_text)

    except Exception as e:
        await state.clear()
        await message.answer(
            f"Внутренняя ошибка при создании входящего потока:\n{e}",
            reply_markup=main_menu_keyboard(),
        )


@router.callback_query(
    OutgoingCreation.waiting_for_incoming_selection, F.data.startswith("incoming:")
)
async def outgoing_select_incoming(
    callback: CallbackQuery, state: FSMContext, settings: Settings
):
    incoming_id = callback.data.split(":", 1)[1]
    await state.update_data(incoming_id=incoming_id)
    await state.set_state(OutgoingCreation.waiting_for_passphrase_needed)
    await callback.message.answer(
        messages.ASK_PASSPHRASE_OUT_NEEDED,
        reply_markup=yes_no_keyboard(),
    )
    await callback.answer()


@router.message(OutgoingCreation.waiting_for_passphrase_needed)
async def outgoing_passphrase_needed(
    message: Message, state: FSMContext, settings: Settings
):
    answer = (message.text or "").strip().lower()

    if answer in ("да", "yes", "y"):
        await state.set_state(OutgoingCreation.waiting_for_passphrase)
        await message.answer(
            messages.ASK_PASSPHRASE_OUT,
            reply_markup=main_menu_keyboard(),
        )
        return

    if answer in ("нет", "no", "n"):
        data = await state.get_data()
        await finalize_outgoing_creation(
            message, state, settings, data, passphrase=None
        )
        return

    await message.answer(
        messages.INVALID_INPUT,
        reply_markup=yes_no_keyboard(),
    )


@router.message(OutgoingCreation.waiting_for_passphrase)
async def outgoing_passphrase(
    message: Message, state: FSMContext, settings: Settings
):
    passphrase = (message.text or "").strip()
    if len(passphrase) < 10:
        await message.answer(messages.PASSPHRASE_TOO_SHORT)
        return

    data = await state.get_data()
    await finalize_outgoing_creation(
        message, state, settings, data, passphrase=passphrase
    )


async def finalize_outgoing_creation(
    message: Message,
    state: FSMContext,
    settings: Settings,
    data: dict,
    passphrase: str | None,
):
    try:
        state_obj = storage.load_state(settings)
        incoming = storage.get_incoming_stream_by_id(state_obj, data["incoming_id"])
        if not incoming:
            await state.clear()
            await message.answer(
                messages.NO_INCOMING_SELECTED,
                reply_markup=main_menu_keyboard(),
            )
            return

        port_out = storage.allocate_outgoing_port(state_obj, settings)
        if port_out is None:
            await state.clear()
            await message.answer(
                "Нет свободных исходящих портов в заданном диапазоне.",
                reply_markup=main_menu_keyboard(),
            )
            return

        outgoing = create_outgoing_stream(
            user_id=message.from_user.id,
            local_port_out=port_out,
            remote_host_out="0.0.0.0",
            remote_port_out=port_out,
            passphrase_out=passphrase,
            latency=200,
        )

        incoming.outgoing_streams.append(outgoing)

        try:
            start_outgoing_ffmpeg(incoming, outgoing, settings, state_obj)
        except Exception as e_ff:
            await state.clear()
            await message.answer(
                f"{messages.OUTGOING_CREATE_ERROR}\n\nДетали: {e_ff}",
                reply_markup=main_menu_keyboard(),
            )
            return

        storage.save_state(state_obj, settings)
        await state.clear()

        base_url = f"srt://{settings.server_public_ip}:{outgoing.local_port_out}"
        if passphrase:
            srt_url = f"{base_url}?passphrase={passphrase}"
        else:
            srt_url = base_url

        details = (
            f"Название входящего потока: {incoming.name}\n"
            f"{srt_url}\n"
            f"Статус: {outgoing.status.value}"
        )

        await message.answer(
            messages.OUTGOING_CREATED_OK.format(details=details),
            reply_markup=main_menu_keyboard(),
        )

        user = message.from_user
        admin_text = (
            "Новый ИСХОДЯЩИЙ поток\n"
            f"Пользователь: {user.id} (@{user.username or 'нет username'})\n"
            f"От входящего: {incoming.name} (порт {incoming.local_port_in})\n"
            f"Порт исходящего: {outgoing.local_port_out}\n"
            f"URL: {srt_url}"
        )
        await notify_admin(message.bot, settings, admin_text)

    except Exception as e:
        await state.clear()
        await message.answer(
            f"Внутренняя ошибка при создании исходящего потока:\n{e}",
            reply_markup=main_menu_keyboard(),
        )


@router.callback_query(
    ManageFlow.waiting_for_incoming_selection, F.data.startswith("incoming:")
)
async def manage_incoming_callback(
    callback: CallbackQuery, state: FSMContext, settings: Settings
):
    stream_id = callback.data.split(":", 1)[1]
    state_obj = storage.load_state(settings)
    incoming = storage.get_incoming_stream_by_id(state_obj, stream_id)
    if not incoming:
        await callback.answer("Поток не найден.", show_alert=True)
        return

    kb = InlineKeyboardMarkup(
        inline_keyboard=[
            [
                InlineKeyboardButton(
                    text="ℹ️ Информация",
                    callback_data=f"action:info:{stream_id}",
                ),
                InlineKeyboardButton(
                    text="???? Удалить поток",
                    callback_data=f"action:delete:{stream_id}",
                ),
            ]
        ]
    )

    await callback.message.answer(
        f"Выбран поток: {incoming.name} (ID: {incoming.local_port_in})",
        reply_markup=kb,
    )
    await callback.answer()
    await state.clear()


@router.callback_query(F.data.startswith("action:"))
async def manage_action_callback(callback: CallbackQuery, settings: Settings):
    parts = callback.data.split(":", 2)
    if len(parts) != 3:
        await callback.answer("Некорректное действие.", show_alert=True)
        return

    action, stream_id = parts[1], parts[2]
    user_id = callback.from_user.id

    state_obj = storage.load_state(settings)
    incoming = storage.get_incoming_stream_by_id(state_obj, stream_id)
    if not incoming:
        await callback.answer("Поток не найден.", show_alert=True)
        return

    is_admin = settings.admin_user_id and (user_id == settings.admin_user_id)
    is_owner = incoming.user_id == user_id

    if action == "info":
        log_paths = []
        if incoming.log_path:
            log_paths.append(incoming.log_path)
        for o in incoming.outgoing_streams:
            if o.log_path:
                log_paths.append(o.log_path)

        stats = parse_ffmpeg_logs(log_paths) if log_paths else {}
        duration = parse_duration(incoming.start_time, incoming.stop_time)

        base_in = f"srt://{settings.server_public_ip}:{incoming.local_port_in}"
        if incoming.passphrase_in:
            srt_in_url = f"{base_in}?passphrase={incoming.passphrase_in}"
        else:
            srt_in_url = base_in

        owner_line = f"Владелец (user_id): {incoming.user_id}"

        lines = [
            f"Название: {incoming.name}",
            owner_line,
            f"in: {srt_in_url}",
            f"Статус: {incoming.status.value}",
            f"Время работы: {duration}",
            f"Битрейт: {stats.get('avg_bitrate') or 'нет данных'}",
            f"Разрешение: {stats.get('resolution') or 'нет данных'}",
            "",
            "Исходящие потоки:",
        ]

        if not incoming.outgoing_streams:
            lines.append("– нет исходящих потоков")
        else:
            for idx, o in enumerate(incoming.outgoing_streams, start=1):
                base_out = f"srt://{settings.server_public_ip}:{o.local_port_out}"
                if o.passphrase_out:
                    srt_out_url = f"{base_out}?passphrase={o.passphrase_out}"
                else:
                    srt_out_url = base_out

                lines.append(
                    f"- Исходящий {idx}: {srt_out_url}, статус: {o.status.value}"
                )

        await callback.message.answer(
            messages.INFO_HEADER.format(details="\n".join(lines)),
            reply_markup=main_menu_keyboard(),
        )
        await callback.answer()
        return

    if action == "delete":
        if not (is_admin or is_owner):
            await callback.answer(
                "У вас нет прав управлять этим потоком.",
                show_alert=True,
            )
            return

        from ..core.ffmpeg_manager import stop_incoming_stream

        stop_incoming_stream(incoming, settings, state_obj)
        state_obj.incoming_streams = [
            s for s in state_obj.incoming_streams if s.id != incoming.id
        ]
        storage.save_state(state_obj, settings)

        await callback.message.answer(
            messages.DELETE_OK,
            reply_markup=main_menu_keyboard(),
        )
        await callback.answer()

        admin_text = (
            "Поток УДАЛЁН\n"
            f"Инициатор: {user_id}\n"
            f"ID потока: {incoming.id}\n"
            f"Название: {incoming.name}\n"
            f"Порт: {incoming.local_port_in}\n"
            f"Владелец (user_id): {incoming.user_id}"
        )
        await notify_admin(callback.message.bot, settings, admin_text)
        return

6. systemd unit: автозапуск бота

Создай файл /etc/systemd/system/srt-bot.service:

[Unit]
Description=SRT Telegram Bot Service
After=network.target

[Service]
Type=simple
WorkingDirectory=/opt/srt-bot
ExecStart=/opt/srt-bot/venv/bin/python -m app.main
Restart=always
RestartSec=3

# При желании можно запускать под отдельным пользователем:
# User=srtbot
# Group=srtbot

[Install]
WantedBy=multi-user.target

Далее:

sudo systemctl daemon-reload
sudo systemctl enable srt-bot.service
sudo systemctl start srt-bot.service
sudo systemctl status srt-bot.service --no-pager

Логи сервиса:

sudo journalctl -u srt-bot.service -f

7. Как пользоваться ботом

7.1. Старт

  • В Telegram открыть бота.
  • Отправить команду /start.
  • Появится приветствие и главное меню:
    • ⬆️ add input — создать входящий поток.
    • ⬇️ add output — добавить исходящий поток к входящему.
    • Меню потоков — список активных потоков:
      • обычный пользователь видит только свои;
      • админ (user_id из ADMIN_USER_ID) видит все.

7.2. Создание входящего потока

  1. Нажать ⬆️ add input.
  2. Ввести название потока (например, название площадки).
  3. Ответить на вопрос:
    «Нужно ли защитить входящий поток паролем (passphrase)?» — кнопками Да / Нет.
  4. Если «Да» — ввести passphrase (минимум 10 символов).
  5. Бот ответит, например:
    Входящий поток создан и запущен.
    
    Название: ZAL1
    srt://ru.prostudio.net:5000
    Статус: running
    Этот URL — то, что забивается в vMix / Larix / кодер на площадке (режим caller).

7.3. Добавление исходящего потока

  1. Нажать ⬇️ add output.
  2. Выбрать входящий поток из списка.
  3. Ответить Да/Нет на защиту паролем исходящего.
  4. При необходимости — ввести passphrase.
  5. Бот ответит, например:
    Исходящий поток создан и запущен.
    
    Название входящего потока: ZAL1
    srt://ru.prostudio.net:7000
    Статус: running
    Этот URL — куда подключается принимающий vMix / пульт / сервер (режим caller).

7.4. Меню потоков (управление)

  1. Нажать Меню потоков.
  2. Увидеть список активных входящих потоков (у админа — все, у пользователя — только свои).
  3. Нажать на нужный поток — появятся кнопки:
    • ℹ️ Информация — показывает:
      • Название;
      • Владельца (user_id);
      • in: srt://ru.prostudio.net:порт (+ passphrase, если есть);
      • Статус (running/stopped);
      • Время работы;
      • Битрейт и разрешение (по логам ffmpeg, если доступны);
      • Список исходящих потоков с их URL и статусом.
    • ???? Удалить поток — гасит входящий ffmpeg и все связанные исходящие, удаляет поток из состояния.

7.5. Лимиты и права

  • Максимум 20 одновременно running входящих потоков. На 21-м бот ответит:
    Достигнуто максимальное количество входящих потоков (20)...
  • Обычный пользователь видит и управляет только своими потоками.
  • Админ (user_id из ADMIN_USER_ID):
    • видит все потоки;
    • может удалять чужие потоки;
    • получает уведомления о создании/удалении потоков.