Skip to content

Commit c2635d5

Browse files
authored
Add parallel compression for multithreaded process (#711)
* Add parallel compression for multithreaded process * Amendments to the parallel_compression logic to clean the code and make it more modular depending on thread count
1 parent a0e6ea4 commit c2635d5

File tree

1 file changed

+46
-13
lines changed

1 file changed

+46
-13
lines changed

code/common/compress.q

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,30 @@ showcomp:{[hdbpath;csvpath;maxage]
129129

130130
compressfromtable:{[table]
131131
statstab::([] file:`$(); algo:`int$(); compressedLength:`long$();uncompressedLength:`long$());
132-
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} each table;}
132+
/ Check if process is single threaded - if multi then compress in parallel then clean up after
133+
// Add metrics on any files due to be compressed to be used afterwards for comparison
134+
table:update compressionvaluepre:{(-21!x)`compressedLength}'[fullpath] from table;
135+
$[0= system"s";
136+
singlethreadcompress[table];
137+
multithreadcompress[table]];
138+
/ Update the stats tab table after the compression
139+
{statstabupdate[x`fullpath;x`calgo;x`currentsize;x`compressionvaluepre]} each table}
140+
141+
/ Append one row to the global statstab for a file whose -21! compressedLength changed during the run.
/ file                - file handle handed to -21! to read the current compression stats
/ algo                - compression algorithm used; 0 selects the second branch (logged elsewhere in this file as "decompressed")
/ sizeuncomp          - stored as the last field of the row (the caller passes the table's currentsize)
/ compressionvaluepre - compressedLength of the file captured before compress was run
statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre]
142+
/ an unchanged compressedLength means the file was not altered, so nothing is recorded
if[not compressionvaluepre ~ (-21!file)`compressedLength;
143+
/ non-zero algo: record the new compressedLength; algo 0: record the pre-run value instead
statstab,:$[not 0=algo;(file;algo;(-21!file)`compressedLength;sizeuncomp);(file;algo;compressionvaluepre;sizeuncomp)]]}
144+
145+
/ Sequential path: for each row of table, run compress and then immediately run cleancompressed on that file.
/ table - rows must supply fullpath, calgo, cblocksize, clevel and currentsize (the fields indexed below)
singlethreadcompress:{[table]
146+
.lg.o[`compression; "Single threaded process, compress applied sequentially"];
147+
/ per row: compress first, then clean up that same file before moving to the next row
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
148+
cleancompressed[x `fullpath;x `calgo]} each table;
149+
}
150+
151+
/ Parallel path: run compress over every row of table with peach, then run cleancompressed over every row with each.
/ table - rows must supply fullpath, calgo, cblocksize, clevel and currentsize (the fields indexed below)
multithreadcompress:{[table]
152+
.lg.o[`compression; "Multithreaded process, compress applied in parallel "];
153+
/ compression step is distributed with peach
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
154+
/ clean-up step starts only after the peach above has returned, and is applied row by row with each
{cleancompressed[x `fullpath;x `calgo]} each table;
155+
}
133156

134157
/- call the compression with a max age parameter implemented
135158
compressmaxage:{[hdbpath;csvpath;maxage]
@@ -160,23 +183,33 @@ compress:{[filetoCompress;algo;blocksize;level;sizeuncomp]
160183
$[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo);
161184
[.lg.o[`compression;cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
162185
/ perform the compression/decompression
163-
if[0=algo;comprL:(-21!filetoCompress)`compressedLength];
164186
-19!(filetoCompress;compressedFile;blocksize;algo;level);
165-
/ check the compressed/decomp file and move if appropriate; else delete compressed file and log error
166-
$[((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
167-
[.lg.o[`compression;"File ",cmp,"compressed ","successfully; matches orginal. Deleting original."];
168-
system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress;
169-
/ move the hash files too.
170-
hashfilecheck[compressedFile;filetoCompress;sf];
171-
/-log to the table if the algo wasn't 0
172-
statstab,:$[not 0=algo;(filetoCompress;algo;(-21!filetoCompress)`compressedLength;sizeuncomp);(filetoCompress;algo;comprL;sizeuncomp)]];
173-
[$[not count -21!compressedFile;
174-
[.lg.o[`compression; "Failed to compress file ",string[filetoCompress]];hdel compressedFile];
175-
[.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];hdel compressedFile]]]]
176187
];
177188
/ if already compressed/decompressed, then log that and skip.
178189
.lg.o[`compression; "file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]]}
179190

191+
/ Post-compression clean-up for one file: validate the "<file>_kdbtempzip" temp file and, if valid, rename it over the original.
/ filetoCompress - file handle of the original file
/ algo           - compression algorithm; 0 is logged as "decompressed" and bypasses the -21! stats requirement below
/ Returns () early when the temp file is absent or fails validation (in the latter case the temp file is deleted).
cleancompressed:{[filetoCompress;algo]
192+
/ temp file handle is the original path with the suffix "_kdbtempzip"
compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
193+
/ log-message prefix: "de" when algo is 0, otherwise empty
cmp:$[algo=0;"de";""];
194+
// Verify compressed file exists - key on the handle yielding () is treated as "not present"; log and stop
195+
if[()~ key compressedFile;
196+
.lg.o[`compression; "No compressed file present for the following file - ",string[filetoCompress]];
197+
:();
198+
];
199+
// Verify compressed file's contents match original.
// Evaluated right to left: (contents match) & ((count -21!compressedFile) or algo=0), then negated;
// sf keeps the original file's contents for the hashfilecheck call at the end
200+
if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
201+
.lg.o[`compression;cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
202+
hdel compressedFile;
203+
:();
204+
];
205+
// Both checks passed: rename the temp file to the original name
206+
.lg.o[`compression;"File ",cmp,"compressed ",string[filetoCompress]," successfully; matches orginal. Deleting original."];
207+
/ system "r" is given both paths with everything up to the last ":" of the handle stripped
system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress;
208+
/ move the hash files too.
209+
hashfilecheck[compressedFile;filetoCompress;sf];
210+
}
211+
212+
180213
hashfilecheck:{[compressedFile;filetoCompress;sf]
181214
/ if running 3.6 or higher, account for anymap type for nested lists
182215
/ check for double hash file if nested data contains symbol vector/atom

0 commit comments

Comments
 (0)