Skip to content

直播源推流到公共服务器上进行直播

后台驻留的推流的命令:

nohup ffmpeg -re -stream_loop -1 -i http://zh.527200.xyz/proxy/tvbxw.php?id=2 -c:v copy -c:a aac -b:a 128k -f flv -y -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 rtmp://ali.push.yximgs.com/live/自定义内容 &

不后台驻留的命令:

ffmpeg -re -stream_loop -1 -i http://zh.527200.xyz/proxy/tvbxw.php?id=2 -c:v copy -c:a aac -b:a 128k -f flv -y -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 rtmp://ali.push.yximgs.com/live/自定义内容

把命令中 -i 后面的直播源地址换成你要推流的直播源地址;末尾的 rtmp://ali.push.yximgs.com/live/ 是推流用的公共服务器地址,其后的“自定义内容”请替换为你自己的流名称。

==========================================================================================================

Win版多任务推流工具

先在系统上安装好 FFmpeg 和 Python(并确保 ffmpeg 已加入系统路径),然后把下面的代码保存为“自定义名称.py”,再运行即可。

python
import tkinter as tk
from tkinter import ttk, messagebox, StringVar
import subprocess
import threading
import json
import os
from collections import OrderedDict


class StreamingTask:
    """A single push job: one source URL relayed to one server/stream key.

    The object is a plain mutable record. ``status`` starts as the
    "waiting" label, ``process`` is ``None`` until a subprocess handle is
    stored on it, and ``flv_url`` is derived once from the server URL and
    stream key given at construction time.
    """

    def __init__(self, name, source_url, server_url, stream_key):
        # Endpoints and display name, stored exactly as supplied.
        self.name, self.source_url = name, source_url
        self.server_url, self.stream_key = server_url, stream_key

        # Runtime state.
        self.process = None
        self.status = "等待中"

        # Derived address: "<server_url>/<stream_key>.flv".
        self.flv_url = "{}/{}.flv".format(server_url, stream_key)


class IPTVStreamerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("IPTV多任务推流工具")
        self.root.geometry("1050x700")

        # 初始化配置
        self.tasks = OrderedDict()
        self.current_task_id = 0
        self.config_file = "config.json"
        self.config = {"server_url": "rtmp://ali.push.yximgs.com/live"}

        self.load_config()
        self.create_widgets()
        self.check_ffmpeg()

    def create_widgets(self):
        """Build the window: config form and log on the left, task list on the right.

        The four form entries are stored on ``self`` as ``task_name``,
        ``source_url``, ``server_url`` and ``stream_key``; the log widget as
        ``log_text``; the scrollable task area as ``task_list_canvas`` /
        ``task_list_container``. Finishes by drawing the task list.
        """
        # ---- Left pane -----------------------------------------------------
        left = ttk.Frame(self.root)
        left.pack(side="left", fill="both", expand=True, padx=10, pady=10)

        # Task configuration form: one label + entry per row.
        form = ttk.LabelFrame(left, text="任务配置")
        form.pack(fill="x", pady=5)

        form_rows = (
            ("任务名称:", "task_name"),
            ("直播源地址:", "source_url"),
            ("服务器地址:", "server_url"),
            ("流密钥:", "stream_key"),
        )
        next_row = 0
        for caption, attr_name in form_rows:
            caption_label = ttk.Label(form, text=caption)
            caption_label.grid(row=next_row, column=0, sticky="w", padx=5, pady=2)
            field = ttk.Entry(form, width=30)
            field.grid(row=next_row, column=1, padx=5, pady=2)
            setattr(self, attr_name, field)
            next_row += 1

        # Pre-fill the server field from the loaded configuration.
        self.server_url.insert(0, self.config["server_url"])
        add_button = ttk.Button(form, text="添加任务", command=self.add_task)
        add_button.grid(row=next_row, columnspan=2, pady=10)

        # Read-only log view with its own scrollbar.
        log_box = ttk.LabelFrame(left, text="运行日志")
        log_box.pack(fill="both", expand=True, pady=5)

        self.log_text = tk.Text(log_box, wrap=tk.WORD, state="disabled")
        log_scroll = ttk.Scrollbar(log_box, command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=log_scroll.set)
        log_scroll.pack(side="right", fill="y")
        self.log_text.pack(side="left", fill="both", expand=True)

        # ---- Right pane ----------------------------------------------------
        right = ttk.Frame(self.root)
        right.pack(side="right", fill="both", expand=True, padx=10, pady=10)

        list_box = ttk.LabelFrame(right, text="任务列表")
        list_box.pack(fill="both", expand=True)

        # A frame embedded in a canvas gives the task rows a scrollable area.
        canvas = tk.Canvas(list_box)
        self.task_list_canvas = canvas
        list_scroll = ttk.Scrollbar(list_box, orient="vertical", command=canvas.yview)
        self.task_list_container = ttk.Frame(canvas)

        canvas.configure(yscrollcommand=list_scroll.set)
        canvas.pack(side="left", fill="both", expand=True)
        list_scroll.pack(side="right", fill="y")
        canvas.create_window((0, 0), window=self.task_list_container, anchor="nw")

        def sync_scroll_region(_event):
            # Keep the scrollable region equal to the bounding box of the content.
            return canvas.configure(scrollregion=canvas.bbox("all"))

        self.task_list_container.bind("<Configure>", sync_scroll_region)

        self.update_task_list()

    def log_message(self, message):  # 修复缩进的核心方法
        self.log_text.config(state="normal")
        self.log_text.insert("end", message + "\n")
        self.log_text.see("end")
        self.log_text.config(state="disabled")

    def add_task(self):
        """Create a task from the form fields and register it.

        Each field is read from its entry widget and stripped of surrounding
        whitespace, so a blank-looking field is rejected and a pasted URL with
        a stray space or newline is not handed to ffmpeg verbatim. If any
        field is empty a warning dialog is shown and nothing is added.
        Otherwise the task is stored under the next task id, the list is
        redrawn, the configuration is saved and the addition is logged.
        """
        form_entries = (self.task_name, self.source_url, self.server_url, self.stream_key)
        name, source, server, key = (entry.get().strip() for entry in form_entries)

        if not all((name, source, server, key)):
            messagebox.showwarning("警告", "请填写所有配置字段")
            return

        task_id = self.current_task_id
        self.tasks[task_id] = StreamingTask(name, source, server, key)
        self.current_task_id += 1
        self.update_task_list()
        self.save_config()
        self.log_message(f"已添加任务: {name}")

    def update_task_list(self):
        """Redraw the task list from ``self.tasks``.

        All existing row widgets are destroyed and one row per task is
        rebuilt: name, status label and the edit/start/stop/delete buttons.
        A fresh ``StringVar`` holding the task's current status is attached to
        the task as ``status_var`` and drives the status label.
        """
        container = self.task_list_container
        for stale_widget in container.winfo_children():
            stale_widget.destroy()

        # Button caption -> handler; each handler receives the row's task id.
        actions = (
            ("编辑", self.edit_task),
            ("开始", self.start_task),
            ("停止", self.stop_task),
            ("删除", self.delete_task),
        )

        for task_id, task in self.tasks.items():
            row = ttk.Frame(container)
            row.pack(fill="x", pady=2)

            ttk.Label(row, text=task.name, width=15).pack(side="left")

            task.status_var = StringVar(value=task.status)
            ttk.Label(row, textvariable=task.status_var, width=10).pack(side="left")

            for caption, handler in actions:
                # Default arguments freeze the handler and id for this row.
                button = ttk.Button(
                    row, text=caption, width=5,
                    command=lambda h=handler, t=task_id: h(t),
                )
                button.pack(side="left", padx=2)

    def delete_task(self, task_id):
        task = self.tasks[task_id]
        if task.status == "运行中":
            self.stop_task(task_id)
        del self.tasks[task_id]
        self.update_task_list()
        self.save_config()
        self.log_message(f"已删除任务: {task.name}")

    def start_task(self, task_id):
        task = self.tasks[task_id]
        if task.status == "运行中":
            return

        task.status = "启动中"
        task.status_var.set(task.status)
        self.log_message(f"正在启动任务: {task.name}")

        command = [
            "ffmpeg",
            "-i", task.source_url,
            "-c", "copy",
            "-f", "flv",
            f"{task.server_url}/{task.stream_key}"
        ]

        threading.Thread(target=self.run_ffmpeg, args=(task, command), daemon=True).start()

    def run_ffmpeg(self, task, command):
        try:
            task.process = subprocess.Popen(command,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT,
                                            universal_newlines=True)
            task.status = "运行中"
            task.status_var.set(task.status)
            self.log_message(f"任务已启动: {task.name}")

            while True:
                output = task.process.stdout.readline()
                if output == '' and task.process.poll() is not None:
                    break
                if output:
                    self.log_message(f"[{task.name}] {output.strip()}")

        except Exception as e:
            error_msg = f"任务错误: {task.name} - {str(e)}"
            task.status = error_msg
            task.status_var.set(error_msg)
            self.log_message(error_msg)
        finally:
            if task.process and task.process.poll() is None:
                task.process.terminate()
            task.status = "已停止"
            task.status_var.set(task.status)
            self.log_message(f"任务已停止: {task.name}")

    def stop_task(self, task_id):
        task = self.tasks[task_id]
        if task.process and task.process.poll() is None:
            task.process.terminate()
            task.status = "已停止"
            task.status_var.set(task.status)

    def edit_task(self, task_id):
        """Open a dialog for editing the task with ``task_id``.

        The dialog shows the task's four fields pre-filled. Saving requires
        every field to be non-empty; it then updates the task (including its
        derived ``flv_url``), restarts the task if it was running, redraws the
        list, saves the configuration and closes the dialog.
        Raises ``KeyError`` if the id is unknown.
        """
        task = self.tasks[task_id]

        dialog = tk.Toplevel(self.root)
        dialog.title("编辑任务")

        captions = ("任务名称:", "直播源地址:", "服务器地址:", "流密钥:")
        current_values = (task.name, task.source_url, task.server_url, task.stream_key)

        inputs = []
        for row, (caption, value) in enumerate(zip(captions, current_values)):
            ttk.Label(dialog, text=caption).grid(row=row, column=0, padx=5, pady=2)
            box = ttk.Entry(dialog, width=30)
            box.insert(0, value)
            box.grid(row=row, column=1, padx=5, pady=2)
            inputs.append(box)

        def save_changes():
            # Read the entries in the same order they were laid out.
            name, source, server, key = [box.get() for box in inputs]

            if not (name and source and server and key):
                messagebox.showwarning("警告", "所有字段必须填写")
                return

            task.name = name
            task.source_url = source
            task.server_url = server
            task.stream_key = key
            task.flv_url = f"{server}/{key}.flv"

            # A running task is restarted so the new settings take effect.
            if task.status == "运行中":
                self.stop_task(task_id)
                self.start_task(task_id)

            self.update_task_list()
            self.save_config()
            dialog.destroy()

        save_button = ttk.Button(dialog, text="保存修改", command=save_changes)
        save_button.grid(row=4, columnspan=2, pady=5)

    def load_config(self):
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r') as f:
                    data = json.load(f)
                    self.config.update(data)
                    for task_data in data.get('tasks', []):
                        task = StreamingTask(
                            task_data['name'],
                            task_data['source_url'],
                            task_data['server_url'],
                            task_data['stream_key']
                        )
                        self.tasks[self.current_task_id] = task
                        self.current_task_id += 1
        except Exception as e:
            print(f"配置加载错误: {str(e)}")

    def save_config(self):
        config_data = {
            "server_url": self.server_url.get(),
            "tasks": [{
                'name': task.name,
                'source_url': task.source_url,
                'server_url': task.server_url,
                'stream_key': task.stream_key
            } for task in self.tasks.values()]
        }
        with open(self.config_file, 'w') as f:
            json.dump(config_data, f, indent=2)

    def check_ffmpeg(self):
        """Verify that an ``ffmpeg`` executable can be launched.

        Runs ``ffmpeg -version`` with its output discarded. If the executable
        cannot be found, an error dialog is shown and the main window is
        destroyed. The command's exit status is not inspected.
        """
        discard_output = {
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
        }
        probe = ["ffmpeg", "-version"]
        try:
            subprocess.run(probe, **discard_output)
        except FileNotFoundError:
            messagebox.showerror("错误", "未找到FFmpeg,请先安装并添加到系统路径")
            self.root.destroy()


def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Hold a reference to the application object while the event loop runs.
    app = IPTVStreamerApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

上次更新于: