直播源推流到公共服务器上进行直播
后台驻留的推流命令:
nohup ffmpeg -re -stream_loop -1 -i http://zh.527200.xyz/proxy/tvbxw.php?id=2 -c:v copy -c:a aac -b:a 128k -f flv -y -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 rtmp://ali.push.yximgs.com/live/自定义内容 &
不后台驻留的命令:
ffmpeg -re -stream_loop -1 -i http://zh.527200.xyz/proxy/tvbxw.php?id=2 -c:v copy -c:a aac -b:a 128k -f flv -y -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 rtmp://ali.push.yximgs.com/live/自定义内容
把第一个直播源地址换成你要推流的直播地址,后面的 rtmp://ali.push.yximgs.com/live/ 是推流的公共服务器。
==========================================================================================================
Win版多任务推流工具
在系统上安装好FFmpeg和python,把下面的代码保存为:自定义名称.py再运行就可以了
python
import json
import os
import queue
import subprocess
import threading
import tkinter as tk
from collections import OrderedDict
from tkinter import StringVar, messagebox, ttk
class StreamingTask:
    """One push-streaming job: a source URL relayed to a server under a stream key."""

    def __init__(self, name, source_url, server_url, stream_key):
        """Store the task settings and initialise its runtime state.

        Args:
            name: Display name of the task.
            source_url: URL of the stream that is read.
            server_url: Base URL of the server the stream is pushed to.
            stream_key: Path component appended to ``server_url``.
        """
        self.name = name
        self.source_url = source_url
        self.server_url = server_url
        self.stream_key = stream_key
        # Human-readable state shown in the task list.
        self.status = "等待中"
        # subprocess.Popen handle of the running ffmpeg, or None.
        self.process = None
        # Tk variable mirroring ``status``; attached by the UI when the task
        # row is drawn. Declared here so the attribute always exists.
        self.status_var = None
        self.flv_url = f"{server_url}/{stream_key}.flv"
class IPTVStreamerApp:
    """Tkinter window that manages several ffmpeg push-streaming tasks.

    Each task is run by one ``ffmpeg`` child process that is supervised by a
    daemon worker thread. Worker threads never touch widgets directly: they
    hand UI work to the Tk thread through ``_ui_queue``, which is drained
    periodically by ``_drain_ui_queue``.
    """

    STATUS_STARTING = "启动中"
    STATUS_RUNNING = "运行中"
    STATUS_STOPPED = "已停止"
    # Statuses in which a worker already exists or is being launched.
    ACTIVE_STATUSES = (STATUS_STARTING, STATUS_RUNNING)
    # Interval, in milliseconds, between polls of the worker-to-UI queue.
    UI_POLL_MS = 100
    # Maximum number of lines kept in the log pane; older lines are dropped.
    MAX_LOG_LINES = 2000

    def __init__(self, root):
        """Build the window on ``root``, load saved tasks and verify ffmpeg."""
        self.root = root
        self.root.title("IPTV多任务推流工具")
        self.root.geometry("1050x700")

        # Configuration and task registry (task id -> task object).
        self.tasks = OrderedDict()
        self.current_task_id = 0
        self.config_file = "config.json"
        self.config = {"server_url": "rtmp://ali.push.yximgs.com/live"}

        # Cross-thread plumbing: the thread that builds the UI is the only
        # one allowed to run widget code.
        self._ui_thread_id = threading.get_ident()
        self._ui_queue = queue.Queue()
        # Guards the (run_token, process) pair shared with worker threads.
        self._state_lock = threading.Lock()

        self.load_config()
        self.create_widgets()
        # Stop every ffmpeg child when the window is closed.
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        # Scheduled before check_ffmpeg(), which may destroy the root window.
        self.root.after(self.UI_POLL_MS, self._drain_ui_queue)
        self.check_ffmpeg()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Create the configuration form, the log pane and the task list."""
        # Left panel: configuration form and log.
        left_panel = ttk.Frame(self.root)
        left_panel.pack(side="left", fill="both", expand=True, padx=10, pady=10)

        config_frame = ttk.LabelFrame(left_panel, text="任务配置")
        config_frame.pack(fill="x", pady=5)
        fields = [
            ("任务名称:", "task_name"),
            ("直播源地址:", "source_url"),
            ("服务器地址:", "server_url"),
            ("流密钥:", "stream_key"),
        ]
        for row, (label, attr) in enumerate(fields):
            ttk.Label(config_frame, text=label).grid(
                row=row, column=0, sticky="w", padx=5, pady=2
            )
            entry = ttk.Entry(config_frame, width=30)
            entry.grid(row=row, column=1, padx=5, pady=2)
            # Each entry is exposed as an attribute named after its field.
            setattr(self, attr, entry)
        self.server_url.insert(0, self.config["server_url"])
        ttk.Button(config_frame, text="添加任务", command=self.add_task).grid(
            row=4, columnspan=2, pady=10
        )

        # Log pane.
        log_frame = ttk.LabelFrame(left_panel, text="运行日志")
        log_frame.pack(fill="both", expand=True, pady=5)
        self.log_text = tk.Text(log_frame, wrap=tk.WORD, state="disabled")
        log_scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=log_scrollbar.set)
        log_scrollbar.pack(side="right", fill="y")
        self.log_text.pack(side="left", fill="both", expand=True)

        # Right panel: scrollable task list.
        right_panel = ttk.Frame(self.root)
        right_panel.pack(side="right", fill="both", expand=True, padx=10, pady=10)
        task_list_frame = ttk.LabelFrame(right_panel, text="任务列表")
        task_list_frame.pack(fill="both", expand=True)

        self.task_list_canvas = tk.Canvas(task_list_frame)
        list_scrollbar = ttk.Scrollbar(
            task_list_frame, orient="vertical", command=self.task_list_canvas.yview
        )
        self.task_list_container = ttk.Frame(self.task_list_canvas)
        self.task_list_canvas.configure(yscrollcommand=list_scrollbar.set)
        self.task_list_canvas.pack(side="left", fill="both", expand=True)
        list_scrollbar.pack(side="right", fill="y")
        self.task_list_canvas.create_window(
            (0, 0), window=self.task_list_container, anchor="nw"
        )
        # Keep the scrollable region in sync with the container's size.
        self.task_list_container.bind(
            "<Configure>",
            lambda e: self.task_list_canvas.configure(
                scrollregion=self.task_list_canvas.bbox("all")
            ),
        )
        self.update_task_list()

    def update_task_list(self):
        """Rebuild the task list: one row of label, status and buttons per task."""
        for widget in self.task_list_container.winfo_children():
            widget.destroy()

        actions = (
            ("编辑", self.edit_task),
            ("开始", self.start_task),
            ("停止", self.stop_task),
            ("删除", self.delete_task),
        )
        for task_id, task in self.tasks.items():
            frame = ttk.Frame(self.task_list_container)
            frame.pack(fill="x", pady=2)
            ttk.Label(frame, text=task.name, width=15).pack(side="left")
            status_var = StringVar(value=task.status)
            ttk.Label(frame, textvariable=status_var, width=10).pack(side="left")
            for text, handler in actions:
                # Bind handler and id as defaults to avoid late-binding closures.
                ttk.Button(
                    frame,
                    text=text,
                    width=5,
                    command=lambda h=handler, t=task_id: h(t),
                ).pack(side="left", padx=2)
            task.status_var = status_var

    # ------------------------------------------------------------------
    # Thread-safe UI helpers
    # ------------------------------------------------------------------
    def _call_in_ui(self, func, *args):
        """Run ``func(*args)`` on the UI thread.

        Called from the UI thread it runs immediately; called from any other
        thread the call is queued and executed by ``_drain_ui_queue``.
        """
        if threading.get_ident() == self._ui_thread_id:
            func(*args)
        else:
            self._ui_queue.put((func, args))

    def _drain_ui_queue(self):
        """Execute all queued UI calls, then reschedule itself."""
        try:
            while True:
                try:
                    func, args = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                func(*args)
        finally:
            # Reschedule even if a queued call raised, so later updates
            # from worker threads are still delivered.
            self.root.after(self.UI_POLL_MS, self._drain_ui_queue)

    def _append_log(self, message):
        """Append one line to the log widget (UI thread only)."""
        self.log_text.config(state="normal")
        self.log_text.insert("end", message + "\n")
        # "end-1c" is on the empty line after the final newline, so the
        # number of content lines is one less than its line number.
        last_line = int(self.log_text.index("end-1c").split(".")[0])
        excess = (last_line - 1) - self.MAX_LOG_LINES
        if excess > 0:
            self.log_text.delete("1.0", f"{excess + 1}.0")
        self.log_text.see("end")
        self.log_text.config(state="disabled")

    def log_message(self, message):
        """Append ``message`` as one line to the log pane; callable from any thread."""
        self._call_in_ui(self._append_log, message)

    def _set_status(self, task, status):
        """Set ``task.status`` and mirror it into the row's Tk variable, if any."""
        task.status = status
        status_var = getattr(task, "status_var", None)
        if status_var is not None:
            status_var.set(status)

    @staticmethod
    def _is_current_run(task, run_token):
        """Return True if ``run_token`` identifies the task's latest start.

        A ``None`` token (``run_ffmpeg`` called without one) is always
        treated as current.
        """
        return run_token is None or getattr(task, "run_token", None) is run_token

    # ------------------------------------------------------------------
    # Task management
    # ------------------------------------------------------------------
    def add_task(self):
        """Create a task from the form fields, persist it and refresh the list."""
        name = self.task_name.get().strip()
        source = self.source_url.get().strip()
        server = self.server_url.get().strip()
        key = self.stream_key.get().strip()
        if not all([name, source, server, key]):
            messagebox.showwarning("警告", "请填写所有配置字段")
            return

        self.tasks[self.current_task_id] = StreamingTask(name, source, server, key)
        self.current_task_id += 1
        self.update_task_list()
        self.save_config()
        self.log_message(f"已添加任务: {name}")

    def delete_task(self, task_id):
        """Stop the task if needed, remove it and persist the change."""
        task = self.tasks[task_id]
        # Always go through stop_task: it is a no-op for an idle task and it
        # also cancels a task that is still starting up.
        self.stop_task(task_id)
        del self.tasks[task_id]
        self.update_task_list()
        self.save_config()
        self.log_message(f"已删除任务: {task.name}")

    def start_task(self, task_id):
        """Launch ffmpeg for the task on a daemon worker thread.

        Does nothing when the task is already running or starting, so a
        double click cannot spawn two ffmpeg processes for one task.
        """
        task = self.tasks[task_id]
        if task.status in self.ACTIVE_STATUSES:
            return

        self._set_status(task, self.STATUS_STARTING)
        self.log_message(f"正在启动任务: {task.name}")
        command = [
            "ffmpeg",
            "-i", task.source_url,
            "-c", "copy",
            "-f", "flv",
            f"{task.server_url}/{task.stream_key}",
        ]
        # A fresh token per start lets a stale worker recognise that it has
        # been superseded or cancelled.
        run_token = object()
        with self._state_lock:
            task.run_token = run_token
        threading.Thread(
            target=self.run_ffmpeg,
            args=(task, command, run_token),
            daemon=True,
        ).start()

    def run_ffmpeg(self, task, command, run_token=None):
        """Run ``command`` and forward its output to the log (worker thread).

        Args:
            task: Task whose ``process`` and status are updated.
            command: Argument list passed to ``subprocess.Popen``.
            run_token: Token assigned by ``start_task``; when the task has
                been stopped or restarted since, this run terminates its
                process and leaves the task's status alone.
        """
        process = None
        error_msg = None
        try:
            process = subprocess.Popen(
                command,
                # Keep ffmpeg from reading interactive commands from the
                # parent's standard input.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                # Decode explicitly and never fail on undecodable bytes.
                encoding="utf-8",
                errors="replace",
            )
            with self._state_lock:
                is_current = self._is_current_run(task, run_token)
                if is_current:
                    task.process = process
            if is_current:
                self._call_in_ui(self._mark_running, task, run_token)
            else:
                # Stopped or deleted while ffmpeg was being launched.
                process.terminate()

            with process.stdout:
                for line in process.stdout:
                    self.log_message(f"[{task.name}] {line.strip()}")
            process.wait()
        except Exception as e:
            error_msg = f"任务错误: {task.name} - {str(e)}"
            self.log_message(error_msg)
        finally:
            if process is not None and process.poll() is None:
                process.terminate()
            self._call_in_ui(self._finish_task, task, run_token, error_msg)

    def _mark_running(self, task, run_token):
        """Flag the task as running, unless this run was superseded (UI thread)."""
        if self._is_current_run(task, run_token):
            self._set_status(task, self.STATUS_RUNNING)
            self.log_message(f"任务已启动: {task.name}")

    def _finish_task(self, task, run_token, error_msg):
        """Record the end of a run (UI thread).

        The status is only updated for the task's latest run, and an error
        message is kept as the status instead of being overwritten.
        """
        if self._is_current_run(task, run_token):
            self._set_status(task, error_msg or self.STATUS_STOPPED)
        self.log_message(f"任务已停止: {task.name}")

    def stop_task(self, task_id):
        """Terminate the task's ffmpeg process, if any, and mark it stopped."""
        task = self.tasks[task_id]
        with self._state_lock:
            # Invalidate the token so a worker that is still launching
            # ffmpeg terminates it as soon as the process exists.
            task.run_token = None
            process = task.process
        if process and process.poll() is None:
            process.terminate()
        self._set_status(task, self.STATUS_STOPPED)

    def edit_task(self, task_id):
        """Open a dialog to edit the task; a running task is restarted on save."""
        task = self.tasks[task_id]
        edit_win = tk.Toplevel(self.root)
        edit_win.title("编辑任务")
        fields = [
            ("任务名称:", task.name),
            ("直播源地址:", task.source_url),
            ("服务器地址:", task.server_url),
            ("流密钥:", task.stream_key),
        ]
        entries = []
        for row, (label, value) in enumerate(fields):
            ttk.Label(edit_win, text=label).grid(row=row, column=0, padx=5, pady=2)
            entry = ttk.Entry(edit_win, width=30)
            entry.insert(0, value)
            entry.grid(row=row, column=1, padx=5, pady=2)
            entries.append(entry)

        def save_changes():
            new_name, new_source, new_server, new_key = (
                entry.get().strip() for entry in entries
            )
            if not all([new_name, new_source, new_server, new_key]):
                messagebox.showwarning("警告", "所有字段必须填写")
                return
            if self.tasks.get(task_id) is not task:
                # The task was deleted while the dialog was open.
                edit_win.destroy()
                return

            # Stop first so the old process ends before the settings change.
            was_running = task.status == self.STATUS_RUNNING
            if was_running:
                self.stop_task(task_id)

            task.name = new_name
            task.source_url = new_source
            task.server_url = new_server
            task.stream_key = new_key
            task.flv_url = f"{new_server}/{new_key}.flv"

            if was_running:
                self.start_task(task_id)
            self.update_task_list()
            self.save_config()
            edit_win.destroy()

        ttk.Button(edit_win, text="保存修改", command=save_changes).grid(
            row=4, columnspan=2, pady=5
        )

    # ------------------------------------------------------------------
    # Persistence and environment checks
    # ------------------------------------------------------------------
    def load_config(self):
        """Load settings and tasks from ``config_file`` if it exists.

        Problems are printed rather than raised (the log pane does not exist
        yet when this runs). A malformed task entry is skipped without
        discarding the entries that follow it.
        """
        if not os.path.exists(self.config_file):
            return
        try:
            with open(self.config_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers invalid JSON and undecodable bytes.
            print(f"配置加载错误: {str(e)}")
            return
        if not isinstance(data, dict):
            print("配置加载错误: 配置文件格式无效")
            return

        self.config.update({k: v for k, v in data.items() if k != "tasks"})
        saved_tasks = data.get("tasks")
        if not isinstance(saved_tasks, list):
            saved_tasks = []
        for task_data in saved_tasks:
            try:
                task = StreamingTask(
                    task_data["name"],
                    task_data["source_url"],
                    task_data["server_url"],
                    task_data["stream_key"],
                )
            except (KeyError, TypeError) as e:
                print(f"配置加载错误: {str(e)}")
                continue
            self.tasks[self.current_task_id] = task
            self.current_task_id += 1

    def save_config(self):
        """Write the server URL and all tasks to ``config_file`` as JSON.

        A write failure is reported in the log pane instead of raising
        inside a Tk callback.
        """
        config_data = {
            "server_url": self.server_url.get(),
            "tasks": [
                {
                    "name": task.name,
                    "source_url": task.source_url,
                    "server_url": task.server_url,
                    "stream_key": task.stream_key,
                }
                for task in self.tasks.values()
            ],
        }
        try:
            with open(self.config_file, "w", encoding="utf-8") as f:
                json.dump(config_data, f, indent=2, ensure_ascii=False)
        except OSError as e:
            self.log_message(f"配置保存错误: {str(e)}")

    def check_ffmpeg(self):
        """Verify that ``ffmpeg`` can be launched; close the app if it cannot."""
        try:
            subprocess.run(
                ["ffmpeg", "-version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError:
            # FileNotFoundError (not on PATH) and other launch failures.
            messagebox.showerror("错误", "未找到FFmpeg,请先安装并添加到系统路径")
            self.root.destroy()

    def _on_close(self):
        """Stop every task so no ffmpeg child outlives the window, then exit."""
        for task_id in list(self.tasks):
            self.stop_task(task_id)
        self.root.destroy()
def main():
    """Create the Tk root window, build the application and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the application object while the event loop runs.
    app = IPTVStreamerApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()