diff --git a/axeman/core.py b/axeman/core.py index d67a4c2..bf2d44f 100644 --- a/axeman/core.py +++ b/axeman/core.py @@ -1,5 +1,6 @@ import argparse import asyncio +import time from collections import deque import uvloop @@ -25,7 +26,7 @@ from . import certlib -DOWNLOAD_CONCURRENCY = 50 +DOWNLOAD_CONCURRENCY = 5 MAX_QUEUE_SIZE = 1000 async def download_worker(session, log_info, work_deque, download_queue): @@ -37,17 +38,29 @@ async def download_worker(session, log_info, work_deque, download_queue): logging.debug("[{}] Queueing up blocks {}-{}...".format(log_info['url'], start, end)) - for x in range(3): + for x in range(30): try: async with session.get(certlib.DOWNLOAD.format(log_info['url'], start, end)) as response: + if response.status == 429: + logging.info("got http status {}-{}".format(x, response.status)) + time.sleep(min(x + 1, 10)) + continue + if response.content_type != 'application/json': + text = await response.text() + logging.error("got response {}-{}-{}".format(x, response.content_type, text)) entry_list = await response.json() logging.debug("[{}] Retrieved blocks {}-{}...".format(log_info['url'], start, end)) + if entry_list.get('error_code', '') == 'rate_limited': + logging.info("{}-{}".format(x, entry_list)) + time.sleep(min(x + 1, 10)) + continue break except Exception as e: logging.error("Exception getting block {}-{}! {}".format(start, end, e)) + time.sleep(min(x+1, 10)) else: # Notorious for else, if we didn't encounter a break our request failed 3 times D: with open('/tmp/fails.csv', 'a') as f: - f.write(",".join([log_info['url'], str(start), str(end)])) + f.write(",".join([log_info['url'], str(start), str(end)])+"\n") return for index, entry in zip(range(start, end + 1), entry_list['entries']): @@ -143,9 +156,14 @@ async def processing_coro(download_results_queue, output_dir="/tmp"): logging.debug("Got a chunk of {}. Mapping into process pool".format(process_pool.pool_workers)) - for entry in entries_iter: - csv_storage = '{}/certificates/{}'.format(output_dir, entry['log_info']['url'].replace('/', '_')) + dir_index = 0 + for x in entry['entries']: + dir_index = max(dir_index, int(x['cert_index']/1000000)) + + csv_storage = '{}/certificates/{}/{}'.format(output_dir, + entry['log_info']['url'].replace('/', '_'), + str(dir_index)) if not os.path.exists(csv_storage): print("[{}] Making dir...".format(os.getpid())) os.makedirs(csv_storage) @@ -272,7 +290,8 @@ def main(): parser.add_argument('-v', dest="verbose", action="store_true", help="Print out verbose/debug info") - parser.add_argument('-c', dest='concurrency_count', action='store', default=50, type=int, help="The number of concurrent downloads to run at a time") + parser.add_argument('-c', dest='concurrency_count', action='store', default=DOWNLOAD_CONCURRENCY, type=int, + help="The number of concurrent downloads to run at a time, default "+str(DOWNLOAD_CONCURRENCY)) args = parser.parse_args()