Skip to content

Commit 3e506ef

Browse files
authored
Partbyinitialchar (#712)
* Commit for parting by first character of symbol column in WDB, and end of day sort/merge for this partitioning
1 parent c2635d5 commit 3e506ef

File tree

3 files changed

+85
-20
lines changed

3 files changed

+85
-20
lines changed

code/processes/idb.q

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,8 @@ maptoint:{[val]
113113
/- if using a symbol column, enumerate against the hdb sym file
114114
sym?`TORQNULLSYMBOL^val]
115115
};
116+
117+
/- helper function to support queries against the sym column in partbyfirstchar
118+
mapfctoint:{[val]
119+
.Q.an?$[0<type val;first each;first] string val
120+
};

code/processes/wdb.q

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ writedownmode:@[value;`writedownmode;`default]; /-the
3030
/- at EOD the data will be merged from each partition before being moved to hdb
3131
/- 3. partbyenum - the data is partitioned by [ partitiontype ] and a symbol or integer column with parted attribution assigned in sort.csv
3232
/- at EOD the data will be merged from each partition before being moved to hdb
33+
/- 3. partbyfirstchar - the data is partitioned by [ partitiontype ] and a symbol column based on the first character with parted attribution assigned in sort.csv
34+
/- at EOD the data will be sorted and merged from each partition before being moved to hdb
35+
3336

3437
mergemode:@[value;`mergemode;`part]; /-the partbyattr writedown mode can merge data from temporary storage to the hdb in three ways:
3538
/- 1. part - the entire partition is merged to the hdb
@@ -52,7 +55,6 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num
5255

5356
sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort
5457
sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process
55-
5658
wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables
5759

5860
subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for
@@ -92,7 +94,7 @@ saveenabled: any `save`saveandsort in mode;
9294
sortenabled: any `sort`saveandsort in mode;
9395

9496
/- parted writedown modes have special behaviour during merging or WDB initialisation
95-
partwritemodes:`partbyattr`partbyenum;
97+
partwritemodes:`partbyattr`partbyenum`partbyfirstchar;
9698

9799
/ - log which modes are enabled
98100
switch: string `off`on;
@@ -125,19 +127,24 @@ maptoint:{[val]
125127
`long$ (` sv hdbsettings[`hdbdir],`sym)?`TORQNULLSYMBOL^val]
126128
};
127129

130+
mapfctoint:{[val]
131+
.Q.an?$[0<type val;first each;first] string val
132+
};
133+
128134
/- function to upsert to specified directory
129135
upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt;writedownmode]
130136
/- enumerate first extra partition value
131137
if[writedownmode~`partbyenum;i:maptoint first expt];
138+
if[writedownmode~`partbyfirstchar;i:mapfctoint first expt];
132139
/- create directory location for selected partition
133140
/- replace non-alphanumeric characters in symbols with _
134141
/- convert to symbols and replace any null values with `TORQNULLSYMBOL
135-
directory:$[writedownmode~`partbyenum;
142+
directory:$[writedownmode in `partbyenum`partbyfirstchar;
136143
` sv .Q.par[dir;pt;`$string i],tablename,`;
137-
` sv .Q.par[dir;pt;tablename],(`$"_"^.Q.an .Q.an?"_" sv string `TORQNULLSYMBOL^ ensuresymlist[expt]),`];
144+
` sv .Q.par[dir;pt;tablename],(`$"_"^.Q.an .Q.an?"_" sv string `TORQNULLSYMBOL^ ensuresymlist[expt]),`];
138145
.lg.o[`save;"saving ",(string tablename)," data to partition ",string directory];
139146
/- selecting rows of table with matching partition
140-
r:?[tabdata;$[writedownmode~`partbyenum;enlist(in;first expttype;expt);{(x;y;(),z)}[in;;]'[expttype;expt]];0b;()];
147+
r:?[tabdata;$[writedownmode in `partbyenum`partbyfirstchar;enlist(in;first expttype;expt);{(x;y;(),z)}[in;;]'[expttype;expt]];0b;()];
141148
/- upsert selected data matched on partition to specific directory
142149
.[upsert;(directory;r);{[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e}];
143150
.lg.o[`track;"appending details to partsizes"];
@@ -152,8 +159,8 @@ savetablesbypart:{[dir;pt;forcesave;tablename;writedownmode]
152159
.lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"];
153160
/- get additional partition(s) defined by parted attribute in sort.csv
154161
extrapartitiontype:.merge.getextrapartitiontype[tablename];
155-
if[(writedownmode~`partbyenum) and 1<c:count extrapartitiontype;
156-
.lg.e[`partbyenum;"only 1 parted attribute should be defined on table when partbyenum writedown mode is used but we have ",string c]
162+
if[(writedownmode in `partbyenum`partbyfirstchar) and 1<c:count extrapartitiontype;
163+
.lg.e[writedownmode;"only 1 parted attribute should be defined on table when using partbyenum and partbyfirstchar writedown modes, but we have ",string c]
157164
];
158165
/- check each partition type actually is a column in the selected table
159166
.merge.checkpartitiontype[tablename;extrapartitiontype];
@@ -182,7 +189,7 @@ savetables:$[writedownmode in partwritemodes;savetablesbypart[;;;;writedownmode]
182189
savetodisk:{[]
183190
changes:savetables[savedir;getpartition[];immediate;] each tablelist[];
184191
/- we have to let the idbs know of the changes in the wdbhdb. using filldb[] to make sure it is a db with all the tables
185-
if[any[changes] and writedownmode in `partbyenum`default;filldb getpartition[];notifyidbs[`.idb.intradayreload;enlist()]]};
192+
if[any[changes] and writedownmode in `partbyenum`partbyfirstchar`default;filldb getpartition[];notifyidbs[`.idb.intradayreload;enlist()]]};
186193

187194
/- send an intraday reload message to idbs:
188195
notifyidbs:{[func;params]
@@ -332,8 +339,8 @@ merge:{[dir;pt;tableinfo;mergelimits;hdbsettings;mergemethod;writedownmode]
332339
setcompression[hdbsettings[`compression]];
333340
/- get tablename
334341
tabname:tableinfo[0];
335-
/- get list of partition directories for specified table - partbyenum uses different folder structure vs partbyattr/default
336-
partdirs:$[writedownmode in `partbyenum;
342+
/- get list of partition directories for specified table - partbyenum & partbyfirstchar use different folder structures vs partbyattr/default
343+
partdirs:$[writedownmode in `partbyenum`partbyfirstchar;
337344
p where 0<count each key each p:` sv' ((-1_` vs p),/:key p:.Q.par[hsym dir;pt;`]),\: tabname;
338345
` sv' tabledir,/:key tabledir:.Q.par[hsym dir;pt;tabname]];
339346
/- we only really have to merge those partitions where we have received some updates, otherwise table is empty
@@ -363,7 +370,7 @@ merge:{[dir;pt;tableinfo;mergelimits;hdbsettings;mergemethod;writedownmode]
363370
.merge.mergehybrid[tableinfo;dest;partdirs;mergelimits[tabname]]
364371
];
365372
.lg.o[`merge;"removing segments ", (", " sv string[partdirs])];
366-
$[writedownmode in `partbyenum;
373+
$[writedownmode in `partbyenum`partbyfirstchar;
367374
removetablefromenumdir each partdirs;
368375
.os.deldir .os.pth[[string[tabledir]]]
369376
];
@@ -384,7 +391,7 @@ removetablefromenumdir:{[partdir]
384391

385392
endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]
386393
/- merge data from partitions
387-
/- .z.pd funciton in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
394+
/- .z.pd function in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
388395
tempfix2:$[.finspace.enabled;0b;(0 < count .z.pd[])];
389396
$[tempfix2 and ((system "s")<0);
390397
[.lg.o[`merge;"merging on worker"];
@@ -423,12 +430,34 @@ endofdaymerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode
423430
];
424431
};
425432

433+
/- end of day sort and merge only used by writedown mode sortbyfirstchar, requiring sort pre-merge
434+
endofdaysortandmerge:{[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]
435+
/-sort permitted tables in database
436+
/- sort the table and garbage collect (if enabled)
437+
.lg.o[`sort;"starting to sort data"];
438+
/- .z.pd function in finspace will cause an error. Add in this check to skip over the use of .z.pd. This should be temporary and will be removed when issue resolved by AWS.
439+
tempfix1:$[.finspace.enabled;0b;count[.z.pd[]]];
440+
tnds:raze{y,/:.Q.dd[x;]each key[x],\:y}[.Q.dd[dir;pt]]each key tablist;
441+
tnds:tnds where tnds[;1] in exec ptdir from .merge.partsizes;
442+
$[tempfix1&0>system"s";
443+
[.lg.o[`sortandmerge;"sorting on worker sort", string .z.p];
444+
{(neg x)(`.wdb.reloadsymfile;y);(neg x)(::)}[;.Q.dd[hdbsettings `hdbdir;`sym]] each .z.pd[];
445+
{[x;compression] setcompression compression;.sort.sorttab x;if[gc;.gc.run[]]}[;hdbsettings`compression] peach tnds];
446+
[.lg.o[`sort;"sorting on main sort"];
447+
reloadsymfile[.Q.dd[hdbsettings `hdbdir;`sym]];
448+
{[x] .sort.sorttab[x];if[gc;.gc.run[]]} each tnds]];
449+
.lg.o[`sort;"finished sorting data"];
450+
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
451+
};
452+
426453
/- end of day sort [depends on writedown mode]
427454
endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]
428455
/- set compression level (.z.zd)
429456
setcompression[hdbsettings[`compression]];
430457
$[writedownmode in partwritemodes;
431-
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
458+
$[writedownmode~`partbyfirstchar; /-partbyfirstchar will not be sorted by sym within each parition, this needs done first
459+
endofdaysortandmerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
460+
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]];
432461
endofdaysortdate[dir;pt;key tablist;hdbsettings]
433462
];
434463
/- run steps to rollover idb
@@ -534,7 +563,7 @@ fixpartition:{[subto]
534563
];
535564
}
536565

537-
/- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables.
566+
/- for writedown modes partbyenum/partbyfirstchar/default we make sure that partition 0/currentpartition has all the tables.
538567
/- In that case we can use .Q.chk later to fill the db making it useable for intraday processes
539568
/- pt - date; partition for which the function should initialise
540569
initmissingtables:{[pt]
@@ -550,7 +579,7 @@ filldb:{[pt]
550579

551580
/- initialises table t in db with its schema in part
552581
inittable:{[t;pt]
553-
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
582+
tabledir:` sv $[writedownmode in `partbyenum`partbyfirstchar; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
554583
if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]];
555584
}
556585

@@ -590,7 +619,7 @@ getsortparams:{[]
590619
/- get the attributes csv file
591620
/-even if running with a sort process should read this file to cope with backups
592621
.sort.getsortcsv[sortcsv];
593-
/- check the sort.csv for parted attributes `p if the writedownmode `partbyattr or `partbyenum is selected
622+
/- check the sort.csv for parted attributes `p if the writedownmode `partbyattr, `partbyenum or `partbyfirstchar is selected
594623
/- if each table does not have at least one `p attribute the process will exit
595624
if[writedownmode in partwritemodes;
596625

@@ -612,7 +641,7 @@ getsortparams:{[]
612641
/- If the function is ran on sort process send initmissingtables command to wdbs
613642
idbreload:{[pt]
614643
.lg.o[`idb;"starting idb reload"];
615-
if[writedownmode in `partbyenum`default;
644+
if[writedownmode in `partbyenum`default`partbyfirstchar;
616645
.lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]];
617646
$[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]];
618647
.lg.o[`eod;"notifying idbs for newly created partition"];

docs/Processes.md

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,24 @@ sorting at the end of the day.
10351035
In the above example, the data is parted by sym, and number 456 is
10361036
the order of MSFT_N symbol entry in the HDB sym file.
10371037

1038+
- partbyfirstchar - Data is persisted to a partition scheme where the partition
1039+
is derived from the first character in sym colum present in the sort.csv
1040+
file. Like partbyenum, this can be only be done by one column which has the
1041+
parted attribute applied to it. It must be a symbol column due the nature
1042+
of the character extraction. The numerical value for characters will map
1043+
to the index of the character in the .Q.an. For those that arent contained i
1044+
within .Q.an, they will map to the count of .Q.an. Partitioning in this way
1045+
means that the data within each partition is not sorted for the parted
1046+
attribute to be applied, which means in the EOD process the data needs sorted
1047+
before being merged. This sort happens partition by partition rather than
1048+
as a whole. The wdb partition scheme is of the form
1049+
\[wdbdir\]/\[partitiontype\]/\[first char index .Q.an\]/\[table(s)\]/
1050+
A typical partition directory would be similar to (for ex sym: MSFT_N)
1051+
wdb/database/2025.11.04/38/trade
1052+
In the above example, the data is parted by sym, and number 38 is the
1053+
index position of M in .Q.an.
1054+
1055+
10381056
The advantage of partbyenum over partbyattr could be that the
10391057
directory structure it uses represents a HDB that is ready to be loaded
10401058
intraday. At the end of the day the data gets upserted to the HDB the
@@ -1046,7 +1064,10 @@ data sets with a low cardinality (ie. small number of distinct elements)
10461064
the optional method may provide a significant time saving, upwards of
10471065
50%. The optional method should also reduce the memory usage at the end
10481066
of day event, as joining data is generally less memory intensive than
1049-
sorting.
1067+
sorting. The optional partbyfirstchar method allows a method for subdividing
1068+
data with a high cardinality to reduce the number of partitions being
1069+
written to, while providing a means for reduced memory footprint on final sort
1070+
versus default.
10501071

10511072
<a name="idb"></a>
10521073

@@ -1056,13 +1077,19 @@ Intraday Database (IDB)
10561077
The Intraday Database or IDB is a simple process that allows access to
10571078
data written down intraday. This assumes that there is an existing WDB
10581079
(and HDB) process creating a DB on disk that can be loaded with a simple
1059-
load command. As of now default and partbyenum WDB writedown modes are supported.
1060-
The responsibility of an IDB is therefore:
1080+
load command. As of now default, partbyenum and partbyfirstchar WDB writedown
1081+
modes are supported. The responsibility of an IDB is therefore:
10611082

10621083
1. Serving queries. Since partbyenum writedown mode is done by enumerated
10631084
symbol columns a helper function maptoint is implemented to support
10641085
symbol lookup in sym file:
10651086
select from trade where int=maptoint[`MSFT_N]
1087+
Also with partbyfirstchar being an alternate approach to create a
1088+
numerical partition, there is a helper function to locate the correct
1089+
value:
1090+
select from trade where int=mapfctoint[`MSFT],sym=`MSFT
1091+
select from trade where int in mapfctoint[`MSFT`AAPL],sym in `MSFT`AAPL
1092+
10661093

10671094
2. Can be triggered for a reload. This is usually done by the WDB process
10681095
periodically.
@@ -1097,6 +1124,10 @@ The IDB can be queried just like any other HDB. If writedown mode partbyenum is
10971124
```
10981125
neg[gwHandle](`.gw.asyncexec;"select from trade where int=maptoint[`GOOG]";`idb);gwHandle[]
10991126
```
1127+
Likewise if partbyfirstchar writedown mode is used there is a "mapfctoint" which can be used
1128+
```
1129+
neg[gwHandle](`.gw.asyncexec;"select from trade where int in maptoint[`GOOG`MSFT],sym in `GOOG`MSFT";`idb);gwHandle[]
1130+
```
11001131

11011132
### Scalability
11021133

0 commit comments

Comments
 (0)