Telegram投稿机器人,实现批量转发

百度已收录

文章最后更新时间:2025-01-18 15:39:15

资源信息

这个 Telegram 机器人旨在帮助用户便捷地投稿和转发资源链接(如夸克网盘资源)到指定频道。以下是它的主要功能:

  1. 模板化投稿:机器人为用户提供标准投稿模板,指导用户填写资源信息(如资源名称、描述、链接、大小、标签等)。这样可以确保所有投稿信息的格式统一,方便后续管理和检索。

  2. 自动格式验证:机器人会使用正则表达式自动验证投稿内容的格式,检查是否符合预设的模板标准。比如,资源的“大小”和“标签”字段会被严格检验,不符合格式的内容会被去除,提醒用户重新编辑。

  3. 审核与发布:当用户确认投稿信息无误后,机器人将资源信息转发到指定的频道。该功能确保了资源的合规性,也让用户可以轻松分享自己的资源内容。

  4. 用户友好交互:机器人提供了简单的交互选项,如“编辑”和“确认发布”按钮。用户在投稿过程中可以随时调整信息,直到对内容完全满意为止。

这个机器人对于分享资源、发布内容的群体特别有用,比如资料分享群组、资源交换群等。它的自动化流程不仅提高了用户体验,还确保了资源内容的规范化和管理的高效性。

前期基础需要的准备工作

  • 创建1个Bot机器人、

  • 创建1个群组

  • 创建1个频道

  • 把你的机器人拉入群组 和 频道

代码实现

创建telegram_contribute.py文件

import asyncio
import re
import os
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes
from telegram.error import RetryAfter, TimedOut

# 配置日志记录,只记录错误级别及以上的日志
log_filename = "error_log.txt"  # 日志文件名

# 创建日志记录器
logging.basicConfig(
    filename=log_filename,       # 将日志写入到当前目录下的 error_log.txt 文件
    level=logging.ERROR,          # 只记录错误级别的日志
    format='%(asctime)s - %(levelname)s - %(message)s'  # 日志格式
)

logger = logging.getLogger(__name__)

# 初始化机器人
TOKEN = '####'  # 替换为机器人的Token
# TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') # 隐藏token
CHANNEL_ID = '@naclyunpan'  # 替换为你的频道 Chat ID

# 临时存储用户投稿内容
user_posts = {}

# 启动命令,提供投稿模板
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    template_message = (
        "请按照以下格式投稿:\n\n"
        "图片\n\n"
        "名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
        "📁 大小:\n🏷 标签:"
    )
    await update.message.reply_text(template_message)


# 处理用户的投稿消息
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # 检查消息和聊天对象是否存在
    if update.message is None or update.message.chat is None:
        await context.bot.send_message(chat_id=update.effective_chat.id,
                                       text="切勿在原文案上编辑,请重新发布。")
        return  # 确保消息存在且有效

    user_id = update.message.from_user.id

    # 检查投稿内容是否包含图片和文字
    if not update.message.photo or not update.message.caption:
        await update.message.reply_text("投稿格式不正确,请按照模板重新投稿。\n\n"
                                        "请按照以下格式投稿:\n\n"
                                        "图片\n\n"
                                        "名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
                                        "📁 大小:\n🏷 标签:")
        return

    # 获取图片和文字内容
    image = update.message.photo[-1].file_id
    caption = update.message.caption

    # 定义格式正则表达式
    pattern = (
        r"名称:\s*[\s\S]*?(?:\s*\n)*"  # 名称字段允许后跟任意空白字符和换行
        r"描述:\s*[\s\S]*?(?:\s*\n)*"  # 描述字段允许后跟任意空白字符和换行
        r"链接:\s*(https:\/\/pan\.quark\.cn\/s\/[^\s]+(?:\s+https:\/\/pan\.quark\.cn\/s\/[^\s]+)*)\n+"  # 链接字段
        r"(?:📁\s*)?大小:\s*.*\n+"                 # 大小字段,图标可选,要求换行
        r"(?:🏷\s*)?标签:\s*(.*?)\s*$"             # 标签字段,图标可选,匹配至文本末尾
    )

    # 自动修改功能
    modified_caption = auto_fix_caption(caption)

    # 验证格式
    if not re.fullmatch(pattern, modified_caption):
        await update.message.reply_text("投稿格式不正确,请按照模板重新投稿。\n\n"
                                        "请按照以下格式投稿:\n\n"
                                        "图片\n\n"
                                        "名称:\n\n描述:\n\n链接:(夸克网盘)\n\n"
                                        "📁 大小:\n🏷 标签:")
        return

    # 检查链接是否为夸克网盘链接
    quark_link_pattern = r"https:\/\/pan\.quark\.cn\/s\/[^\/]+"  # 匹配夸克网盘链接
    if not re.search(quark_link_pattern, modified_caption):
        await update.message.reply_text("链接必须是夸克网盘的链接,请重新投稿。")
        return

    # 存储用户投稿内容,以便之后编辑
    if user_id not in user_posts:
        user_posts[user_id] = []

    user_posts[user_id].append({'image': image, 'caption': modified_caption})

    # 生成所有投稿内容的摘要
    all_posts_summary = "\n\n".join(
        [f"#{i + 1} 投稿内容:\n{post['caption']}" for i, post in enumerate(user_posts[user_id])])
    # 提供确认和编辑按钮
    keyboard = [
        [InlineKeyboardButton("编辑", callback_data="edit_post")],
        [InlineKeyboardButton("确认发布", callback_data="confirm_post")]
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)

    # 发送修改后的投稿内容给用户查看
    await update.message.reply_text(f"感谢您的投稿!以下是您的所有投稿内容:\n\n{all_posts_summary}\n\n"
                                    "您可以选择编辑或确认发布到频道:", reply_markup=reply_markup)


# 自动修复投稿内容的格式
def auto_fix_caption(caption: str) -> str:
    # 定义所需的字段和对应的正则表达式
    fields = {
        "名称": (r"名称:\s*(.*?)\n", "未知名称"),  # 捕获名称字段,默认值"未知名称"
        "描述": (r"描述:\s*(.*?)(?=\n链接:)", "无描述"),  # 捕获描述字段直到链接,默认值"无描述"
        "链接": (r"链接:\s*(https:\/\/pan\.quark\.cn\/s\/[^\s]+)", "无链接"),  # 捕获链接字段,默认"无链接"
        "📁 大小": (r"📁 大小:\s*(.*?)\n", "未知大小"),  # 捕获大小字段,默认值"未知大小"
        "🏷 标签": (r"🏷 标签:\s*(.*?)\s*(?=\n|$)", "无标签")  # 捕获标签字段,默认"无标签"
    }

    # 提取符合的字段并设置默认值
    result = {}
    for field, (pattern, default_value) in fields.items():
        match = re.search(pattern, caption, re.DOTALL)  # 使用 re.DOTALL 允许点号匹配换行符
        result[field] = match.group(1).strip() if match else default_value

    # 组合提取到的字段内容成新的 caption
    new_caption = "\n\n".join([f"{field}:{value}" for field, value in result.items()])
    return new_caption


# 处理编辑按钮的回调
async def handle_edit_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    user_id = query.from_user.id

    # 确保回应按钮点击事件
    await query.answer()

    # 检查当前消息内容
    current_text = query.message.text
    new_text = "请发送新的投稿内容,格式与之前相同。"
    del user_posts[user_id]
    # 只有在新内容不同的情况下,才进行编辑
    if current_text != new_text:
        await query.edit_message_text(new_text)
    else:
        await query.answer("消息内容未改变。", show_alert=True)


# 更新投稿内容
async def update_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
    user_id = update.message.from_user.id

    # 检查用户是否处于编辑状态
    if context.user_data.get('editing'):
        new_text = update.message.text

        # 更新投稿内容
        if user_id in user_posts:
            user_posts[user_id]['caption'] = new_text
            await update.message.reply_text("您的投稿内容已更新!")
            # 清除编辑状态
            context.user_data['editing'] = False
        else:
            await update.message.reply_text("没有找到可编辑的投稿内容。")
    else:
        await update.message.reply_text("请先使用编辑按钮来编辑您的投稿内容。")


# 确认发布到频道的回调
async def handle_confirm_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    user_id = query.from_user.id

    # 检查用户是否有投稿内容
    if user_id in user_posts:
        # 使用 for 循环逐条发布所有消息
        for post_data in user_posts[user_id]:
            image = post_data['image']
            caption = post_data['caption']

            # 构建频道消息内容
            channel_message = (
                f"{caption}\n\n"
                f"📢 频道:@naclyunpan\n"
                f"👥 群组:@naclzy"
            )

            try:
                # 发送图片和文字到频道
                await context.bot.send_photo(chat_id=CHANNEL_ID, photo=image, caption=channel_message)
            except RetryAfter as e:
                # 处理频率限制,等待一定的时间后重试
                retry_after = e.retry_after
                await asyncio.sleep(retry_after)
                continue
            except TimedOut:
                # 处理请求超时
                await asyncio.sleep(5)  # 等待5秒后重试
                continue
            except Exception as e:
                # 记录其他异常日志
                logger.error(f"Error while sending post to channel: {e}")

        # 回复用户投稿成功,并从临时存储中删除内容
        await query.answer("内容已成功发布到频道!")
        await query.edit_message_text("您的投稿已成功发布到频道。感谢您的支持!")
        del user_posts[user_id]
    else:
        await query.answer("找不到您的投稿内容,无法发送到频道。")


def main():
    # 创建应用程序实例
    application = Application.builder().token(TOKEN).build()

    # 添加命令处理器
    application.add_handler(CommandHandler("start", start))

    # 添加消息处理器,处理首次投稿和更新内容
    application.add_handler(MessageHandler(filters.TEXT | filters.PHOTO, handle_message))

    # 添加编辑和确认回调处理器
    application.add_handler(CallbackQueryHandler(handle_edit_callback, pattern="edit_post"))
    application.add_handler(CallbackQueryHandler(handle_confirm_callback, pattern="confirm_post"))

    # 添加更新消息处理器
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, update_post))

    # 开始轮询更新
    application.run_polling()


if __name__ == '__main__':
    main()


需要环境requlrements.txt

anyio==4.6.2.post1
certifi==2024.8.30
exceptiongroup==1.2.2
h11==0.14.0
httpcore==1.0.6
httpx==0.27.2
idna==3.10
python-telegram-bot==21.6
sniffio==1.3.1
style==1.1.0
typing_extensions==4.12.2
update==0.0.1

代码使用

白嫖方式一

结合之前白嫖的 Serv00 的虚拟主机,可以直接把代码放到上面运行 🙂

# 首先可以在 Serv00 创建一个 telegram 目录,用来存放代码
mkdir telegram

# 其次进入 telegram 目录 后 上传 telegram_contribute.py 文件 和 requlrements.txt
cd telegram

# 最后运行所需要的依赖 并 执行Python代码
pip install -r requirements.txt

# 这样,Python 脚本会在后台运行,并且输出会被保存在 nohup.out 文件中。
nohup python3 telegram_contribute.py &

# 如果你想查看日志或输出,使用以下命令查看 nohup.out 文件:
tail -f nohup.out

# 如果你想停止后台运行的程序,可以通过以下命令查找其进程ID (PID):
ps aux | grep telegram_contribute.py

# 然后,使用 kill 命令停止进程:
kill <PID>

白嫖方式二

注意:此方法仅限于尝鲜 :bowtie:
不然就会向我一样直接被禁止使用 :sob:

在 GitHub 上新建一个仓库,然后点击 Actions 新建一个工作流程

Telegram投稿机器人,实现批量转发-氯化钠资源网站

Telegram投稿机器人,实现批量转发-氯化钠资源网站

name: tgBot

on:
  push:  # 当有代码推送时触发
  schedule:
    - cron: '0 */5 * * *'  # 每隔5小时触发一次

jobs:
  run_demo_actions:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run telegram_contribute.py
        env:
          TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
        run: python telegram_contribute.py

资源获取

网盘资源收藏(夸克):https://t.me/naclyunpan
资源搜索院(可以搜索任何资源):https://t.me/naclzy

本文链接:https://lzphy.top/703/
© 版权声明
THE END
喜欢就支持一下吧
点赞0赞赏 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容