diff --git a/Crawler/Kinetics/download.py b/Crawler/Kinetics/download.py index c8b2cdc..bb840f0 100755 --- a/Crawler/Kinetics/download.py +++ b/Crawler/Kinetics/download.py @@ -4,12 +4,22 @@ import os import shutil import subprocess +from typing import Dict, List, Set import uuid from collections import OrderedDict +from pathlib import Path from joblib import delayed from joblib import Parallel import pandas as pd +from functools import partial + +import random +def generate_key(): + STR_KEY_GEN = 'ABCDEFGHIJKLMNOPQRSTUVWXYzabcdefghijklmnopqrstuvwxyz' + return ''.join(random.choice(STR_KEY_GEN) for _ in range(20)) + +erf = "errorlog" + generate_key() def create_video_folders(dataset, output_dir, tmp_dir): @@ -52,7 +62,7 @@ def construct_video_filename(row, label_to_dir, trim_format='%06d'): def download_clip(video_identifier, output_filename, start_time, end_time, tmp_dir='/tmp/kinetics', - num_attempts=5, + num_attempts=2, url_base='https://www.youtube.com/watch?v='): """Download a video from youtube if exists and is not blocked. @@ -76,63 +86,89 @@ def download_clip(video_identifier, output_filename, status = False # Construct command line for getting the direct video link. tmp_filename = os.path.join(tmp_dir, - '%s.%%(ext)s' % uuid.uuid4()) - command = ['youtube-dl', - '--quiet', '--no-warnings', - '-f', 'mp4', - '-o', '"%s"' % tmp_filename, - '"%s"' % (url_base + video_identifier)] - command = ' '.join(command) - attempts = 0 - while True: - try: - output = subprocess.check_output(command, shell=True, - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as err: - attempts += 1 - if attempts == num_attempts: - return status, err.output - else: - break - - tmp_filename = glob.glob('%s*' % tmp_filename.split('.')[0])[0] - # Construct command to trim the videos (ffmpeg required). - command = ['ffmpeg', - '-i', '"%s"' % tmp_filename, - '-ss', str(start_time), - '-t', str(end_time - start_time), - '-c:v', 'libx264', '-c:a', 'copy', - '-threads', '1', - '-loglevel', 'panic', - '"%s"' % output_filename] - command = ' '.join(command) - try: - output = subprocess.check_output(command, shell=True, - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as err: - return status, err.output + f'{video_identifier}.mp4') + + if not os.path.exists(output_filename): + + command = f'youtube-dl -f mp4 --quiet --no-warnings -o {tmp_filename} "{url_base + video_identifier}" && ffmpeg -i {tmp_filename} -ss {str(start_time)} -t {str(end_time - start_time)} -threads 1 -c:v libx264 -c:a copy -loglevel error "{output_filename}" -f mp4 -y' + + attempts = 0 + while True: + try: + output = subprocess.check_output(command, shell=True, + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as err: + attempts += 1 + if attempts == num_attempts: + return status, f"{str(err.output)[:500]}" + else: + break # Check if the video was successfully saved. status = os.path.exists(output_filename) - os.remove(tmp_filename) + try: + os.remove(tmp_filename) + except: + pass return status, 'Downloaded' -def download_clip_wrapper(row, label_to_dir, trim_format, tmp_dir): +def download_clip_wrapper(row, label_to_dir, trim_format, tmp_dir, existing_files:Dict[str,Path]={}): """Wrapper for parallel processing purposes.""" output_filename = construct_video_filename(row, label_to_dir, trim_format) clip_id = os.path.basename(output_filename).split('.mp4')[0] if os.path.exists(output_filename): status = tuple([clip_id, True, 'Exists']) + print(status) + return status + if clip_id in existing_files: + shutil.copyfile(src=existing_files[clip_id],dst=output_filename) + status = tuple([clip_id, True, 'Copied']) + print(status) return status downloaded, log = download_clip(row['video-id'], output_filename, row['start-time'], row['end-time'], tmp_dir=tmp_dir) status = tuple([clip_id, downloaded, log]) + print(status) return status +def redownload_clip_wrapper(row, label_to_dir, trim_format, tmp_dir): + """Wrapper for parallel processing purposes.""" + output_filename = construct_video_filename(row, label_to_dir, + trim_format) + clip_id = os.path.basename(output_filename).split('.mp4')[0] + if os.path.exists(output_filename): + + check_for_errors_in_file = f'ffmpeg -v error -i "{output_filename}" -f null - 2>{erf} && cat {erf}' + try: + output = subprocess.check_output(check_for_errors_in_file, shell=True, stderr=subprocess.STDOUT) + if not output: + status = tuple([clip_id, True, 'Exists']) + print(status) + return status + except subprocess.CalledProcessError as err: + print(err) + + print(f"Removing corrupted file: {output_filename}") + try: + os.remove(output_filename) + except: + pass + downloaded, log = download_clip(row['video-id'], output_filename, + row['start-time'], row['end-time'], + tmp_dir=tmp_dir) + status = tuple([clip_id, downloaded, log]) + print(status) + return status + else: + # Was never able to download clip + status = tuple([clip_id, True, "Could not download"]) + print(status) + return status + def parse_kinetics_annotations(input_csv, ignore_is_cc=False): """Returns a parsed DataFrame. @@ -164,33 +200,40 @@ def parse_kinetics_annotations(input_csv, ignore_is_cc=False): def main(input_csv, output_dir, trim_format='%06d', num_jobs=24, tmp_dir='/tmp/kinetics', - drop_duplicates=False): + drop_duplicates=False, download_mode="download", source_dir="", reverse=False): # Reading and parsing Kinetics. dataset = parse_kinetics_annotations(input_csv) - # if os.path.isfile(drop_duplicates): - # print('Attempt to remove duplicates') - # old_dataset = parse_kinetics_annotations(drop_duplicates, - # ignore_is_cc=True) - # df = pd.concat([dataset, old_dataset], axis=0, ignore_index=True) - # df.drop_duplicates(inplace=True, keep=False) - # print(dataset.shape, old_dataset.shape) - # dataset = df - # print(dataset.shape) + if reverse: + dataset.reindex(index=dataset.index[::-1]) + + # Make catalogue of existing files + source_dir = Path(source_dir) + if source_dir.is_dir(): + print("Looking through source dir") + existing_files = {str(p.stem): p for p in source_dir.rglob("*.mp4")} if source_dir.is_dir() else {} + print("Done looking") + else: + existing_files = {} # Creates folders where videos will be saved later. label_to_dir = create_video_folders(dataset, output_dir, tmp_dir) + run = { + "download": partial(download_clip_wrapper, existing_files=existing_files), + "redownload": redownload_clip_wrapper + }[download_mode] + # Download all clips. if num_jobs == 1: status_lst = [] for i, row in dataset.iterrows(): - status_lst.append(download_clip_wrapper(row, label_to_dir, - trim_format, tmp_dir)) + status_lst.append(run(row, label_to_dir, trim_format, tmp_dir)) else: - status_lst = Parallel(n_jobs=num_jobs)(delayed(download_clip_wrapper)( - row, label_to_dir, - trim_format, tmp_dir) for i, row in dataset.iterrows()) + status_lst = Parallel(n_jobs=num_jobs)( + delayed(run)(row, label_to_dir, trim_format, tmp_dir) + for i, row in dataset.iterrows() + ) # Clean tmp dir. shutil.rmtree(tmp_dir) @@ -212,9 +255,11 @@ def main(input_csv, output_dir, help=('This will be the format for the ' 'filename of trimmed videos: ' 'videoid_%0xd(start_time)_%0xd(end_time).mp4')) - p.add_argument('-n', '--num-jobs', type=int, default=24) + p.add_argument('-n', '--num-jobs', type=int, default=1) p.add_argument('-t', '--tmp-dir', type=str, default='/tmp/kinetics') p.add_argument('--drop-duplicates', type=str, default='non-existent', help='Unavailable at the moment') - # help='CSV file of the previous version of Kinetics.') + p.add_argument('-m', '--download_mode', type=str, default='download', choices=["download", "redownload"]) + p.add_argument('-s', '--source-dir', type=str, default='', help="Directory in which to look for files before download.") + p.add_argument('-r', '--reverse', action="store_true", help="Reverse direction of download.") main(**vars(p.parse_args())) diff --git a/Crawler/Kinetics/environment.yml b/Crawler/Kinetics/environment.yml index cc6eadb..3285a56 100755 --- a/Crawler/Kinetics/environment.yml +++ b/Crawler/Kinetics/environment.yml @@ -1,28 +1,37 @@ name: kinetics -channels: !!python/tuple -- !!python/unicode - 'defaults' +channels: + - anaconda + - menpo + - conda-forge + - defaults dependencies: -- joblib=0.9.4=py27_0 -- menpo::ffmpeg=3.1.3=0 -- mkl=2017.0.1=0 -- numpy=1.12.1=py27_0 -- openssl=1.0.2k=1 -- pandas=0.19.2=np112py27_1 -- pip=9.0.1=py27_1 -- python=2.7.13=0 -- python-dateutil=2.6.0=py27_0 -- pytz=2017.2=py27_0 -- readline=6.2=2 -- setuptools=27.2.0=py27_0 -- six=1.10.0=py27_0 -- sqlite=3.13.0=0 -- tk=8.5.18=0 -- wheel=0.29.0=py27_0 -- zlib=1.2.8=3 -- pip: - - decorator==4.0.11 - - olefile==0.44 - - youtube-dl==2017.6.5 -prefix: /home/cabaf/.conda/envs/kinetics + - ca-certificates=2020.1.1 + - certifi=2020.4.5.1 + - ffmpeg=2.8.6 + - libcxx=10.0.0 + - libedit=3.1.20181209 + - libffi=3.3 + - ncurses=6.2 + - openssl=1.1.1g + - pip=20.0.2 + - python=3.7.7 + - readline=8.0 + - setuptools=46.4.0 + - sqlite=3.31.1 + - tk=8.6.8 + - wheel=0.34.2 + - xz=5.2.5 + - zlib=1.2.11 + - pip: + - decorator==4.4.2 + - intel-openmp==2019.0 + - joblib==0.15.1 + - mkl==2019.0 + - numpy==1.18.4 + - olefile==0.46 + - pandas==1.0.3 + - python-dateutil==2.8.1 + - pytz==2020.1 + - six==1.14.0 + - youtube-dl==2020.5.8