thief_music_api/api.py
Joe 3d73f1ad10
All checks were successful
continuous-integration/drone/push Build is passing
feat:新增fastapi-login,数据库表
2023-08-16 19:40:35 +08:00

292 lines
9.6 KiB
Python

import fastapi
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login.exceptions import InvalidCredentialsException
from datetime import timedelta
import requests
import uvicorn
from fastapi import Depends
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi_login import LoginManager
from server.user_server import *
from server.rank_data import *
from server.music_server import *
env = os.environ
app = FastAPI()
# 配置 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源,可以根据需求进行配置
allow_credentials=True,
allow_methods=["*"], # 允许所有请求方法
allow_headers=["*"], # 允许所有请求头
)
SECRET = os.urandom(24).hex()
manager = LoginManager(SECRET, token_url='/auth/token', use_cookie=False)
try:
# mysql数据配置
con = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_password, port=mysql_port,
charset="utf8", database=mysql_database)
# cursor = con.cursor()
except Exception as e:
print(e)
print("数据库连接失败")
@manager.user_loader()
def load_user(username: int):
"""
获取用户信息
:param username:
:return:
"""
try:
with con.cursor() as cursor:
cursor.execute("SELECT * FROM User WHERE username = %s", (username,))
user = cursor.fetchone()
if user:
user_dict = dict(username=user[1], password=user[3])
return user_dict
else:
return None
except Exception as e:
print(e)
return None
# @app.get('/auth/cookie')
# def auth(response: Response, user=Depends(manager)):
# """
# 通过cookie验证用户
# :param response:
# :param user:
# :return:
# """
# # 查询用户信息
# cursor.execute("SELECT * FROM User WHERE email = %s", (user['sub'],))
# user = cursor.fetchone()
# # 生成token
# token = manager.create_access_token(
# data=dict(sub=user[2])
# )
# manager.set_cookie(response, token)
# return response
@app.post('/auth/get_token')
def get_token(data: OAuth2PasswordRequestForm = Depends()):
username = data.username
password = md5(data.password)
user = load_user(username) # we are using the same function to retrieve the user
if not user:
raise InvalidCredentialsException # you can also use your own HTTPException
elif password != user['password']:
raise InvalidCredentialsException
access_token = manager.create_access_token(
data=dict(sub=username),
expires=timedelta(hours=12)
)
return {'access_token': access_token, 'token_type': 'bearer'}
# 验证Token
@app.get('/token/verify_user')
def get_current_user(current_user=Depends(manager.get_current_user)):
if current_user:
return {"status":200 ,"success": True, "message": "Validated successfully"}
else:
return {"status":400 ,"success": False, "message": "Invalid token"}
# @app.get('/protected')
# def protected_route(user=Depends(manager)):
# return "验证通过"
# 注册
@app.post("/user/register")
async def user_register(
username: str = fastapi.Query(..., description="用户名"),
password: str = fastapi.Query(..., description="密码"),
email: str = fastapi.Query(None, description="邮箱")):
"""
注册
:param username:
:param password:
:param email:
:return:
"""
create_user(username, password)
# 获取Token
token_info = get_token(OAuth2PasswordRequestForm(username=username, password=password))
return {"success": True, "message": "注册成功", "token": token_info}
# 登陆
@app.post("/user/login")
async def user_login(
username: str = fastapi.Query(..., description="用户名"),
password: str = fastapi.Query(..., description="密码")):
"""
登陆并获取Token
:param username:
:param password:
:return:
"""
# 请求auth/token接口获取token
token_info = get_token(OAuth2PasswordRequestForm(username=username, password=password))
login_info = user_login(username, password)
if login_info:
return {"success": True, "message": "登陆成功", "token": token_info}
else:
return {"success": False, "message": "登陆失败"}
# 榜单获取
@app.get("/get_rank")
async def get_rank(
rank_id: str = fastapi.Query(..., description="榜单类型"),
current_user=Depends(manager.get_current_user)):
"""
:param rank_id:
19723756 云音乐飙升榜
3779629 云音乐新歌榜
3778678 云音乐热歌榜
2884035 云音乐原创榜
:return:
"""
headers = {
"Referer": "https://music.163.com/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
" AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36"
}
url = f"https://music.163.com/api/playlist/detail?id={rank_id}"
# 缓存文件不存在,从网络获取榜单数据
response = requests.get(url, headers=headers)
# 如果请求code为-447 那么继续返回json
if response.status_code == -447:
return {"message": "response code is -447, local json data", "data": response.json()}
else:
# 检测是否含有带rank_id的缓存文件
cached_data = read_cache(rank_id)
if cached_data:
return {"message": "local json data", "data": cached_data}
# 将榜单数据写入缓存文件
write_cache(rank_id, response.json())
print(response.json())
data = response.json()
return {"message": "Request Netease Cloud And Local Cache JSON Success", "data": data}
@app.get("/search_song_by_name")
async def search_song_by_name(
name: str = fastapi.Query(..., description="歌曲名称"),
# current_user=Depends(manager.get_current_user)
):
payload = {
"input": name,
"filter": "name",
"type": "netease", # netease, tencent, kugou, xiami, baidu
"page": 1
}
url = "https://sunpma.com/other/musicss/"
# Headers for the POST request
headers = {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Origin": "https://sunpma.com",
"Referer": f"https://sunpma.com/other/musicss/?name={payload['input']}&type={payload['type']}",
"Sec-Ch-Ua": "\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"",
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": "\"macOS\"",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest"
}
# 将所有 headers 的值用 'utf-8' 进行编码
headers = {key: value.encode('utf-8') for key, value in headers.items()}
# Making the POST request
response = requests.post(url, data=payload, headers=headers)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse the response JSON data
data = response.json()
return {"message": "success", "data": data}
else:
print(f"Failed to get data. Status code: {response.status_code}")
return {"message": "failed", "data": []}
# 根据歌曲id搜索歌曲
@app.get("/search_song_by_id")
async def search_song_by_id(
music_id: str = fastapi.Query(..., description="歌曲id"),
# current_user=Depends(manager.get_current_user)
):
payload = {
"input": music_id,
"filter": "id",
"type": "netease", # netease, tencent, kugou, xiami, baidu
"page": 1
}
headers = {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Origin": "https://sunpma.com",
"Referer": f"https://sunpma.com/other/musicss/?id={payload['input']}&type=netease",
"Sec-Ch-Ua": "\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Google Chrome\";v=\"114\"",
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": "\"macOS\"",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest"
}
url = "https://sunpma.com/other/musicss/"
# Making the POST request
response = requests.post(url, data=payload, headers=headers)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse the response JSON data
data = response.json()
print(data)
try:
save_search_music(data)
except Exception as e:
print(e)
return {"message": "success", "data": data}
else:
print(f"Failed to get data. Status code: {response.status_code}")
return {"message": "failed", "data": []}
if __name__ == '__main__':
host = env.get("HOST") if env.get("HOST") is not None else "0.0.0.0"
port = int(env.get("PORT")) if env.get("PORT") is not None else 7888
uvicorn.run(app='api:app', host=host, port=port, reload=True)