Cloudreve 是什么?
引用自 https://docs.cloudreve.org/
Cloudreve 可以让您快速搭建起公私兼备的网盘系统。Cloudreve 在底层支持不同的云存储平台,用户在实际使用时无须关心物理存储方式。你可以使用 Cloudreve 搭建个人用网盘、文件分享系统,亦或是针对大小团体的公有云系统。
Cloudreve 的离线下载核心由 Aria2 驱动。正确配置并启用离线下载功能后,用户可以创建磁力链、HTTP、种子下载任务,由服务端下载完成后加入到用户文件中。
准备工作
本篇文章使用 Python ≥ 3.8
Aria2 的安装与配置此处不再讨论,请自行寻找相关教程。
配置离线下载请查看 Cloudreve 官方文档。
下面的内容默认上述条件已配置完成
创造需求
假如某A科游戏,在官网提供直链安装包,但是下载太慢(几kb那种)。可以利用 离线下载
自动下载至 vps,后续再通过 vps 下载到手机/Pad上,以此获得比较理想的速度。
先决条件
如何得知什么时候该下载?一般取得游戏版本号或哈希就可以解决。
幸运的是该游戏官网有一个带有版本号和下载链接的 api。
1 2 3 4 5 6 7
| { "success": true, "value": { "url": "https://arcaea-static.lowiro-cdn.net/wdNmVfhaxKt9lQUZ9%2FMKFrV9nEs2SQeVTVs0O%2B48hyrF%2FckYd3BNJsIghHTELo5qQ0143aXcIeLQpWNKVjFgtBCcjzv9r3DW8BUYr7GwRvYTUcxi5IyzqxZb3Rxb9F%2B9EvguuumWMUcSZXg%3D?filename=arcaea_4.2.1c.apk", "version": "4.2.1c" } }
|
Cloudreve Api
https://github.com/cloudreve/Cloudreve/blob/master/routers/router.go
实现需求
整理需要用到的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| url = "https://webapi.lowiro.com/webapi/serve/static/bin/arcaea/apk"
saveDirectory = "/Arcaea"
cloudreveUrl = "http://127.0.0.1:1145" session = cloudreveUrl + "/api/v3/user/session" directory = cloudreveUrl + "/api/v3/directory" + saveDirectory.replace("/", "%2F") objectSet = cloudreveUrl + "/api/v3/object" aria2Download = cloudreveUrl + "/api/v3/aria2/url"
header = { "accept": 'application/json, text/plain, */*', "cookie": cookie }
|
实现接口调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| def Login(user, password): data = json.dumps({"userName": user,"Password": password,"captchaCode": ""}) response = requests.post(session, data, allow_redirects=False) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: cookies = response.cookies.items() cookie = '' for name, value in cookies: cookie += '{0}={1};'.format(name, value) return cookie return None
def GetFileList(): response = requests.get(directory, headers=header) if response.status_code == 200: resJson = json.loads(response.text) return resJson return None
def DeleteApk(id): data = json.dumps({"items": [id], "dirs": []}) response = requests.delete(objectSet, data=data, headers=header) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: return "Success" return resJson["msg"] return response.text
def AddCloudreveAria2(url): data = json.dumps({"url": [url],"dst": saveDirectory}) response = requests.post(aria2Download, data, headers=header) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: return "Success" return resJson["msg"] return response.text
|
实现最终需求
最后就是根据自己的需求编写代码
这里直接贴上我写好的代码,需要系统定时调用
(仅自用,不能保证可靠性)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| import requests import json import os
url = "https://webapi.lowiro.com/webapi/serve/static/bin/arcaea/apk"
saveDirectory = "/Arcaea"
cloudreveUrl = "http://127.0.0.1:1145" session = cloudreveUrl + "/api/v3/user/session" directory = cloudreveUrl + "/api/v3/directory" + saveDirectory.replace("/", "%2F") objectSet = cloudreveUrl + "/api/v3/object" aria2Download = cloudreveUrl + "/api/v3/aria2/url"
user = "" password = ""
filename = os.getcwd() + "/ArcaeaVer.txt" cookieFile = os.getcwd() + "/key"
def Main(): cookie = ReadCookie() global header header = { "accept": 'application/json, text/plain, */*', "cookie": cookie } response = requests.get(url) resJson = json.loads(response.text) if resJson["success"]: ver = resJson["value"]["version"] verStatus = Compare(ver) if not verStatus: fileStatus = GetFileList(oldVer) if fileStatus[1]: DeleteStatus = DeleteApk(fileStatus[0]) if DeleteStatus == "Success": Ver.Write(ver) print("已删除旧版本") else: print("旧版本删除失败 msg: {}".format(DeleteStatus)) else: if fileStatus[0] == 0: Ver.Write(ver) print("未发现旧版本,无需删除") elif fileStatus[0] == 401: print("尝试登录") status = GetCookie(user, password) if status: print("登录成功") Main() else: print("登录失败") return downloadStatus = AddCloudreveAria2(resJson["value"]["url"]) if downloadStatus == "Success": print("{} 创建离线任务成功".format(ver)) else: print("失败 msg: {}".format(downloadStatus)) else: print("{} 已是最新版本".format(ver)) else: print("获取失败,请稍后再试") print(response.text)
class Ver: @staticmethod def Read(): if not os.path.exists(filename): file = open(filename, "w+") else: file = open(filename, "r") ver = file.read() file.close() return ver @staticmethod def Write(newVer): file = open(filename, "w+") file.write(newVer) file.close()
def GetCookie(user, password): cookie = Login(user, password) if not cookie == None: file = open(cookieFile, "w+") file.write(cookie) return True return False
def ReadCookie(): if os.path.exists(cookieFile): file = open(cookieFile, "r") return file.read() return None
def Compare(newVer): global oldVer oldVer = Ver.Read() if oldVer == newVer: return True return False
def Login(user, password): data = json.dumps({"userName":user,"Password":password,"captchaCode":""}) response = requests.post(session, data, allow_redirects=False) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: cookies = response.cookies.items() cookie = '' for name, value in cookies: cookie += '{0}={1};'.format(name, value) return cookie return None
def GetFileList(ver): response = requests.get(directory, headers=header) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: objects = resJson["data"]["objects"] for ob in objects: if ob["name"] == "arcaea_{}.apk".format(ver): return [ob["id"], True] return [resJson["code"], False] return [response.text, False]
def DeleteApk(id): data = json.dumps({"items": [id], "dirs": []}) response = requests.delete(objectSet, data=data, headers=header) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: return "Success" return resJson["msg"] return response.text
def AddCloudreveAria2(url): data = json.dumps({"url": [url],"dst": saveDirectory}) response = requests.post(aria2Download, data, headers=header) if response.status_code == 200: resJson = json.loads(response.text) if resJson["code"] == 0: return "Success" return resJson["msg"] return response.text
if __name__ == "__main__": Main()
|