Полная инструкция по установке и коду: Ubuntu 24 + Python + aiogram + ffmpeg + SRT
Бот управляет SRT-потоками на сервере (VPS) через ffmpeg.
Все соединения фактически организованы так:
listener на сервере (порт 5000–5020);udp://127.0.0.1:internal_port.udp://127.0.0.1:internal_port;listener на сервере (порт 7000–7100).
Клиенты на площадках / студиях подключаются к srt://ru.prostudio.net:порт в режиме caller.
Сервер выступает как SRT-хаб с белым IP/доменом.
Лимит: одновременно не более 20 входящих потоков в статусе running.
Исходящих потоков — по свободным портам 7000–7100.
/opt/srt-bot/
venv/ # виртуальное окружение Python
app/
__init__.py
main.py # точка входа бота
config.py # загрузка настроек из .env
bot/
__init__.py
messages.py # все текстовые сообщения и подписи кнопок
keyboards.py # reply/inline клавиатуры
handlers.py # все обработчики aiogram
core/
__init__.py
models.py # dataclass-модели входящих/исходящих потоков
storage.py # загрузка/сохранение состояния (JSON)
ffmpeg_manager.py # запуск/остановка ffmpeg, управление PID
analyzer.py # анализ логов ffmpeg (битрейт, разрешение, длительность)
data/
state.json # состояние потоков (создаётся автоматически)
logs/ # логи ffmpeg (создаётся автоматически)
.env # конфигурация (токен, домен, порты, пути)
requirements.txt
/etc/systemd/system/srt-bot.service # systemd unit sudo mkdir -p /opt/srt-bot
sudo chown "$(whoami)":"$(whoami)" /opt/srt-bot
cd /opt/srt-bot
sudo apt update
sudo apt install -y python3 python3-venv python3-pip ffmpeg cd /opt/srt-bot
python3 -m venv venv
./venv/bin/pip install --upgrade pip Создай файл requirements.txt:
aiogram>=3.0.0,<4.0.0
python-dotenv>=1.0.0 Установи зависимости:
./venv/bin/pip install -r requirements.txt cd /opt/srt-bot
mkdir -p app/bot app/core data logs
touch app/__init__.py app/bot/__init__.py app/core/__init__.py Создай файл /opt/srt-bot/.env:
TELEGRAM_BOT_TOKEN=YOUR_TELEGRAM_BOT_TOKEN_HERE
# Публичный домен или IP сервера
SERVER_PUBLIC_IP=ru.prostudio.net
# Telegram user_id администратора (будет получать уведомления о действиях)
ADMIN_USER_ID=54660228
# Диапазон портов для входящих потоков
INCOMING_PORT_START=5000
INCOMING_PORT_END=5020
# Диапазон портов для исходящих потоков
OUTGOING_PORT_START=7000
OUTGOING_PORT_END=7100
# Максимальное количество одновременно running входящих потоков
MAX_INCOMING_STREAMS=20
# Пути к файлам состояния и логам
STATE_FILE=/opt/srt-bot/data/state.json
LOGS_DIR=/opt/srt-bot/logs
Замените YOUR_TELEGRAM_BOT_TOKEN_HERE на реальный токен бота,
при необходимости поправьте ADMIN_USER_ID и домен.
app/config.py.env в объект Settings.import os
from dataclasses import dataclass
from pathlib import Path
from typing import Tuple, Optional
from dotenv import load_dotenv
@dataclass
class Settings:
bot_token: str
admin_user_id: Optional[int]
server_public_ip: str
incoming_port_range: Tuple[int, int]
outgoing_port_range: Tuple[int, int]
max_incoming_streams: int
state_file: str
logs_dir: str
def load_settings() -> Settings:
base_dir = Path(__file__).resolve().parent.parent
env_path = base_dir / ".env"
if env_path.exists():
load_dotenv(env_path)
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
if not bot_token:
raise RuntimeError("TELEGRAM_BOT_TOKEN is not set in .env")
admin_raw = os.getenv("ADMIN_USER_ID")
admin_user_id = int(admin_raw) if admin_raw else None
server_public_ip = os.getenv("SERVER_PUBLIC_IP", "ru.prostudio.net")
incoming_start = int(os.getenv("INCOMING_PORT_START", "5000"))
incoming_end = int(os.getenv("INCOMING_PORT_END", "5020"))
outgoing_start = int(os.getenv("OUTGOING_PORT_START", "7000"))
outgoing_end = int(os.getenv("OUTGOING_PORT_END", "7100"))
max_incoming = int(os.getenv("MAX_INCOMING_STREAMS", "20"))
state_file = os.getenv("STATE_FILE", str(base_dir / "data" / "state.json"))
logs_dir = os.getenv("LOGS_DIR", str(base_dir / "logs"))
return Settings(
bot_token=bot_token,
admin_user_id=admin_user_id,
server_public_ip=server_public_ip,
incoming_port_range=(incoming_start, incoming_end),
outgoing_port_range=(outgoing_start, outgoing_end),
max_incoming_streams=max_incoming,
state_file=state_file,
logs_dir=logs_dir,
) app/main.pysettings.import asyncio
import logging
from aiogram import Bot, Dispatcher, BaseMiddleware
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import TelegramObject
from .config import load_settings, Settings
from .bot.handlers import router as bot_router
class SettingsMiddleware(BaseMiddleware):
"""
Прокидывает объект Settings в каждый хэндлер как параметр `settings: Settings`.
"""
def __init__(self, settings: Settings):
super().__init__()
self.settings = settings
async def __call__(
self,
handler,
event: TelegramObject,
data: dict,
):
data["settings"] = self.settings
return await handler(event, data)
async def main():
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] %(name)s: %(message)s",
)
settings = load_settings()
bot = Bot(
token=settings.bot_token,
default=DefaultBotProperties(parse_mode="HTML"),
)
dp = Dispatcher(storage=MemoryStorage())
dp.include_router(bot_router)
# Подключаем middleware, чтобы в хэндлеры приходил `settings`
dp.update.middleware.register(SettingsMiddleware(settings))
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main()) app/bot/messages.py# Все текстовые сообщения и подписи кнопок бота
START_MESSAGE = (
"Привет!\n"
"Это SRT-хаб — твой инструмент для быстрой пересылки сигнала.\n\n"
"Создай входящий поток, добавь исходящий — и сигнал пойдёт через сервер ru.prostudio.net."
)
MAIN_MENU_PROMPT = "Выберите действие в меню ниже."
# --- Подписи кнопок главного меню ---
BTN_CREATE_INCOMING = "⬆️ add input"
BTN_ADD_OUTGOING = "⬇️ add output"
BTN_LIST_MY_STREAMS = "Меню потоков"
# Старые константы (на будущее, сейчас напрямую не используются)
BTN_STOP_OR_DELETE = "Остановить / удалить поток"
BTN_STREAM_INFO = "Информация о потоке"
# --- Тексты для создания входящего потока ---
ASK_INCOMING_NAME = "Введите название входящего потока (любой текст):"
ASK_PASSPHRASE_NEEDED = (
"Нужно ли защитить входящий поток паролем (passphrase)?\n"
"Ответьте 'да' или 'нет'."
)
ASK_PASSPHRASE_IN = (
"Введите passphrase для входящего потока (минимум 10 символов):"
)
# --- Тексты для создания исходящего потока ---
ASK_SELECT_INCOMING_FOR_OUTGOING = (
"Выберите входящий поток, от которого нужно сделать исходящий:"
)
ASK_PASSPHRASE_OUT_NEEDED = (
"Нужно ли защитить исходящий поток паролем (passphrase)?\n"
"Ответьте 'да' или 'нет'."
)
ASK_PASSPHRASE_OUT = (
"Введите passphrase для исходящего потока (минимум 10 символов):"
)
# --- Общие сообщения и ошибки ---
INVALID_INPUT = "Некорректный ввод. Попробуйте ещё раз."
PASSPHRASE_TOO_SHORT = "Passphrase должен быть не короче 10 символов. Попробуйте ещё раз."
NO_STREAMS_YET = "У вас пока нет активных потоков."
NO_INCOMING_SELECTED = "Сначала создайте хотя бы один входящий поток."
INCOMING_LIMIT_REACHED = (
"Достигнуто максимальное количество входящих потоков (20).\n"
"Сначала остановите и удалите один из существующих потоков."
)
INCOMING_CREATE_ERROR = "Ошибка при создании входящего потока. Проверьте логи ffmpeg на сервере."
OUTGOING_CREATE_ERROR = "Ошибка при создании исходящего потока. Проверьте логи ffmpeg на сервере."
# --- Успешное создание потоков ---
INCOMING_CREATED_OK = "Входящий поток создан и запущен.\n\n{details}"
OUTGOING_CREATED_OK = "Исходящий поток создан и запущен.\n\n{details}"
# --- Информация и удаление ---
INFO_HEADER = "Информация о потоке:\n\n{details}"
DELETE_OK = "Поток удалён" app/bot/keyboards.pyfrom aiogram.types import (
ReplyKeyboardMarkup,
KeyboardButton,
InlineKeyboardMarkup,
InlineKeyboardButton,
)
from . import messages
def main_menu_keyboard() -> ReplyKeyboardMarkup:
"""
Главное меню:
[⬆️ add input] [⬇️ add output]
[Меню потоков]
"""
kb = ReplyKeyboardMarkup(
keyboard=[
[
KeyboardButton(text=messages.BTN_CREATE_INCOMING),
KeyboardButton(text=messages.BTN_ADD_OUTGOING),
],
[
KeyboardButton(text=messages.BTN_LIST_MY_STREAMS),
],
],
resize_keyboard=True,
)
return kb
def yes_no_keyboard() -> ReplyKeyboardMarkup:
"""
Клавиатура 'Да' / 'Нет' для вопросов про passphrase.
"""
kb = ReplyKeyboardMarkup(
keyboard=[
[
KeyboardButton(text="Да"),
KeyboardButton(text="Нет"),
]
],
resize_keyboard=True,
one_time_keyboard=True,
)
return kb
def incoming_list_inline_keyboard(
streams,
current_user_id: int,
admin_user_id: int | None,
) -> InlineKeyboardMarkup:
"""
Инлайн-клавиатура со списком входящих потоков.
Используется и для 'Меню потоков', и для выбора входящего
при создании исходящего.
"""
rows = []
for s in streams:
label = f"{s.name} ({s.local_port_in})"
# Для админа показываем владельца
if admin_user_id and current_user_id == admin_user_id and s.user_id != admin_user_id:
label += f" [user {s.user_id}]"
rows.append(
[
InlineKeyboardButton(
text=label,
callback_data=f"incoming:{s.id}",
)
]
)
return InlineKeyboardMarkup(inline_keyboard=rows) app/core/models.pyimport uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class StreamStatus(str, Enum):
RUNNING = "running"
STOPPED = "stopped"
@dataclass
class OutgoingStream:
id: str
user_id: int
local_port_out: int
remote_host_out: str
remote_port_out: int
passphrase_out: Optional[str]
latency_out: int
status: StreamStatus = StreamStatus.STOPPED
pid: Optional[int] = None
log_path: Optional[str] = None
start_time: Optional[float] = None
stop_time: Optional[float] = None
@dataclass
class IncomingStream:
id: str
user_id: int
name: str
local_port_in: int
internal_port: int
remote_host_in: str
remote_port_in: int
passphrase_in: Optional[str]
latency_in: int
status: StreamStatus = StreamStatus.STOPPED
pid: Optional[int] = None
log_path: Optional[str] = None
start_time: Optional[float] = None
stop_time: Optional[float] = None
outgoing_streams: List[OutgoingStream] = field(default_factory=list)
def create_incoming_stream(
user_id: int,
name: str,
local_port_in: int,
remote_host_in: str,
remote_port_in: int,
passphrase_in: Optional[str],
latency: int,
) -> IncomingStream:
"""
Создаёт объект IncomingStream.
internal_port берём как local_port_in + 1000 (5000 -> 6000 и т.п.).
"""
internal_port = local_port_in + 1000
return IncomingStream(
id=str(uuid.uuid4()),
user_id=user_id,
name=name,
local_port_in=local_port_in,
internal_port=internal_port,
remote_host_in=remote_host_in,
remote_port_in=remote_port_in,
passphrase_in=passphrase_in,
latency_in=latency,
status=StreamStatus.STOPPED,
pid=None,
log_path=None,
start_time=None,
stop_time=None,
outgoing_streams=[],
)
def create_outgoing_stream(
user_id: int,
local_port_out: int,
remote_host_out: str,
remote_port_out: int,
passphrase_out: Optional[str],
latency: int,
) -> OutgoingStream:
return OutgoingStream(
id=str(uuid.uuid4()),
user_id=user_id,
local_port_out=local_port_out,
remote_host_out=remote_host_out,
remote_port_out=remote_port_out,
passphrase_out=passphrase_out,
latency_out=latency,
status=StreamStatus.STOPPED,
pid=None,
log_path=None,
start_time=None,
stop_time=None,
) app/core/storage.pyimport json
import os
from dataclasses import dataclass, field
from typing import List, Optional
from .models import IncomingStream, OutgoingStream, StreamStatus
from ..config import Settings
@dataclass
class AppState:
incoming_streams: List[IncomingStream] = field(default_factory=list)
def _outgoing_from_dict(d: dict) -> OutgoingStream:
return OutgoingStream(
id=d["id"],
user_id=d["user_id"],
local_port_out=d["local_port_out"],
remote_host_out=d["remote_host_out"],
remote_port_out=d["remote_port_out"],
passphrase_out=d.get("passphrase_out"),
latency_out=d.get("latency_out", 200),
status=StreamStatus(d.get("status", "stopped")),
pid=d.get("pid"),
log_path=d.get("log_path"),
start_time=d.get("start_time"),
stop_time=d.get("stop_time"),
)
def _incoming_from_dict(d: dict) -> IncomingStream:
outgoing_raw = d.get("outgoing_streams", [])
outgoing_streams = [_outgoing_from_dict(o) for o in outgoing_raw]
return IncomingStream(
id=d["id"],
user_id=d["user_id"],
name=d["name"],
local_port_in=d["local_port_in"],
internal_port=d["internal_port"],
remote_host_in=d["remote_host_in"],
remote_port_in=d["remote_port_in"],
passphrase_in=d.get("passphrase_in"),
latency_in=d.get("latency_in", 200),
status=StreamStatus(d.get("status", "stopped")),
pid=d.get("pid"),
log_path=d.get("log_path"),
start_time=d.get("start_time"),
stop_time=d.get("stop_time"),
outgoing_streams=outgoing_streams,
)
def _outgoing_to_dict(o: OutgoingStream) -> dict:
return {
"id": o.id,
"user_id": o.user_id,
"local_port_out": o.local_port_out,
"remote_host_out": o.remote_host_out,
"remote_port_out": o.remote_port_out,
"passphrase_out": o.passphrase_out,
"latency_out": o.latency_out,
"status": o.status.value,
"pid": o.pid,
"log_path": o.log_path,
"start_time": o.start_time,
"stop_time": o.stop_time,
}
def _incoming_to_dict(s: IncomingStream) -> dict:
return {
"id": s.id,
"user_id": s.user_id,
"name": s.name,
"local_port_in": s.local_port_in,
"internal_port": s.internal_port,
"remote_host_in": s.remote_host_in,
"remote_port_in": s.remote_port_in,
"passphrase_in": s.passphrase_in,
"latency_in": s.latency_in,
"status": s.status.value,
"pid": s.pid,
"log_path": s.log_path,
"start_time": s.start_time,
"stop_time": s.stop_time,
"outgoing_streams": [_outgoing_to_dict(o) for o in s.outgoing_streams],
}
def load_state(settings: Settings) -> AppState:
path = settings.state_file
if not os.path.exists(path):
return AppState()
try:
with open(path, "r", encoding="utf-8") as f:
raw = json.load(f)
except Exception:
return AppState()
incoming_raw = raw.get("incoming_streams", [])
incoming_streams = []
for d in incoming_raw:
try:
incoming_streams.append(_incoming_from_dict(d))
except Exception:
continue
return AppState(incoming_streams=incoming_streams)
def save_state(state: AppState, settings: Settings) -> None:
path = settings.state_file
os.makedirs(os.path.dirname(path), exist_ok=True)
raw = {
"incoming_streams": [_incoming_to_dict(s) for s in state.incoming_streams]
}
tmp_path = f"{path}.tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(raw, f, ensure_ascii=False, indent=2)
os.replace(tmp_path, path)
def get_user_incoming_streams(state: AppState, user_id: int) -> list[IncomingStream]:
return [s for s in state.incoming_streams if s.user_id == user_id]
def get_incoming_stream_by_id(
state: AppState, stream_id: str
) -> Optional[IncomingStream]:
for s in state.incoming_streams:
if s.id == stream_id:
return s
return None
def count_running_incoming_streams(state: AppState) -> int:
return sum(1 for s in state.incoming_streams if s.status == StreamStatus.RUNNING)
def allocate_incoming_port(state: AppState, settings: Settings) -> Optional[int]:
if count_running_incoming_streams(state) >= settings.max_incoming_streams:
return None
start_port, end_port = settings.incoming_port_range
used_ports = {
s.local_port_in
for s in state.incoming_streams
if s.status == StreamStatus.RUNNING
}
for port in range(start_port, end_port + 1):
if port not in used_ports:
return port
return None
def allocate_outgoing_port(state: AppState, settings: Settings) -> Optional[int]:
start_port, end_port = settings.outgoing_port_range
used_ports = set()
for inc in state.incoming_streams:
for out in inc.outgoing_streams:
if out.status == StreamStatus.RUNNING:
used_ports.add(out.local_port_out)
for port in range(start_port, end_port + 1):
if port not in used_ports:
return port
return None app/core/ffmpeg_manager.pyimport os
import time
import subprocess
import signal
from typing import Optional
from .models import IncomingStream, OutgoingStream, StreamStatus
from .storage import AppState, save_state
from ..config import Settings
def _ensure_logs_dir(settings: Settings) -> None:
os.makedirs(settings.logs_dir, exist_ok=True)
def start_incoming_ffmpeg(
stream: IncomingStream,
settings: Settings,
state: AppState,
) -> None:
"""
ВХОДЯЩИЙ поток:
srt://0.0.0.0:<local_port_in>?mode=listener -> udp://127.0.0.1:<internal_port>
"""
_ensure_logs_dir(settings)
log_path = os.path.join(settings.logs_dir, f"in_{stream.id}.log")
srt_url_in = (
f"srt://{stream.remote_host_in}:{stream.remote_port_in}"
f"?mode=listener&transtype=live&latency={stream.latency_in}"
)
if stream.passphrase_in:
srt_url_in += f"&passphrase={stream.passphrase_in}"
output_url = f"udp://127.0.0.1:{stream.internal_port}"
cmd = [
"ffmpeg",
"-hide_banner",
"-stats",
"-loglevel",
"info",
"-fflags",
"+genpts",
"-i",
srt_url_in,
"-c",
"copy",
"-f",
"mpegts",
output_url,
]
log_file = open(log_path, "a", encoding="utf-8", errors="ignore")
proc = subprocess.Popen(
cmd,
stdout=log_file,
stderr=log_file,
stdin=subprocess.DEVNULL,
)
stream.pid = proc.pid
stream.log_path = log_path
stream.status = StreamStatus.RUNNING
stream.start_time = time.time()
stream.stop_time = None
save_state(state, settings)
def start_outgoing_ffmpeg(
incoming: IncomingStream,
outgoing: OutgoingStream,
settings: Settings,
state: AppState,
) -> None:
"""
ИСХОДЯЩИЙ поток:
udp://127.0.0.1:<internal_port> -> srt://0.0.0.0:<local_port_out>?mode=listener
"""
_ensure_logs_dir(settings)
log_path = os.path.join(settings.logs_dir, f"out_{outgoing.id}.log")
input_url = f"udp://127.0.0.1:{incoming.internal_port}"
srt_url_out = (
f"srt://{outgoing.remote_host_out}:{outgoing.remote_port_out}"
f"?mode=listener&transtype=live&latency={outgoing.latency_out}"
)
if outgoing.passphrase_out:
srt_url_out += f"&passphrase={outgoing.passphrase_out}"
cmd = [
"ffmpeg",
"-hide_banner",
"-stats",
"-loglevel",
"info",
"-fflags",
"+genpts",
"-i",
input_url,
"-c",
"copy",
"-f",
"mpegts",
srt_url_out,
]
log_file = open(log_path, "a", encoding="utf-8", errors="ignore")
proc = subprocess.Popen(
cmd,
stdout=log_file,
stderr=log_file,
stdin=subprocess.DEVNULL,
)
outgoing.pid = proc.pid
outgoing.log_path = log_path
outgoing.status = StreamStatus.RUNNING
outgoing.start_time = time.time()
outgoing.stop_time = None
save_state(state, settings)
def _kill_pid(pid: Optional[int]) -> None:
if not pid:
return
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
return
except Exception:
return
def stop_incoming_stream(
incoming: IncomingStream,
settings: Settings,
state: AppState,
) -> None:
"""
Останавливает входящий поток и все связанные исходящие.
"""
for out in incoming.outgoing_streams:
if out.pid:
_kill_pid(out.pid)
out.status = StreamStatus.STOPPED
out.stop_time = time.time()
if incoming.pid:
_kill_pid(incoming.pid)
incoming.status = StreamStatus.STOPPED
incoming.stop_time = time.time()
save_state(state, settings) app/core/analyzer.pyimport os
import re
import time
from typing import Dict, Optional, List
def _parse_single_log(lines: List[str]) -> Dict[str, str]:
res: Dict[str, str] = {}
if not lines:
return res
# Разрешение (ищем сверху)
for line in lines:
m = re.search(r"(\\d{3,5})x(\\d{3,5})", line)
if m:
w, h = m.group(1), m.group(2)
res["resolution"] = f"{w}x{h}"
break
# Битрейт (ищем снизу)
for line in reversed(lines):
m = re.search(r"bitrate=\\s*([\\d\\.]+kbits/s)", line)
if m:
res["avg_bitrate"] = m.group(1)
break
# Dropped frames (если ffmpeg пишет drop=)
for line in reversed(lines):
m = re.search(r"drop=\\s*(\\d+)", line)
if m:
res["dropped_frames"] = m.group(1)
break
return res
def parse_ffmpeg_logs(log_paths: List[str]) -> Dict[str, str]:
result: Dict[str, str] = {}
last_bitrate: Optional[str] = None
first_resolution: Optional[str] = None
for path in log_paths:
if not path or not os.path.exists(path):
continue
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
except Exception:
continue
partial = _parse_single_log(lines)
if not first_resolution and "resolution" in partial:
first_resolution = partial["resolution"]
if "avg_bitrate" in partial:
last_bitrate = partial["avg_bitrate"]
if first_resolution:
result["resolution"] = first_resolution
if last_bitrate:
result["avg_bitrate"] = last_bitrate
return result
def parse_ffmpeg_log(log_path: Optional[str]) -> Dict[str, str]:
if not log_path:
return {}
return parse_ffmpeg_logs([log_path])
def parse_duration(start_time: Optional[float], stop_time: Optional[float]) -> str:
if not start_time:
return "нет данных"
end = stop_time or time.time()
seconds = int(end - start_time)
if seconds < 0:
return "нет данных"
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if hours:
parts.append(f"{hours} ч")
if minutes:
parts.append(f"{minutes} мин")
if not parts:
parts.append(f"{secs} с")
return " ".join(parts) app/bot/handlers.pyfrom aiogram import Router, F
from aiogram.filters import CommandStart
from aiogram.types import (
Message,
CallbackQuery,
InlineKeyboardMarkup,
InlineKeyboardButton,
)
from aiogram.fsm.state import StatesGroup, State
from aiogram.fsm.context import FSMContext
from ..config import Settings
from ..core import storage
from ..core.models import (
create_incoming_stream,
create_outgoing_stream,
StreamStatus,
)
from ..core.ffmpeg_manager import (
start_incoming_ffmpeg,
start_outgoing_ffmpeg,
stop_incoming_stream,
)
from ..core.analyzer import (
parse_ffmpeg_logs,
parse_duration,
)
from . import messages
from .keyboards import (
main_menu_keyboard,
incoming_list_inline_keyboard,
yes_no_keyboard,
)
router = Router()
class IncomingCreation(StatesGroup):
waiting_for_name = State()
waiting_for_passphrase_needed = State()
waiting_for_passphrase = State()
class OutgoingCreation(StatesGroup):
waiting_for_incoming_selection = State()
waiting_for_passphrase_needed = State()
waiting_for_passphrase = State()
class ManageFlow(StatesGroup):
waiting_for_incoming_selection = State()
async def notify_admin(bot, settings: Settings, text: str):
if settings.admin_user_id:
try:
await bot.send_message(settings.admin_user_id, text)
except Exception:
pass
@router.message(CommandStart())
async def cmd_start(message: Message, state: FSMContext, settings: Settings):
bot = message.bot
chat_id = message.chat.id
current_id = message.message_id
for msg_id in range(current_id - 1, max(current_id - 50, 0), -1):
try:
await bot.delete_message(chat_id, msg_id)
except Exception:
continue
await state.clear()
await message.answer(
messages.START_MESSAGE,
reply_markup=main_menu_keyboard(),
)
await message.answer(messages.MAIN_MENU_PROMPT, reply_markup=main_menu_keyboard())
@router.message(F.text == messages.BTN_CREATE_INCOMING)
async def handle_create_incoming(message: Message, state: FSMContext):
await state.set_state(IncomingCreation.waiting_for_name)
await message.answer(messages.ASK_INCOMING_NAME, reply_markup=main_menu_keyboard())
@router.message(F.text == messages.BTN_LIST_MY_STREAMS)
async def handle_stream_menu(message: Message, state: FSMContext, settings: Settings):
state_obj = storage.load_state(settings)
user_id = message.from_user.id
if settings.admin_user_id and user_id == settings.admin_user_id:
streams_all = state_obj.incoming_streams
else:
streams_all = storage.get_user_incoming_streams(state_obj, user_id)
user_streams = [s for s in streams_all if s.status == StreamStatus.RUNNING]
if not user_streams:
await message.answer(
"Нет активных потоков.",
reply_markup=main_menu_keyboard(),
)
return
kb = incoming_list_inline_keyboard(
user_streams,
current_user_id=user_id,
admin_user_id=settings.admin_user_id,
)
await state.set_state(ManageFlow.waiting_for_incoming_selection)
await message.answer("Выберите поток для управления:", reply_markup=kb)
@router.message(F.text == messages.BTN_ADD_OUTGOING)
async def handle_add_outgoing(message: Message, state: FSMContext, settings: Settings):
state_obj = storage.load_state(settings)
user_id = message.from_user.id
if settings.admin_user_id and user_id == settings.admin_user_id:
streams_all = state_obj.incoming_streams
else:
streams_all = storage.get_user_incoming_streams(state_obj, user_id)
user_streams = [s for s in streams_all if s.status == StreamStatus.RUNNING]
if not user_streams:
await message.answer(
"Нет активных входящих потоков.",
reply_markup=main_menu_keyboard(),
)
return
await state.set_state(OutgoingCreation.waiting_for_incoming_selection)
kb = incoming_list_inline_keyboard(
user_streams,
current_user_id=user_id,
admin_user_id=settings.admin_user_id,
)
await message.answer(messages.ASK_SELECT_INCOMING_FOR_OUTGOING, reply_markup=kb)
@router.message(IncomingCreation.waiting_for_name)
async def incoming_name(message: Message, state: FSMContext):
await state.update_data(name=message.text.strip())
await state.set_state(IncomingCreation.waiting_for_passphrase_needed)
await message.answer(
messages.ASK_PASSPHRASE_NEEDED,
reply_markup=yes_no_keyboard(),
)
@router.message(IncomingCreation.waiting_for_passphrase_needed)
async def incoming_passphrase_needed(
message: Message, state: FSMContext, settings: Settings
):
answer = (message.text or "").strip().lower()
if answer in ("да", "yes", "y"):
await state.set_state(IncomingCreation.waiting_for_passphrase)
await message.answer(
messages.ASK_PASSPHRASE_IN,
reply_markup=main_menu_keyboard(),
)
return
if answer in ("нет", "no", "n"):
data = await state.get_data()
await finalize_incoming_creation(
message, state, settings, data, passphrase=None
)
return
await message.answer(
messages.INVALID_INPUT,
reply_markup=yes_no_keyboard(),
)
@router.message(IncomingCreation.waiting_for_passphrase)
async def incoming_passphrase(
message: Message, state: FSMContext, settings: Settings
):
passphrase = (message.text or "").strip()
if len(passphrase) < 10:
await message.answer(messages.PASSPHRASE_TOO_SHORT)
return
data = await state.get_data()
await finalize_incoming_creation(
message, state, settings, data, passphrase=passphrase
)
async def finalize_incoming_creation(
message: Message,
state: FSMContext,
settings: Settings,
data: dict,
passphrase: str | None,
):
try:
state_obj = storage.load_state(settings)
if (
storage.count_running_incoming_streams(state_obj)
>= settings.max_incoming_streams
):
await state.clear()
await message.answer(
messages.INCOMING_LIMIT_REACHED,
reply_markup=main_menu_keyboard(),
)
return
port = storage.allocate_incoming_port(state_obj, settings)
if port is None:
await state.clear()
await message.answer(
"Нет свободных входящих портов в заданном диапазоне.",
reply_markup=main_menu_keyboard(),
)
return
stream = create_incoming_stream(
user_id=message.from_user.id,
name=data["name"],
local_port_in=port,
remote_host_in="0.0.0.0",
remote_port_in=port,
passphrase_in=passphrase,
latency=200,
)
state_obj.incoming_streams.append(stream)
try:
start_incoming_ffmpeg(stream, settings, state_obj)
except Exception as e_ff:
await state.clear()
await message.answer(
f"{messages.INCOMING_CREATE_ERROR}\n\nДетали: {e_ff}",
reply_markup=main_menu_keyboard(),
)
return
storage.save_state(state_obj, settings)
await state.clear()
base_url = f"srt://{settings.server_public_ip}:{stream.local_port_in}"
if passphrase:
srt_url = f"{base_url}?passphrase={passphrase}"
else:
srt_url = base_url
details = (
f"Название: {stream.name}\n"
f"{srt_url}\n"
f"Статус: {stream.status.value}"
)
await message.answer(
messages.INCOMING_CREATED_OK.format(details=details),
reply_markup=main_menu_keyboard(),
)
user = message.from_user
admin_text = (
"Новый ВХОДЯЩИЙ поток\n"
f"Пользователь: {user.id} (@{user.username or 'нет username'})\n"
f"Название: {stream.name}\n"
f"Порт: {stream.local_port_in}\n"
f"URL: {srt_url}"
)
await notify_admin(message.bot, settings, admin_text)
except Exception as e:
await state.clear()
await message.answer(
f"Внутренняя ошибка при создании входящего потока:\n{e}",
reply_markup=main_menu_keyboard(),
)
@router.callback_query(
OutgoingCreation.waiting_for_incoming_selection, F.data.startswith("incoming:")
)
async def outgoing_select_incoming(
callback: CallbackQuery, state: FSMContext, settings: Settings
):
incoming_id = callback.data.split(":", 1)[1]
await state.update_data(incoming_id=incoming_id)
await state.set_state(OutgoingCreation.waiting_for_passphrase_needed)
await callback.message.answer(
messages.ASK_PASSPHRASE_OUT_NEEDED,
reply_markup=yes_no_keyboard(),
)
await callback.answer()
@router.message(OutgoingCreation.waiting_for_passphrase_needed)
async def outgoing_passphrase_needed(
message: Message, state: FSMContext, settings: Settings
):
answer = (message.text or "").strip().lower()
if answer in ("да", "yes", "y"):
await state.set_state(OutgoingCreation.waiting_for_passphrase)
await message.answer(
messages.ASK_PASSPHRASE_OUT,
reply_markup=main_menu_keyboard(),
)
return
if answer in ("нет", "no", "n"):
data = await state.get_data()
await finalize_outgoing_creation(
message, state, settings, data, passphrase=None
)
return
await message.answer(
messages.INVALID_INPUT,
reply_markup=yes_no_keyboard(),
)
@router.message(OutgoingCreation.waiting_for_passphrase)
async def outgoing_passphrase(
message: Message, state: FSMContext, settings: Settings
):
passphrase = (message.text or "").strip()
if len(passphrase) < 10:
await message.answer(messages.PASSPHRASE_TOO_SHORT)
return
data = await state.get_data()
await finalize_outgoing_creation(
message, state, settings, data, passphrase=passphrase
)
async def finalize_outgoing_creation(
message: Message,
state: FSMContext,
settings: Settings,
data: dict,
passphrase: str | None,
):
try:
state_obj = storage.load_state(settings)
incoming = storage.get_incoming_stream_by_id(state_obj, data["incoming_id"])
if not incoming:
await state.clear()
await message.answer(
messages.NO_INCOMING_SELECTED,
reply_markup=main_menu_keyboard(),
)
return
port_out = storage.allocate_outgoing_port(state_obj, settings)
if port_out is None:
await state.clear()
await message.answer(
"Нет свободных исходящих портов в заданном диапазоне.",
reply_markup=main_menu_keyboard(),
)
return
outgoing = create_outgoing_stream(
user_id=message.from_user.id,
local_port_out=port_out,
remote_host_out="0.0.0.0",
remote_port_out=port_out,
passphrase_out=passphrase,
latency=200,
)
incoming.outgoing_streams.append(outgoing)
try:
start_outgoing_ffmpeg(incoming, outgoing, settings, state_obj)
except Exception as e_ff:
await state.clear()
await message.answer(
f"{messages.OUTGOING_CREATE_ERROR}\n\nДетали: {e_ff}",
reply_markup=main_menu_keyboard(),
)
return
storage.save_state(state_obj, settings)
await state.clear()
base_url = f"srt://{settings.server_public_ip}:{outgoing.local_port_out}"
if passphrase:
srt_url = f"{base_url}?passphrase={passphrase}"
else:
srt_url = base_url
details = (
f"Название входящего потока: {incoming.name}\n"
f"{srt_url}\n"
f"Статус: {outgoing.status.value}"
)
await message.answer(
messages.OUTGOING_CREATED_OK.format(details=details),
reply_markup=main_menu_keyboard(),
)
user = message.from_user
admin_text = (
"Новый ИСХОДЯЩИЙ поток\n"
f"Пользователь: {user.id} (@{user.username or 'нет username'})\n"
f"От входящего: {incoming.name} (порт {incoming.local_port_in})\n"
f"Порт исходящего: {outgoing.local_port_out}\n"
f"URL: {srt_url}"
)
await notify_admin(message.bot, settings, admin_text)
except Exception as e:
await state.clear()
await message.answer(
f"Внутренняя ошибка при создании исходящего потока:\n{e}",
reply_markup=main_menu_keyboard(),
)
@router.callback_query(
ManageFlow.waiting_for_incoming_selection, F.data.startswith("incoming:")
)
async def manage_incoming_callback(
callback: CallbackQuery, state: FSMContext, settings: Settings
):
stream_id = callback.data.split(":", 1)[1]
state_obj = storage.load_state(settings)
incoming = storage.get_incoming_stream_by_id(state_obj, stream_id)
if not incoming:
await callback.answer("Поток не найден.", show_alert=True)
return
kb = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="ℹ️ Информация",
callback_data=f"action:info:{stream_id}",
),
InlineKeyboardButton(
text="???? Удалить поток",
callback_data=f"action:delete:{stream_id}",
),
]
]
)
await callback.message.answer(
f"Выбран поток: {incoming.name} (ID: {incoming.local_port_in})",
reply_markup=kb,
)
await callback.answer()
await state.clear()
@router.callback_query(F.data.startswith("action:"))
async def manage_action_callback(callback: CallbackQuery, settings: Settings):
parts = callback.data.split(":", 2)
if len(parts) != 3:
await callback.answer("Некорректное действие.", show_alert=True)
return
action, stream_id = parts[1], parts[2]
user_id = callback.from_user.id
state_obj = storage.load_state(settings)
incoming = storage.get_incoming_stream_by_id(state_obj, stream_id)
if not incoming:
await callback.answer("Поток не найден.", show_alert=True)
return
is_admin = settings.admin_user_id and (user_id == settings.admin_user_id)
is_owner = incoming.user_id == user_id
if action == "info":
log_paths = []
if incoming.log_path:
log_paths.append(incoming.log_path)
for o in incoming.outgoing_streams:
if o.log_path:
log_paths.append(o.log_path)
stats = parse_ffmpeg_logs(log_paths) if log_paths else {}
duration = parse_duration(incoming.start_time, incoming.stop_time)
base_in = f"srt://{settings.server_public_ip}:{incoming.local_port_in}"
if incoming.passphrase_in:
srt_in_url = f"{base_in}?passphrase={incoming.passphrase_in}"
else:
srt_in_url = base_in
owner_line = f"Владелец (user_id): {incoming.user_id}"
lines = [
f"Название: {incoming.name}",
owner_line,
f"in: {srt_in_url}",
f"Статус: {incoming.status.value}",
f"Время работы: {duration}",
f"Битрейт: {stats.get('avg_bitrate') or 'нет данных'}",
f"Разрешение: {stats.get('resolution') or 'нет данных'}",
"",
"Исходящие потоки:",
]
if not incoming.outgoing_streams:
lines.append("– нет исходящих потоков")
else:
for idx, o in enumerate(incoming.outgoing_streams, start=1):
base_out = f"srt://{settings.server_public_ip}:{o.local_port_out}"
if o.passphrase_out:
srt_out_url = f"{base_out}?passphrase={o.passphrase_out}"
else:
srt_out_url = base_out
lines.append(
f"- Исходящий {idx}: {srt_out_url}, статус: {o.status.value}"
)
await callback.message.answer(
messages.INFO_HEADER.format(details="\n".join(lines)),
reply_markup=main_menu_keyboard(),
)
await callback.answer()
return
if action == "delete":
if not (is_admin or is_owner):
await callback.answer(
"У вас нет прав управлять этим потоком.",
show_alert=True,
)
return
from ..core.ffmpeg_manager import stop_incoming_stream
stop_incoming_stream(incoming, settings, state_obj)
state_obj.incoming_streams = [
s for s in state_obj.incoming_streams if s.id != incoming.id
]
storage.save_state(state_obj, settings)
await callback.message.answer(
messages.DELETE_OK,
reply_markup=main_menu_keyboard(),
)
await callback.answer()
admin_text = (
"Поток УДАЛЁН\n"
f"Инициатор: {user_id}\n"
f"ID потока: {incoming.id}\n"
f"Название: {incoming.name}\n"
f"Порт: {incoming.local_port_in}\n"
f"Владелец (user_id): {incoming.user_id}"
)
await notify_admin(callback.message.bot, settings, admin_text)
return Создай файл /etc/systemd/system/srt-bot.service:
[Unit]
Description=SRT Telegram Bot Service
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/srt-bot
ExecStart=/opt/srt-bot/venv/bin/python -m app.main
Restart=always
RestartSec=3
# При желании можно запускать под отдельным пользователем:
# User=srtbot
# Group=srtbot
[Install]
WantedBy=multi-user.target Далее:
sudo systemctl daemon-reload
sudo systemctl enable srt-bot.service
sudo systemctl start srt-bot.service
sudo systemctl status srt-bot.service --no-pager Логи сервиса:
sudo journalctl -u srt-bot.service -f /start.⬆️ add input — создать входящий поток.⬇️ add output — добавить исходящий поток к входящему.Меню потоков — список активных потоков:
ADMIN_USER_ID) видит все.⬆️ add input.Входящий поток создан и запущен.
Название: ZAL1
srt://ru.prostudio.net:5000
Статус: running
Этот URL — то, что забивается в vMix / Larix / кодер на площадке (режим caller).
⬇️ add output.Исходящий поток создан и запущен.
Название входящего потока: ZAL1
srt://ru.prostudio.net:7000
Статус: running
Этот URL — куда подключается принимающий vMix / пульт / сервер (режим caller).
Меню потоков.in: srt://ru.prostudio.net:порт (+ passphrase, если есть);Достигнуто максимальное количество входящих потоков (20)... ADMIN_USER_ID):