-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcleanup.py
More file actions
executable file
·104 lines (87 loc) · 3.52 KB
/
cleanup.py
File metadata and controls
executable file
·104 lines (87 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
# Usage: "./cleanup.py /work/kastnerm/test/"
# The script descends into every subdirectory of the given path.
# If no path is given on the command line, the fallback below is used:
prefix = "/work/kastnerm/abc/"
#####################################
# Exactly one command-line argument replaces the fallback path.
prefix = sys.argv[1] if len(sys.argv) == 2 else prefix
print(prefix)
def check(root, filterGifs = True, filterDuplicates = True, filterCorrupts = True):
    """Recursively delete unwanted image files below ``root``.

    Sub-directories are processed first, using the same filter settings as
    the call for ``root``.  Afterwards every regular file directly inside
    ``root`` whose name matches ``*.*`` is run through the enabled filters,
    in this order:

    * ``filterGifs``: delete files whose name ends in ``.gif`` (any case).
    * ``filterDuplicates``: delete files whose ``imagehash.dhash`` equals the
      hash of a file seen earlier in the same directory, and files that PIL
      cannot open or hash.
    * ``filterCorrupts``: delete files that PIL or OpenCV fail to decode.

    Duplicates are only detected within a single directory, never across
    directories.  WARNING: files are removed with ``os.remove`` and cannot
    be recovered.

    PIL and imagehash are imported only when ``filterDuplicates`` or
    ``filterCorrupts`` is set, and OpenCV only when ``filterCorrupts`` is
    set, so a GIF-only run needs none of them installed.

    Returns None.  Raises OSError if ``root`` cannot be listed.
    """
    import glob
    import logging
    import os

    # Third-party libraries are loaded only for the filters that use them.
    if filterDuplicates or filterCorrupts:
        from PIL import Image
        import imagehash
    if filterCorrupts:
        import cv2

    logging.debug("check {}".format(str(root)))

    # Depth first: clean the sub-directories with the caller's settings.
    for item in os.listdir(root):
        obj = os.path.join(root, item)
        if os.path.isdir(obj):
            check(obj, filterGifs=filterGifs, filterDuplicates=filterDuplicates,
                  filterCorrupts=filterCorrupts)

    def remove(path):
        """Delete ``path`` best-effort; return True if it was removed."""
        try:
            os.remove(path)
        except OSError as err:
            logging.debug("could not remove {}: {}".format(path, err))
            return False
        return True

    # Escape the directory so characters like "[" in its name are taken
    # literally instead of being interpreted as glob wildcards.
    candidates = glob.glob(os.path.join(glob.escape(root), "*.*"))
    seen = {}  # dhash string -> path of the first file that produced it
    deleted = 0

    for imagePath in candidates:
        if not os.path.isfile(imagePath):
            continue

        # Filter GIFs
        if filterGifs:
            logging.debug("filter gifs")
            if imagePath.lower().endswith(".gif"):
                logging.debug("gif file: {}".format(imagePath))
                deleted += remove(imagePath)
                continue

        # Filter duplicates
        if filterDuplicates:
            logging.debug("filter duplicates")
            try:
                # The context manager closes the file handle even when
                # decoding fails half-way.
                with Image.open(imagePath) as image:
                    logging.info(imagePath)
                    h = str(imagehash.dhash(image))
            except Exception:
                # The decoders raise many unrelated exception types for
                # broken input.  Unlike a bare "except:", this lets
                # KeyboardInterrupt/SystemExit through, so aborting the run
                # never deletes the file that was being processed.
                logging.debug("unreadable file: {}".format(imagePath))
                deleted += remove(imagePath)
                continue
            if h in seen:
                logging.debug("collision: {} {}".format(imagePath, seen[h]))
                deleted += remove(imagePath)
                continue
            seen[h] = imagePath

        # Check corrupt files
        # Deliberately redundant: the file is only kept if both PIL and
        # OpenCV manage to decode it.
        if filterCorrupts:
            logging.debug("filter corrupts")
            try:
                with Image.open(imagePath) as image:
                    imagehash.dhash(image)  # forces PIL to decode the pixels
                pixels = cv2.imread(imagePath)
                # cv2.imread reports failure by returning None, not by raising.
                readable = pixels is not None
                if readable:
                    cv2.resize(pixels, (1234, 1234))  # make OpenCV touch the data
            except Exception:
                readable = False
            if not readable:
                logging.debug("unreadable file: {}".format(imagePath))
                deleted += remove(imagePath)
                continue

    logging.info("deleted {} files for {}".format(deleted, str(root)))
def main():
    """Set up DEBUG-level logging, then run ``check`` on the module-level
    ``prefix`` with all three filters enabled."""
    import logging

    log_settings = {
        "format": '[%(asctime)s %(levelname)s] %(message)s',
        "datefmt": '%m/%d/%Y %I:%M:%S %p',
        "level": logging.DEBUG,
    }
    logging.basicConfig(**log_settings)

    check(prefix, filterGifs=True, filterDuplicates=True, filterCorrupts=True)
# Run the cleanup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()