telegram-forward-videos

[up主专用,视频内嵌代码贴在这]

telegram获取账户加入的群组和频道用户名,ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import os
import csv
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError
from telethon.tl.types import Channel

# === Telegram 登录信息 ===
api_id = xx
api_hash = 'xx'
PHONE_NUMBER = '+xx'
TWO_STEP_PASSWORD = 'xx'

# === 初始化 Telegram Client ===
client = TelegramClient('user_session', api_id, api_hash)

CSV_FILE = "channels_list.csv"


async def login():
"""登录 Telegram"""
await client.connect()
if not await client.is_user_authorized():
print(f'🔐 使用手机号登录: {PHONE_NUMBER}')
await client.send_code_request(PHONE_NUMBER)
code = input('请输入收到的验证码:')
try:
await client.sign_in(PHONE_NUMBER, code)
except SessionPasswordNeededError:
print("🔑 输入两步验证密码...")
await client.sign_in(password=TWO_STEP_PASSWORD)
print("✅ 登录成功")


async def list_groups_and_channels():
"""列出已加入的群组和频道信息,并保存到 CSV"""
print("\n📋 你加入的群组和频道:")
dialogs = await client.get_dialogs()

data = [("类型", "名称", "用户名", "ID")]

for dialog in dialogs:
entity = dialog.entity
if isinstance(entity, Channel):
chat_type = "频道" if not entity.megagroup else "群组"
username = f"@{entity.username}" if entity.username else ""
print(f"{chat_type}: {entity.title} {username} | ID: {entity.id}")
data.append((chat_type, entity.title, username, entity.id))

# 保存到 CSV
with open(CSV_FILE, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerows(data)

print(f"✅ 已保存到 {CSV_FILE}")
print("📋 列表结束\n")


async def main():
await login()
await list_groups_and_channels()


if __name__ == '__main__':
with client:
client.loop.run_until_complete(main())

telegram只转发视频,视频大小不小于1G,频道里原有的重复视频不转发

  1. 固定来源和目标频道
    从 source_channel(@xx)抓视频

转发到 target_channel(@xx)

  1. 登录并支持两步验证
    用手机号 + API ID + API Hash 登录 Telegram

如果开启了两步验证,会自动输入 TWO_STEP_PASSWORD

  1. 批量转发历史视频
    会从源频道的最早一条消息开始往后遍历(reverse=True)

只转发文件大小 ≥ MIN_FILE_SIZE 的视频

转发时会保留原消息文字,并在说明里加上原文件名

  1. 去重机制(两层)
    message_id 去重:用 seen_message_ids.json 记录已经处理过的消息 ID

视频唯一键去重:用 文件名 + 文件大小 组合判断是否已经转发过,避免同一个视频多次出现

  1. 目标频道已有视频扫描
    启动时会遍历目标频道历史,建立一个已有视频的“唯一键集合”

遇到相同视频会跳过,不会重复发送

  1. 实时监听新视频
    监听源频道新发的视频

满足文件大小限制且不是重复视频就立即转发

同样更新 seen_message_ids.json 和已转发视频集合

  1. 延迟转发防封
    转发视频之间会随机等待 delay_between_forwards(例如 5~8 秒)

避免一次性发太多触发 Telegram 限制

  1. 运行模式
    启动 → 登录

扫描目标频道已有视频

批量转发历史大视频

进入实时监听模式,等待新视频出现

你的代码相当于是一个支持历史补档 + 实时转发的大视频爬取/转发机器人,
而且自带去重功能,不会出现同一个视频刷屏的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import asyncio
import random
import os
import json
from telethon import TelegramClient, events
from telethon.errors import SessionPasswordNeededError
from telethon.tl.types import MessageMediaDocument

# === 配置区 ===
MIN_FILE_SIZE_MB = 500 # 最小文件大小(MB),直接改这里
MIN_FILE_SIZE = MIN_FILE_SIZE_MB * 1024 * 1024

api_id = 27133580
api_hash = 'xx'
PHONE_NUMBER = '+xx'
TWO_STEP_PASSWORD = 'xx'

# 频道配置
source_channel = '@xx'
target_channel = '@xx'
delay_between_forwards = (5, 8)

# === 记录文件 ===
SEEN_FILE = "seen_message_ids.json"

print(f"📏 当前设置:只转发 ≥ {MIN_FILE_SIZE_MB} MB 的视频")

# 初始化
client = TelegramClient('user_session', api_id, api_hash)

# 已转发的 message_id
if os.path.exists(SEEN_FILE):
with open(SEEN_FILE, "r", encoding='utf-8') as f:
seen_ids = set(json.load(f))
else:
seen_ids = set()

# 目标频道已有的视频 key(文件名+大小)
existing_video_keys = set()

async def login():
await client.connect()
if not await client.is_user_authorized():
print(f'🔐 使用手机号登录: {PHONE_NUMBER}')
await client.send_code_request(PHONE_NUMBER)
code = input('请输入收到的验证码:')
try:
await client.sign_in(PHONE_NUMBER, code)
except SessionPasswordNeededError:
print("🔑 输入两步验证密码...")
await client.sign_in(password=TWO_STEP_PASSWORD)
print("✅ 登录成功")

async def load_existing_videos():
"""扫描目标频道已有视频,生成唯一key集合"""
print("🔍 正在扫描目标频道已有视频...")
target = await client.get_entity(target_channel)
async for msg in client.iter_messages(target):
if msg.media and isinstance(msg.media, MessageMediaDocument):
size = msg.media.document.size
filename = next(
(attr.file_name for attr in msg.media.document.attributes if hasattr(attr, 'file_name')),
"unknown"
)
key = f"{filename}_{size}"
existing_video_keys.add(key)
print(f"📂 目标频道已有 {len(existing_video_keys)} 个视频记录")

def get_video_key(message):
"""生成视频唯一标识(文件名+大小)"""
size = message.media.document.size
filename = next(
(attr.file_name for attr in message.media.document.attributes if hasattr(attr, 'file_name')),
"video.mp4"
)
return f"{filename}_{size}"

async def batch_forward_videos():
print("📦 正在批量转发历史视频...")
source = await client.get_entity(source_channel)
count = 0

async for message in client.iter_messages(source, reverse=True):
if message.id in seen_ids:
continue
if (message.video or isinstance(message.media, MessageMediaDocument)) and \
message.media and hasattr(message.media, "document") and \
message.media.document.size >= MIN_FILE_SIZE:

video_key = get_video_key(message)
if video_key in existing_video_keys:
print(f"⚠️ 跳过重复视频: {video_key}")
continue

try:
await client.send_file(
target_channel,
file=message.media,
caption=f"{message.message or ''}\n📁 原文件名: {video_key.split('_')[0]}",
file_name=video_key.split('_')[0]
)
count += 1
seen_ids.add(message.id)
existing_video_keys.add(video_key)
print(f"[{count}] ✅ 转发成功: {video_key}")
await asyncio.sleep(random.uniform(*delay_between_forwards))
except Exception as e:
print(f"[{count}] ❌ 转发失败:{e}")
await asyncio.sleep(3)
else:
if message.media and hasattr(message.media, "document"):
size = message.media.document.size
print(f"⚠️ 跳过小文件({size / 1024 / 1024:.2f}MB): message_id={message.id}")

with open(SEEN_FILE, "w", encoding='utf-8') as f:
json.dump(list(seen_ids), f)
print(f"📁 历史视频转发完成,共转发 {count} 个视频。")

@client.on(events.NewMessage(chats=source_channel))
async def live_forward(event):
if event.message.id in seen_ids:
return
if (event.video or isinstance(event.media, MessageMediaDocument)) and \
event.media and hasattr(event.media, "document") and \
event.media.document.size >= MIN_FILE_SIZE:

video_key = get_video_key(event.message)
if video_key in existing_video_keys:
print(f"⚠️ 实时跳过重复视频: {video_key}")
return

try:
print(f"📡 检测到新视频,正在转发: {video_key}")
await client.send_file(
target_channel,
file=event.media,
caption=f"{event.message.message or ''}\n📁 原文件名: {video_key.split('_')[0]}",
file_name=video_key.split('_')[0]
)
seen_ids.add(event.message.id)
existing_video_keys.add(video_key)
with open(SEEN_FILE, "w", encoding='utf-8') as f:
json.dump(list(seen_ids), f)
except Exception as e:
print(f"⚠️ 实时转发失败:{e}")
else:
if event.media and hasattr(event.media, "document"):
size = event.media.document.size
print(f"⚠️ 实时跳过小文件({size / 1024 / 1024:.2f}MB): message_id={event.message.id}")

async def main():
await login()
await load_existing_videos()
await batch_forward_videos()
print(f"⌛ 开始监听 {source_channel} 的新视频消息...")
await client.run_until_disconnected()

if __name__ == '__main__':
with client:
client.loop.run_until_complete(main())

telegram只转发视频,视频大小不小于1G,频道里原有的重复视频不转发,双账号支持以及多账号思路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import asyncio
import random
import os
import json
from telethon import TelegramClient, events
from telethon.errors import SessionPasswordNeededError
from telethon.tl.types import MessageMediaDocument

# === 配置区 ===
MIN_FILE_SIZE_MB = 500
MIN_FILE_SIZE = MIN_FILE_SIZE_MB * 1024 * 1024
delay_between_forwards = (5, 8)

# === 全局去重文件 ===
GLOBAL_KEYS_FILE = "global_video_keys.json"
GLOBAL_SEEN_FILE = "global_seen_ids.json"

# === 加载全局去重数据 ===
if os.path.exists(GLOBAL_KEYS_FILE):
with open(GLOBAL_KEYS_FILE, "r", encoding="utf-8") as f:
video_keys_global = set(json.load(f))
else:
video_keys_global = set()

if os.path.exists(GLOBAL_SEEN_FILE):
with open(GLOBAL_SEEN_FILE, "r", encoding="utf-8") as f:
seen_ids_global = set(json.load(f))
else:
seen_ids_global = set()

# === 账号1 ===
account1 = {
"session": "user1_session",
"api_id": xx,
"api_hash": "xx",
"phone": "+xx",
"two_step": "xx",
"source_channel": "@xx",
"target_channel": "@xx"
}

# === 账号2 ===
account2 = {
"session": "user2_session",
"api_id": xx,
"api_hash": "xx",
"phone": "+xx",
"two_step": "xx",
"source_channel": "@xx",
"target_channel": "@xx"
}

# 保存全局去重数据
def save_global_data():
with open(GLOBAL_KEYS_FILE, "w", encoding="utf-8") as f:
json.dump(list(video_keys_global), f)
with open(GLOBAL_SEEN_FILE, "w", encoding="utf-8") as f:
json.dump(list(seen_ids_global), f)

async def run_account(acc):
print(f"📏 当前设置:账号 {acc['phone']} 只转发 ≥ {MIN_FILE_SIZE_MB} MB 的视频")

client = TelegramClient(acc["session"], acc["api_id"], acc["api_hash"])
seen_file = f"seen_ids_{acc['phone']}.json"

# 加载账号本地已转发 ID
if os.path.exists(seen_file):
with open(seen_file, "r", encoding="utf-8") as f:
seen_ids_local = set(json.load(f))
else:
seen_ids_local = set()

async def login():
await client.connect()
if not await client.is_user_authorized():
print(f'🔐 使用手机号登录: {acc["phone"]}')
await client.send_code_request(acc["phone"])
code = input(f"[{acc['phone']}] 请输入验证码: ")
try:
await client.sign_in(acc["phone"], code)
except SessionPasswordNeededError:
print(f"[{acc['phone']}] 🔑 输入两步验证密码...")
await client.sign_in(password=acc["two_step"])
print(f"[{acc['phone']}] ✅ 登录成功")

async def load_existing_videos():
print(f"[{acc['phone']}] 🔍 扫描目标频道已有视频...")
target = await client.get_entity(acc["target_channel"])
async for msg in client.iter_messages(target):
if msg.media and isinstance(msg.media, MessageMediaDocument):
size = msg.media.document.size
filename = next((attr.file_name for attr in msg.media.document.attributes if hasattr(attr, 'file_name')), "unknown")
key = f"{filename}_{size}"
video_keys_global.add(key)
save_global_data()
print(f"[{acc['phone']}] 📂 全局视频记录数: {len(video_keys_global)}")

def get_video_key(message):
size = message.media.document.size
filename = next((attr.file_name for attr in message.media.document.attributes if hasattr(attr, 'file_name')), "video.mp4")
return f"{filename}_{size}"

async def forward_video(message, video_key):
"""通用转发(带全局持久化去重)"""
if video_key in video_keys_global:
print(f"[{acc['phone']}] ⚠️ 跳过重复视频(全局): {video_key}")
return
try:
await client.send_file(
acc["target_channel"],
file=message.media,
caption=f"{message.message or ''}\n📁 原文件名: {video_key.split('_')[0]}",
file_name=video_key.split('_')[0]
)
video_keys_global.add(video_key)
seen_ids_global.add(message.id)
seen_ids_local.add(message.id)
save_global_data()
with open(seen_file, "w", encoding="utf-8") as f:
json.dump(list(seen_ids_local), f)
print(f"[{acc['phone']}] ✅ 转发成功: {video_key}")
except Exception as e:
print(f"[{acc['phone']}] ❌ 转发失败: {e}")

async def batch_forward():
print(f"[{acc['phone']}] 📦 批量转发历史视频...")
source = await client.get_entity(acc["source_channel"])
count = 0
async for message in client.iter_messages(source, reverse=True):
if message.id in seen_ids_local or message.id in seen_ids_global:
continue
if (message.video or isinstance(message.media, MessageMediaDocument)) and \
message.media and hasattr(message.media, "document") and \
message.media.document.size >= MIN_FILE_SIZE:
video_key = get_video_key(message)
await forward_video(message, video_key)
count += 1
await asyncio.sleep(random.uniform(*delay_between_forwards))
print(f"[{acc['phone']}] 📁 历史转发完成,共转发 {count} 个视频")

@client.on(events.NewMessage(chats=acc["source_channel"]))
async def live_forward(event):
if event.message.id in seen_ids_local or event.message.id in seen_ids_global:
return
if (event.video or isinstance(event.media, MessageMediaDocument)) and \
event.media and hasattr(event.media, "document") and \
event.media.document.size >= MIN_FILE_SIZE:
video_key = get_video_key(event.message)
await forward_video(event.message, video_key)

async with client:
await login()
await load_existing_videos()
await batch_forward()
print(f"[{acc['phone']}] ⌛ 开始监听 {acc['source_channel']} ...")
await client.run_until_disconnected()

async def main():
await asyncio.gather(
run_account(account1),
run_account(account2)
)

if __name__ == "__main__":
asyncio.run(main())