内容纲要
背景
操作步骤如下:
一、获得授权
- 访问百度网盘-开放平台
https://pan.baidu.com/union/console/app/44461161 - 创建应用
填写表单,填写3个信息,确认创建
创建后点击打开应用详情页
复制保存秘钥信息到本地,后续python代码或命令行上传下载文件,连接认证时使用
二、安装python依赖
pip install bypy
三、定义网盘操作函数
import subprocess
import time
import re
from bypy import ByPy
def upload_by_file(source_path):
bp = ByPy()
dest_path = source_path.split("/")[-1] # 文件名
out = bp.upload(
source_path,
dest_path
)
return out
def upload_by_path(source_path, compress=True):
bp = ByPy()
# 创建目录
dest_path = source_path.split("/")[-1]
# 压缩
if compress:
print("分卷压缩....")
tar_path = f"{os.getcwd()}/tar/{dest_path}"
print(tar_path)
os.system(f"mkdir -p {tar_path}")
# os.system(f"zip -s 4000M {tar_path}/{dest_path}.zip {source_path}/*")
os.system(f"tar -cvpf - {source_path}| split -b 500M -d -a 3 - {tar_path}/{dest_path}")
source_path = tar_path
print("上传百度云....")
bp.mkdir(dest_path)
# 同步目录
out = bp.syncup(
source_path,
dest_path,
deleteremote=True
)
return out
def list_files():
bp = ByPy()
# 目前网盘文件位置
print(bp.list())
def monitor_bypy_upload(file_path):
# 启动 bypy 上传过程
process = subprocess.Popen(['bypy', 'upload', file_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True)
try:
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
# 这里可以添加代码来解析和处理输出,例如提取上传进度
except KeyboardInterrupt:
print("监控被用户中断")
process.poll()
return process.returncode
四、上传
4.1 监控进度并上传文件路径
# 调用函数监控上传过程
file_path = 'Qwen-7B-Chat'
monitor_bypy_upload(file_path)
4.2 监控进度并上传单个文件
# 调用函数监控上传过程
file_path = 'download/QWen---QWen-7B-Chat/model-00001-of-00008.safetensors'
monitor_bypy_upload(file_path)
4.3 CMD命令行上传单个文件
# 上传
!bypy upload download/QWen---QWen-7B-Chat/model-00001-of-00008.safetensors