Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 100 additions & 55 deletions Crawler/Kinetics/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@
import os
import shutil
import subprocess
import uuid
from collections import OrderedDict
from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Set

import pandas as pd
from joblib import Parallel
from joblib import delayed

import random
def generate_key():
    """Return a random 20-character string of ASCII letters.

    The result is used at module level to build the ``erf`` file name.  It is
    drawn with the non-cryptographic ``random`` module, so it must not be
    used as a secret.

    Returns:
        str: 20 characters, each chosen uniformly from ``A-Z`` and ``a-z``.
    """
    # Complete upper- and lower-case ASCII alphabet: 52 distinct letters, so
    # every letter (including 'Z') is equally likely.
    STR_KEY_GEN = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    return ''.join(random.choice(STR_KEY_GEN) for _ in range(20))

# File name made of "errorlog" plus a 20-letter random suffix from
# generate_key(); evaluated once, when the module is imported.
erf = "errorlog" + generate_key()


def create_video_folders(dataset, output_dir, tmp_dir):
Expand Down Expand Up @@ -52,7 +62,7 @@ def construct_video_filename(row, label_to_dir, trim_format='%06d'):
def download_clip(video_identifier, output_filename,
start_time, end_time,
tmp_dir='/tmp/kinetics',
num_attempts=5,
num_attempts=2,
url_base='https://www.youtube.com/watch?v='):
"""Download a video from youtube if exists and is not blocked.

Expand All @@ -76,63 +86,89 @@ def download_clip(video_identifier, output_filename,
status = False
# Construct command line for getting the direct video link.
tmp_filename = os.path.join(tmp_dir,
'%s.%%(ext)s' % uuid.uuid4())
command = ['youtube-dl',
'--quiet', '--no-warnings',
'-f', 'mp4',
'-o', '"%s"' % tmp_filename,
'"%s"' % (url_base + video_identifier)]
command = ' '.join(command)
attempts = 0
while True:
try:
output = subprocess.check_output(command, shell=True,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
attempts += 1
if attempts == num_attempts:
return status, err.output
else:
break

tmp_filename = glob.glob('%s*' % tmp_filename.split('.')[0])[0]
# Construct command to trim the videos (ffmpeg required).
command = ['ffmpeg',
'-i', '"%s"' % tmp_filename,
'-ss', str(start_time),
'-t', str(end_time - start_time),
'-c:v', 'libx264', '-c:a', 'copy',
'-threads', '1',
'-loglevel', 'panic',
'"%s"' % output_filename]
command = ' '.join(command)
try:
output = subprocess.check_output(command, shell=True,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
return status, err.output
f'{video_identifier}.mp4')

if not os.path.exists(output_filename):

command = f'youtube-dl -f mp4 --quiet --no-warnings -o {tmp_filename} "{url_base + video_identifier}" && ffmpeg -i {tmp_filename} -ss {str(start_time)} -t {str(end_time - start_time)} -threads 1 -c:v libx264 -c:a copy -loglevel error "{output_filename}" -f mp4 -y'

attempts = 0
while True:
try:
output = subprocess.check_output(command, shell=True,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
attempts += 1
if attempts == num_attempts:
return status, f"{str(err.output)[:500]}"
else:
break

# Check if the video was successfully saved.
status = os.path.exists(output_filename)
os.remove(tmp_filename)
try:
os.remove(tmp_filename)
except:
pass
return status, 'Downloaded'


def download_clip_wrapper(row, label_to_dir, trim_format, tmp_dir):
def download_clip_wrapper(row, label_to_dir, trim_format, tmp_dir,
                          existing_files: Optional[Dict[str, Path]] = None):
    """Wrapper for parallel processing purposes.

    Resolves the output path for one annotation row and obtains the clip by
    the cheapest available route: keep it if it is already on disk, copy it
    from ``existing_files`` if a file with the same clip id is catalogued
    there, and only otherwise download it.

    Args:
        row: Annotation row; the keys 'video-id', 'start-time' and
            'end-time' are read when a download is needed.
        label_to_dir: Passed through to ``construct_video_filename``.
        trim_format: Passed through to ``construct_video_filename``.
        tmp_dir: Temporary directory handed to ``download_clip``.
        existing_files: Optional mapping from clip id (file stem) to the path
            of an already available ``.mp4``.  ``None`` (the default) means
            there is no catalogue to copy from.

    Returns:
        tuple: ``(clip_id, ok, message)`` where ``message`` is 'Exists',
        'Copied', or the log string returned by ``download_clip``.
    """
    output_filename = construct_video_filename(row, label_to_dir,
                                               trim_format)
    clip_id = os.path.basename(output_filename).split('.mp4')[0]

    if os.path.exists(output_filename):
        status = (clip_id, True, 'Exists')
        print(status)
        return status

    # `existing_files` defaults to None instead of a shared mutable dict, so
    # guard against None (and skip the lookup for an empty catalogue).
    if existing_files and clip_id in existing_files:
        shutil.copyfile(src=existing_files[clip_id], dst=output_filename)
        status = (clip_id, True, 'Copied')
        print(status)
        return status

    downloaded, log = download_clip(row['video-id'], output_filename,
                                    row['start-time'], row['end-time'],
                                    tmp_dir=tmp_dir)
    status = (clip_id, downloaded, log)
    print(status)
    return status

def redownload_clip_wrapper(row, label_to_dir, trim_format, tmp_dir):
    """Wrapper for parallel processing that repairs corrupted clips.

    A clip that is already on disk is decoded once with ffmpeg.  If ffmpeg
    exits successfully and reports nothing at log level ``error`` the clip is
    kept; otherwise the file is removed and downloaded again.  A clip that is
    not on disk is reported as failed and is not retried here.

    Args:
        row: Annotation row; the keys 'video-id', 'start-time' and
            'end-time' are read when the clip has to be downloaded again.
        label_to_dir: Passed through to ``construct_video_filename``.
        trim_format: Passed through to ``construct_video_filename``.
        tmp_dir: Temporary directory handed to ``download_clip``.

    Returns:
        tuple: ``(clip_id, ok, message)``.  ``message`` is 'Exists' for a
        healthy clip, the log string from ``download_clip`` after a
        re-download, or 'Could not download' (with ``ok`` False) when the
        clip was never on disk.

    Raises:
        OSError: If the ``ffmpeg`` executable cannot be started.  Failing
            loudly here avoids deleting healthy clips just because the
            checker itself is unavailable.
    """
    output_filename = construct_video_filename(row, label_to_dir,
                                               trim_format)
    clip_id = os.path.basename(output_filename).split('.mp4')[0]

    if not os.path.exists(output_filename):
        # Was never able to download clip
        status = (clip_id, False, "Could not download")
        print(status)
        return status

    # Decode the whole file into the null muxer.  The command is passed as an
    # argument list (no shell) and stderr is captured directly, so file names
    # need no quoting and parallel workers do not share a scratch log file.
    check = subprocess.run(
        ['ffmpeg', '-v', 'error', '-i', output_filename, '-f', 'null', '-'],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if check.returncode == 0 and not check.stdout:
        status = (clip_id, True, 'Exists')
        print(status)
        return status
    if check.returncode != 0:
        print(f"ffmpeg exited with status {check.returncode} "
              f"while checking {output_filename}")

    print(f"Removing corrupted file: {output_filename}")
    try:
        os.remove(output_filename)
    except OSError:
        # Best effort: the file may already be gone or not be removable.
        pass

    downloaded, log = download_clip(row['video-id'], output_filename,
                                    row['start-time'], row['end-time'],
                                    tmp_dir=tmp_dir)
    status = (clip_id, downloaded, log)
    print(status)
    return status


def parse_kinetics_annotations(input_csv, ignore_is_cc=False):
"""Returns a parsed DataFrame.
Expand Down Expand Up @@ -164,33 +200,40 @@ def parse_kinetics_annotations(input_csv, ignore_is_cc=False):

def main(input_csv, output_dir,
trim_format='%06d', num_jobs=24, tmp_dir='/tmp/kinetics',
drop_duplicates=False):
drop_duplicates=False, download_mode="download", source_dir="", reverse=False):

# Reading and parsing Kinetics.
dataset = parse_kinetics_annotations(input_csv)
# if os.path.isfile(drop_duplicates):
# print('Attempt to remove duplicates')
# old_dataset = parse_kinetics_annotations(drop_duplicates,
# ignore_is_cc=True)
# df = pd.concat([dataset, old_dataset], axis=0, ignore_index=True)
# df.drop_duplicates(inplace=True, keep=False)
# print(dataset.shape, old_dataset.shape)
# dataset = df
# print(dataset.shape)
if reverse:
dataset.reindex(index=dataset.index[::-1])

# Make catalogue of existing files
source_dir = Path(source_dir)
if source_dir.is_dir():
print("Looking through source dir")
existing_files = {str(p.stem): p for p in source_dir.rglob("*.mp4")} if source_dir.is_dir() else {}
print("Done looking")
else:
existing_files = {}

# Creates folders where videos will be saved later.
label_to_dir = create_video_folders(dataset, output_dir, tmp_dir)

run = {
"download": partial(download_clip_wrapper, existing_files=existing_files),
"redownload": redownload_clip_wrapper
}[download_mode]

# Download all clips.
if num_jobs == 1:
status_lst = []
for i, row in dataset.iterrows():
status_lst.append(download_clip_wrapper(row, label_to_dir,
trim_format, tmp_dir))
status_lst.append(run(row, label_to_dir, trim_format, tmp_dir))
else:
status_lst = Parallel(n_jobs=num_jobs)(delayed(download_clip_wrapper)(
row, label_to_dir,
trim_format, tmp_dir) for i, row in dataset.iterrows())
status_lst = Parallel(n_jobs=num_jobs)(
delayed(run)(row, label_to_dir, trim_format, tmp_dir)
for i, row in dataset.iterrows()
)

# Clean tmp dir.
shutil.rmtree(tmp_dir)
Expand All @@ -212,9 +255,11 @@ def main(input_csv, output_dir,
help=('This will be the format for the '
'filename of trimmed videos: '
'videoid_%0xd(start_time)_%0xd(end_time).mp4'))
p.add_argument('-n', '--num-jobs', type=int, default=24)
p.add_argument('-n', '--num-jobs', type=int, default=1)
p.add_argument('-t', '--tmp-dir', type=str, default='/tmp/kinetics')
p.add_argument('--drop-duplicates', type=str, default='non-existent',
help='Unavailable at the moment')
# help='CSV file of the previous version of Kinetics.')
p.add_argument('-m', '--download_mode', type=str, default='download', choices=["download", "redownload"])
p.add_argument('-s', '--source-dir', type=str, default='', help="Directory in which to look for files before download.")
p.add_argument('-r', '--reverse', action="store_true", help="Reverse direction of download.")
main(**vars(p.parse_args()))
59 changes: 34 additions & 25 deletions Crawler/Kinetics/environment.yml
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
name: kinetics
channels: !!python/tuple
- !!python/unicode
'defaults'
channels:
- anaconda
- menpo
- conda-forge
- defaults
dependencies:
- joblib=0.9.4=py27_0
- menpo::ffmpeg=3.1.3=0
- mkl=2017.0.1=0
- numpy=1.12.1=py27_0
- openssl=1.0.2k=1
- pandas=0.19.2=np112py27_1
- pip=9.0.1=py27_1
- python=2.7.13=0
- python-dateutil=2.6.0=py27_0
- pytz=2017.2=py27_0
- readline=6.2=2
- setuptools=27.2.0=py27_0
- six=1.10.0=py27_0
- sqlite=3.13.0=0
- tk=8.5.18=0
- wheel=0.29.0=py27_0
- zlib=1.2.8=3
- pip:
- decorator==4.0.11
- olefile==0.44
- youtube-dl==2017.6.5
prefix: /home/cabaf/.conda/envs/kinetics
- ca-certificates=2020.1.1
- certifi=2020.4.5.1
- ffmpeg=2.8.6
- libcxx=10.0.0
- libedit=3.1.20181209
- libffi=3.3
- ncurses=6.2
- openssl=1.1.1g
- pip=20.0.2
- python=3.7.7
- readline=8.0
- setuptools=46.4.0
- sqlite=3.31.1
- tk=8.6.8
- wheel=0.34.2
- xz=5.2.5
- zlib=1.2.11
- pip:
- decorator==4.4.2
- intel-openmp==2019.0
- joblib==0.15.1
- mkl==2019.0
- numpy==1.18.4
- olefile==0.46
- pandas==1.0.3
- python-dateutil==2.8.1
- pytz==2020.1
- six==1.14.0
- youtube-dl==2020.5.8