newsspider/decspider/myutils.py

110 lines
3.8 KiB
Python
Raw Normal View History

2024-05-17 13:49:44 +08:00
#!/usr/bin/env python
# -- coding: utf-8 --
import time, random, os, sys, json
import requests, datetime
from .settings import SECRETID, SECRETKEY, PROXYPOOL_UPDATENUM, PROXYPOOL_MIN_NUM, PROXYPOOL_MIN_DURATION
SECRET_PATH = './.secret'
def _get_secret_token():
r = requests.post(url='https://auth.kdlapi.com/api/get_secret_token', data={'secret_id': SECRETID, 'secret_key': SECRETKEY})
if r.status_code != 200:
raise KdlException(r.status_code, r.content.decode('utf8'))
res = json.loads(r.content.decode('utf8'))
code, msg = res['code'], res['msg']
if code != 0:
raise KdlException(code, msg)
secret_token = res['data']['secret_token']
expire = str(res['data']['expire'])
_time = '%.6f' % time.time()
return secret_token, expire, _time
def _read_secret_token():
with open(SECRET_PATH, 'r') as f:
token_info = f.read()
secret_token, expire, _time, last_secret_id = token_info.split('|')
if float(_time) + float(expire) - 3 * 60 < time.time() or SECRETID != last_secret_id: # 还有3分钟过期或SecretId变化时更新
secret_token, expire, _time = _get_secret_token()
with open(SECRET_PATH, 'w') as f:
f.write(secret_token + '|' + expire + '|' + _time + '|' + SECRETID)
return secret_token
def get_secret_token():
if os.path.exists(SECRET_PATH):
secret_token = _read_secret_token()
else:
secret_token, expire, _time = _get_secret_token()
with open(SECRET_PATH, 'w') as f:
f.write(secret_token + '|' + expire + '|' + _time + '|' + SECRETID)
return secret_token
class KdlException(Exception):
"""异常类"""
def __init__(self, code=None, message=None):
self.code = code
if sys.version_info[0] < 3 and isinstance(message, unicode):
message = message.encode("utf8")
self.message = message
self._hint_message = "[KdlException] code: {} message: {}".format(self.code, self.message)
@property
def hint_message(self):
return self._hint_message
@hint_message.setter
def hint_message(self, value):
self._hint_message = value
def __str__(self):
if sys.version_info[0] < 3 and isinstance(self.hint_message, unicode):
self.hint_message = self.hint_message.encode("utf8")
return self.hint_message
class ProxyPool:
def __init__(self):
self.update_num = PROXYPOOL_UPDATENUM
self.min_num = PROXYPOOL_MIN_NUM
self.min_duration = PROXYPOOL_MIN_DURATION
self.signature = get_secret_token()
self.api_url = f'https://dps.kdlapi.com/api/getdps/?secret_id={SECRETID}&signature={self.signature}&num={self.update_num}&pt=1&format=json&sep=1'
self.proxy_list = []
def get_one(self):
self.ensure_min_num()
_proxy_list = []
while not _proxy_list:
last_got = datetime.datetime.now() - datetime.timedelta(seconds=self.min_duration)
_proxy_list = [p for p in self.proxy_list if p['last_got_time'] < last_got]
_proxy = random.choice(_proxy_list)
_proxy['last_got_time'] = datetime.datetime.now()
return _proxy['proxy']
def remove(self, proxy:str):
self.proxy_list = [p for p in self.proxy_list if p['proxy'] != proxy]
self.ensure_min_num()
def ensure_min_num(self):
while len(self.proxy_list) < self.min_num:
new_proxy_list = requests.get(self.api_url).json().get('data').get('proxy_list')
_proxy_list = [{'proxy': p, 'last_got_time': datetime.datetime(2020, 10, 1, 12, 30, 30, 100000)} for p in new_proxy_list]
self.proxy_list.extend(_proxy_list)
if __name__ == '__main__':
proxypool = ProxyPool()
print(proxypool.get_one())