#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""In-memory pool of proxies fetched from the dps.kdlapi.com HTTP API.

The pool is refilled on demand and each proxy is rate limited so that it is
not handed out again until ``PROXYPOOL_MIN_DURATION`` seconds have elapsed.
"""
import datetime
import json
import os
import random
import sys
import time

import requests

from .settings import (
    SECRETID,
    SECRETKEY,
    PROXYPOOL_UPDATENUM,
    PROXYPOOL_MIN_NUM,
    PROXYPOOL_MIN_DURATION,
)

# Seconds to wait for the remote API before giving up; without a timeout a
# stalled connection would block the caller forever.
_REQUEST_TIMEOUT = 10

# ``last_got_time`` assigned to freshly fetched proxies so that they are
# immediately eligible for use.
_NEVER_USED = datetime.datetime.min

# Lower bound for a single sleep while waiting for a proxy to cool down.
_MIN_WAIT_SECONDS = 0.01


class KdlException(Exception):
    """Error reported by (or while talking to) the kdlapi.com API.

    Attributes:
        code: HTTP status code or API error code, if known.
        message: Human-readable error description, if known.
        hint_message: Formatted string combining ``code`` and ``message``;
            this is what ``str(exc)`` returns.
    """

    def __init__(self, code=None, message=None):
        super().__init__(code, message)
        self.code = code
        self.message = message
        self._hint_message = "[KdlException] code: {} message: {}".format(self.code, self.message)

    @property
    def hint_message(self):
        return self._hint_message

    @hint_message.setter
    def hint_message(self, value):
        self._hint_message = value

    def __str__(self):
        return self.hint_message


def get_secret_token():
    """Request a secret token from the auth API using SECRETID / SECRETKEY.

    Returns:
        The ``secret_token`` string found under ``data`` in the response.

    Raises:
        KdlException: If the HTTP status is not 200 or the API ``code`` is
            non-zero.
        requests.RequestException: On network failure or timeout.
    """
    response = requests.post(
        url='https://auth.kdlapi.com/api/get_secret_token',
        data={'secret_id': SECRETID, 'secret_key': SECRETKEY},
        timeout=_REQUEST_TIMEOUT,
    )
    body = response.content.decode('utf8')
    if response.status_code != 200:
        raise KdlException(response.status_code, body)

    res = json.loads(body)
    code, msg = res['code'], res['msg']
    if code != 0:
        raise KdlException(code, msg)
    return res['data']['secret_token']


class ProxyPool:
    """Pool of proxies that is topped up from the API whenever it runs low.

    Each entry of ``proxy_list`` is a dict with the keys ``'proxy'`` (the value
    returned by the API) and ``'last_got_time'`` (when it was last handed out).
    """

    def __init__(self):
        self.update_num = PROXYPOOL_UPDATENUM
        self.min_num = PROXYPOOL_MIN_NUM
        self.min_duration = PROXYPOOL_MIN_DURATION
        # NOTE(review): the token is fetched once and baked into api_url; if
        # tokens expire, long-lived pools will need a refresh - confirm.
        self.signature = get_secret_token()
        self.api_url = f'https://dps.kdlapi.com/api/getdps/?secret_id={SECRETID}&signature={self.signature}&num={self.update_num}&pt=1&format=json&sep=1'
        self.proxy_list = []

    def get_one(self) -> str:
        """Return a random proxy not handed out in the last ``min_duration`` seconds.

        Blocks (sleeping, not spinning) until at least one proxy has cooled
        down, then stamps the chosen proxy with the current time.

        Raises:
            KdlException: If the pool is empty after the refill attempt, or if
                the refill itself fails.
        """
        self.ensure_min_num()
        if not self.proxy_list:
            # Happens when min_num <= 0; waiting could never succeed.
            raise KdlException(message='proxy pool is empty')

        cooldown = datetime.timedelta(seconds=self.min_duration)
        while True:
            now = datetime.datetime.now()
            cutoff = now - cooldown
            available = [p for p in self.proxy_list if p['last_got_time'] < cutoff]
            if available:
                break
            # Sleep until the least recently used proxy becomes eligible
            # instead of burning CPU in a tight loop.
            earliest = min(p['last_got_time'] for p in self.proxy_list)
            remaining = (earliest + cooldown - now).total_seconds()
            time.sleep(max(remaining, _MIN_WAIT_SECONDS))

        chosen = random.choice(available)
        chosen['last_got_time'] = datetime.datetime.now()
        return chosen['proxy']

    def remove(self, proxy: str) -> None:
        """Drop every entry whose ``'proxy'`` equals *proxy*, then refill the pool."""
        self.proxy_list = [p for p in self.proxy_list if p['proxy'] != proxy]
        self.ensure_min_num()

    def ensure_min_num(self) -> None:
        """Fetch proxies from the API until the pool holds at least ``min_num``.

        Raises:
            KdlException: If the API answers with an error or with no proxies.
            requests.RequestException: On network failure or timeout.
        """
        while len(self.proxy_list) < self.min_num:
            self.proxy_list.extend(self._fetch_proxies())

    def _fetch_proxies(self):
        """Call ``api_url`` once and return the new pool entries (never empty)."""
        response = requests.get(self.api_url, timeout=_REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise KdlException(response.status_code, response.content.decode('utf8'))

        payload = response.json()
        # A missing 'code' is treated as success; the emptiness check below
        # still catches a response that carries no proxies.
        code = payload.get('code', 0)
        if code != 0:
            raise KdlException(code, payload.get('msg'))

        new_proxies = (payload.get('data') or {}).get('proxy_list') or []
        if not new_proxies:
            # Without this the caller's while-loop would hammer the API forever.
            raise KdlException(code, 'API returned no proxies')
        return [{'proxy': p, 'last_got_time': _NEVER_USED} for p in new_proxies]


if __name__ == '__main__':
    proxypool = ProxyPool()
    print(proxypool.get_one())