下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:3665
这个抖音批量发布工具包含三个主要模块:主上传程序、配置文件和视频预处理工具。主程序使用Selenium自动化浏览器操作,实现了登录、单个视频上传和批量上传功能。视频预处理模块可以调整视频时长、分辨率和添加水印。使用时需要先配置config.ini文件,然后运行主程序即可。
因为是给客户定制的,没过一周吧,发现挺稳定的就吧稳定部分的核心框架上传出来,因为这个不会出验证码,所以还是挺稳定哈。
下面是源码部分哈~~~
import os
import time
import random
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from PIL import Image
import cv2
import numpy as np
import logging
from datetime import datetime
import configparser
class DouyinUploader:
def init(self, config_file='config.ini'):
self.config = configparser.ConfigParser()
self.config.read(config_file)
self.setup_logging()
self.driver = None
self.login_status = False
def setup_logging(self):
logging.basicConfig(
filename='dy_uploader.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger('DouyinUploader')
def init_driver(self):
chrome_options = Options()
if self.config.getboolean('Browser', 'headless'):
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--window-size=1920,1080')
self.driver = webdriver.Chrome(
executable_path=self.config.get('Browser', 'driver_path'),
options=chrome_options
)
self.logger.info('浏览器初始化完成')
def login(self):
try:
self.driver.get('https://www.douyin.com/')
time.sleep(3)
# 点击登录按钮
login_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[1]/div[1]'))
)
login_btn.click()
time.sleep(2)
# 切换到扫码登录
qr_login = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[2]/div[2]/div[1]/div[1]'))
)
qr_login.click()
time.sleep(2)
# 等待用户扫码登录
WebDriverWait(self.driver, 120).until(
EC.presence_of_element_located((By.XPATH, '//*[@id="root"]/div/div[2]/div/div/div[1]/div[3]/div/div[1]/div/div[1]/div[1]/div[1]/span'))
)
self.login_status = True
self.logger.info('登录成功')
return True
except Exception as e:
self.logger.error(f'登录失败: {str(e)}')
return False
def upload_video(self, video_path, description='', tags=[]):
if not self.login_status:
self.logger.warning('未登录,尝试登录...')
if not self.login():
return False
try:
# 打开发布页面
self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
time.sleep(5)
# 上传视频文件
upload_input = WebDriverWait(self.driver, 20).until(
EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
)
upload_input.send_keys(os.path.abspath(video_path))
self.logger.info(f'开始上传视频: {video_path}')
# 等待视频上传完成
WebDriverWait(self.driver, 300).until(
EC.presence_of_element_located((By.XPATH, '//*[contains(text(), "上传完成")]'))
)
self.logger.info('视频上传完成')
# 输入视频描述
if description:
desc_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//textarea[@placeholder="添加描述..."]'))
)
desc_input.send_keys(description)
self.logger.info(f'已添加描述: {description}')
# 添加标签
if tags:
for tag in tags:
try:
tag_input = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="添加标签"]'))
)
tag_input.send_keys(tag)
tag_input.send_keys(Keys.RETURN)
time.sleep(1)
self.logger.info(f'已添加标签: {tag}')
except:
continue
# 点击发布按钮
publish_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//button[contains(text(), "发布")]'))
)
publish_btn.click()
self.logger.info('点击发布按钮')
# 等待发布完成
WebDriverWait(self.driver, 60).until(
EC.presence_of_element_located((By.XPATH, '//*[contains(text(), "发布成功")]'))
)
self.logger.info('视频发布成功')
return True
except Exception as e:
self.logger.error(f'上传视频失败: {str(e)}')
return False
def batch_upload(self, video_dir, descriptions=[], tags=[]):
if not os.path.isdir(video_dir):
self.logger.error(f'目录不存在: {video_dir}')
return False
video_files = [f for f in os.listdir(video_dir) if f.lower().endswith(('.mp4', '.mov', '.avi'))]
if not video_files:
self.logger.warning(f'目录中没有视频文件: {video_dir}')
return False
self.logger.info(f'开始批量上传,共 {len(video_files)} 个视频')
for i, video_file in enumerate(video_files):
video_path = os.path.join(video_dir, video_file)
description = descriptions[i] if i < len(descriptions) else ''
self.logger.info(f'正在处理第 {i+1} 个视频: {video_file}')
if not self.upload_video(video_path, description, tags):
self.logger.warning(f'跳过视频: {video_file}')
continue
# 随机等待时间,避免频繁操作
wait_time = random.randint(30, 120)
self.logger.info(f'等待 {wait_time} 秒后继续...')
time.sleep(wait_time)
self.logger.info('批量上传完成')
return True
def close(self):
if self.driver:
self.driver.quit()
self.logger.info('浏览器已关闭')
def __del__(self):
self.close()
if name == 'main':
uploader = DouyinUploader()
uploader.init_driver()
try:
if uploader.login():
video_dir = 'videos'
descriptions = [
'今天的旅行vlog #旅行 #风景',
'美食探店分享 #美食 #探店',
'生活日常记录 #日常 #生活'
]
common_tags = ['短视频', '原创']
uploader.batch_upload(video_dir, descriptions, common_tags)
except Exception as e:
uploader.logger.error(f'程序异常: {str(e)}')
finally:
uploader.close()