文章最后更新时间:
资源信息
这个 Telegram 机器人旨在帮助用户便捷地投稿和转发资源链接(如夸克网盘资源)到指定频道。以下是它的主要功能:
- 模板化投稿:机器人为用户提供标准投稿模板,指导用户填写资源信息(如资源名称、描述、链接、大小、标签等)。这样可以确保所有投稿信息的格式统一,方便后续管理和检索。
-
自动格式验证:机器人会使用正则表达式自动验证投稿内容的格式,检查是否符合预设的模板标准。比如,资源的“大小”和“标签”字段会被严格检验,不符合格式的内容会被去除,提醒用户重新编辑。
-
审核与发布:当用户确认投稿信息无误后,机器人将资源信息转发到指定的频道。该功能确保了资源的合规性,也让用户可以轻松分享自己的资源内容。
-
用户友好交互:机器人提供了简单的交互选项,如“编辑”和“确认发布”按钮。用户在投稿过程中可以随时调整信息,直到对内容完全满意为止。
这个机器人对于分享资源、发布内容的群体特别有用,比如资料分享群组、资源交换群等。它的自动化流程不仅提高了用户体验,还确保了资源内容的规范化和管理的高效性。
前期基础需要的准备工作
- 创建1个Bot机器人、
-
创建1个群组
-
创建1个频道
-
把你的机器人拉入群组 和 频道
代码实现
创建telegram_contribute.py
文件
import asyncio
import re
import os
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes
from telegram.error import RetryAfter, TimedOut
# 配置日志记录,只记录错误级别及以上的日志
log_filename = "error_log.txt" # 日志文件名
# 创建日志记录器
logging.basicConfig(
filename=log_filename, # 将日志写入到当前目录下的 error_log.txt 文件
level=logging.ERROR, # 只记录错误级别的日志
format='%(asctime)s - %(levelname)s - %(message)s' # 日志格式
)
logger = logging.getLogger(__name__)
# 初始化机器人
TOKEN = '####' # 替换为机器人的Token
# TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') # 隐藏token
CHANNEL_ID = '@naclyunpan' # 替换为你的频道 Chat ID
# 临时存储用户投稿内容
user_posts = {}
# 启动命令,提供投稿模板
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
template_message = (
"请按照以下格式投稿:\n\n"
"图片\n\n"
"名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
"📁 大小:\n🏷 标签:"
)
await update.message.reply_text(template_message)
# 处理用户的投稿消息
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
# 检查消息和聊天对象是否存在
if update.message is None or update.message.chat is None:
await context.bot.send_message(chat_id=update.effective_chat.id,
text="切勿在原文案上编辑,请重新发布。")
return # 确保消息存在且有效
user_id = update.message.from_user.id
# 检查投稿内容是否包含图片和文字
if not update.message.photo or not update.message.caption:
await update.message.reply_text("投稿格式不正确,请按照模板重新投稿。\n\n"
"请按照以下格式投稿:\n\n"
"图片\n\n"
"名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
"📁 大小:\n🏷 标签:")
return
# 获取图片和文字内容
image = update.message.photo[-1].file_id
caption = update.message.caption
# 定义格式正则表达式
pattern = (
r"名称:\s*[\s\S]*?(?:\s*\n)*" # 名称字段允许后跟任意空白字符和换行
r"描述:\s*[\s\S]*?(?:\s*\n)*" # 描述字段允许后跟任意空白字符和换行
r"链接:\s*(https:\/\/pan\.quark\.cn\/s\/[^\s]+(?:\s+https:\/\/pan\.quark\.cn\/s\/[^\s]+)*)\n+" # 链接字段
r"(?:📁\s*)?大小:\s*.*\n+" # 大小字段,图标可选,要求换行
r"(?:🏷\s*)?标签:\s*(.*?)\s*$" # 标签字段,图标可选,匹配至文本末尾
)
# 自动修改功能
modified_caption = auto_fix_caption(caption)
# 验证格式
if not re.fullmatch(pattern, modified_caption):
await update.message.reply_text("投稿格式不正确,请按照模板重新投稿。\n\n"
"请按照以下格式投稿:\n\n"
"图片\n\n"
"名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
"📁 大小:\n🏷 标签:")
return
# 检查链接是否为夸克网盘链接
quark_link_pattern = r"https:\/\/pan\.quark\.cn\/s\/[^\/]+" # 匹配夸克网盘链接
if not re.search(quark_link_pattern, modified_caption):
await update.message.reply_text("链接必须是夸克网盘的链接,请重新投稿。")
return
# 存储用户投稿内容,以便之后编辑
if user_id not in user_posts:
user_posts[user_id] = []
user_posts[user_id].append({'image': image, 'caption': modified_caption})
# 生成所有投稿内容的摘要
all_posts_summary = "\n\n".join(
[f"#{i + 1} 投稿内容:\n{post['caption']}" for i, post in enumerate(user_posts[user_id])])
# 提供确认和编辑按钮
keyboard = [
[InlineKeyboardButton("编辑", callback_data="edit_post")],
[InlineKeyboardButton("确认发布", callback_data="confirm_post")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# 发送修改后的投稿内容给用户查看
await update.message.reply_text(f"感谢您的投稿!以下是您的所有投稿内容:\n\n{all_posts_summary}\n\n"
"您可以选择编辑或确认发布到频道:", reply_markup=reply_markup)
# 自动修复投稿内容的格式
def auto_fix_caption(caption: str) -> str:
# 定义所需的字段和对应的正则表达式
fields = {
"名称": (r"名称:\s*(.*?)\n", "未知名称"), # 捕获名称字段,默认值"未知名称"
"描述": (r"描述:\s*(.*?)(?=\n链接:)", "无描述"), # 捕获描述字段直到链接,默认值"无描述"
"链接": (r"链接:\s*(https:\/\/pan\.quark\.cn\/s\/[^\s]+)", "无链接"), # 捕获链接字段,默认"无链接"
"📁 大小": (r"📁 大小:\s*(.*?)\n", "未知大小"), # 捕获大小字段,默认值"未知大小"
"🏷 标签": (r"🏷 标签:\s*(.*?)\s*(?=\n|$)", "无标签") # 捕获标签字段,默认"无标签"
}
# 提取符合的字段并设置默认值
result = {}
for field, (pattern, default_value) in fields.items():
match = re.search(pattern, caption, re.DOTALL) # 使用 re.DOTALL 允许点号匹配换行符
result[field] = match.group(1).strip() if match else default_value
# 组合提取到的字段内容成新的 caption
new_caption = "\n\n".join([f"{field}:{value}" for field, value in result.items()])
return new_caption
# 处理编辑按钮的回调
async def handle_edit_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
# 确保回应按钮点击事件
await query.answer()
# 检查当前消息内容
current_text = query.message.text
new_text = "请发送新的投稿内容,格式与之前相同。"
del user_posts[user_id]
# 只有在新内容不同的情况下,才进行编辑
if current_text != new_text:
await query.edit_message_text(new_text)
else:
await query.answer("消息内容未改变。", show_alert=True)
# 更新投稿内容
async def update_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.message.from_user.id
# 检查用户是否处于编辑状态
if context.user_data.get('editing'):
new_text = update.message.text
# 更新投稿内容
if user_id in user_posts:
user_posts[user_id]['caption'] = new_text
await update.message.reply_text("您的投稿内容已更新!")
# 清除编辑状态
context.user_data['editing'] = False
else:
await update.message.reply_text("没有找到可编辑的投稿内容。")
else:
await update.message.reply_text("请先使用编辑按钮来编辑您的投稿内容。")
# 确认发布到频道的回调
async def handle_confirm_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
# 检查用户是否有投稿内容
if user_id in user_posts:
# 使用 for 循环逐条发布所有消息
for post_data in user_posts[user_id]:
image = post_data['image']
caption = post_data['caption']
# 构建频道消息内容
channel_message = (
f"{caption}\n\n"
f"📢 频道:@naclyunpan\n"
f"👥 群组:@naclzy"
)
try:
# 发送图片和文字到频道
await context.bot.send_photo(chat_id=CHANNEL_ID, photo=image, caption=channel_message)
except RetryAfter as e:
# 处理频率限制,等待一定的时间后重试
retry_after = e.retry_after
await asyncio.sleep(retry_after)
continue
except TimedOut:
# 处理请求超时
await asyncio.sleep(5) # 等待5秒后重试
continue
except Exception as e:
# 记录其他异常日志
logger.error(f"Error while sending post to channel: {e}")
# 回复用户投稿成功,并从临时存储中删除内容
await query.answer("内容已成功发布到频道!")
await query.edit_message_text("您的投稿已成功发布到频道。感谢您的支持!")
del user_posts[user_id]
else:
await query.answer("找不到您的投稿内容,无法发送到频道。")
def main():
# 创建应用程序实例
application = Application.builder().token(TOKEN).build()
# 添加命令处理器
application.add_handler(CommandHandler("start", start))
# 添加消息处理器,处理首次投稿和更新内容
application.add_handler(MessageHandler(filters.TEXT | filters.PHOTO, handle_message))
# 添加编辑和确认回调处理器
application.add_handler(CallbackQueryHandler(handle_edit_callback, pattern="edit_post"))
application.add_handler(CallbackQueryHandler(handle_confirm_callback, pattern="confirm_post"))
# 添加更新消息处理器
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, update_post))
# 开始轮询更新
application.run_polling()
if __name__ == '__main__':
main()
需要环境requlrements.txt
anyio==4.6.2.post1
certifi==2024.8.30
exceptiongroup==1.2.2
h11==0.14.0
httpcore==1.0.6
httpx==0.27.2
idna==3.10
python-telegram-bot==21.6
sniffio==1.3.1
style==1.1.0
typing_extensions==4.12.2
update==0.0.1
代码使用
白嫖方式一
结合之前白嫖的 Serv00 的虚拟主机,可以直接把代码放到上面运行 🙂
# 首先可以在 Serv00 创建一个 telegram 目录,用来存放代码
mkdir telegram
# 其次进入 telegram 目录 后 上传 telegram_contribute.py 文件 和 requlrements.txt
cd telegram
# 最后运行所需要的依赖 并 执行Python代码
pip install -r requirements.txt
# 这样,Python 脚本会在后台运行,并且输出会被保存在 nohup.out 文件中。
nohup python3 telegram_contribute.py &
# 如果你想查看日志或输出,使用以下命令查看 nohup.out 文件:
tail -f nohup.out
# 如果你想停止后台运行的程序,可以通过以下命令查找其进程ID (PID):
ps aux | grep telegram_contribute.py
# 然后,使用 kill 命令停止进程:
kill <PID>
白嫖方式二
注意:此方法仅限于尝鲜 :bowtie:
不然就会向我一样直接被禁止使用 :sob:
在 GitHub 上新建一个仓库,然后点击 Actions 新建一个工作流程
name: tgBot
on:
push: # 当有代码推送时触发
schedule:
- cron: '0 */5 * * *' # 每隔5小时触发一次
jobs:
run_demo_actions:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run telegram_contribute.py
env:
TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
run: python telegram_contribute.py
资源获取
网盘资源收藏(夸克):https://t.me/naclyunpan
资源搜索院(可以搜索任何资源):https://t.me/naclzy
© 版权声明
THE END
暂无评论内容