From e8417daf15fecb80eb3d51a3f8d5578ff0b20b27 Mon Sep 17 00:00:00 2001 From: Akshay Rai Date: Fri, 12 Sep 2025 13:39:26 +0530 Subject: [PATCH 1/2] Switch from MD5 to CRC32 to improve scan speed --- .../dbreader/MySqlChunkedQueryManager.java | 14 ++++---- .../dbreader/TestDatabaseChunkedReader.java | 6 ++-- .../TestMysqlChunkedQueryManager.java | 32 +++++++++---------- gradle/buildscript.gradle | 1 + 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/MySqlChunkedQueryManager.java b/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/MySqlChunkedQueryManager.java index 47a82ea76..e47c141f3 100644 --- a/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/MySqlChunkedQueryManager.java +++ b/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/MySqlChunkedQueryManager.java @@ -19,7 +19,7 @@ public class MySqlChunkedQueryManager implements ChunkedQueryManager { private static final String SELECT_FROM = "SELECT * FROM ( "; /** Generate base predicate for sharding keys to given number of partitions. 
- * Ex: MOD ( CONV ( MD5 ( CONCAT ( K1, K2, K3 ) ) , 16, 10 ) , 10 ) for a table with 3 keys {K1, K2, K3} and 10 partitions */ + * Ex: MOD ( CRC32 ( CONCAT ( K1, K2, K3 ) ), 10 ) for a table with 3 keys {K1, K2, K3} and 10 partitions */ private static String generatePerPartitionHashPredicate(List keys, int partitionCount) { StringBuilder query = new StringBuilder(); int keyCount = keys.size(); @@ -31,17 +31,15 @@ private static String generatePerPartitionHashPredicate(List keys, int p } query.append(" )"); - // Wrap that with MOD, CONV, and MD5 to generate a hash for sharding - // MOD ( CONV ( MD5 ( CONCAT ( A, B, C ) ) , 16, 10 ) , 10 ) - query.insert(0, "MD5 ( ").append(" )"); - // 16, 10 converts from HEX to DEC - query.insert(0, "CONV ( ").append(" , 16, 10 )"); + // Wrap that with MOD, CRC32 to generate a hash for sharding + // MOD ( CRC32 ( CONCAT ( A, B, C ) ) , 10 ) + query.insert(0, "CRC32 ( ").append(" )"); query.insert(0, "MOD ( ").append(" , ").append(partitionCount).append(" )"); return query.toString(); } /** Generate predicate for filtering rows hashing to the assigned partitions : - * Ex: WHERE ( MOD ( CONV ( MD5 ( CONCAT ( K1, K2, K3 ) ) , 16, 10 ) , 10 ) IN (1 , 6 ) ) + * Ex: WHERE ( MOD ( CRC32 ( CONCAT ( K1, K2, K3 ) ), 10 ) IN (1 , 6 ) ) * where 1 and 6 are the assigned partitions, 10 the partition count and, {K1, K2, K3} the keys of the table */ private static String generateFullPartitionHashPredicate(String perPartitionPredicate, List partitions) { @@ -121,7 +119,7 @@ public void prepareChunkedQuery(PreparedStatement stmt, List values) thr // SELECT * FROM // ( // SELECT * FROM TABLE - // ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) 16, 10 ) , 10 ) IN ( 2 , 5 ) ) + // ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) // AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? 
) ) // ORDER BY KEY1 , KEY2 // ) as nestedTab2 LIMIT 10; diff --git a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java index 98e010d64..4ac73bb76 100644 --- a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java +++ b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java @@ -66,7 +66,7 @@ public class TestDatabaseChunkedReader { + "],\"meta\":\"dbTableName=TEST_DB_TEST_TABLE;pk=key1,key2,key3;\"}"; private static final Schema TEST_COMPOSITE_KEY_TABLE_SCHEMA = Schema.parse(TEST_COMPOSITE_KEY_TABLE_SCHEMA_STR); private static final String TEST_SIMPLE_KEY_TABLE = "TEST_SIMPLE_SCHEMA"; - private static final String TEST_SIMPLE_QUERY = "SELECT * FROM " + TEST_SIMPLE_KEY_TABLE + " ORDER BY KEY1"; + private static final String TEST_SIMPLE_QUERY = "SELECT * FROM " + TEST_SIMPLE_KEY_TABLE; private static final List TEST_SIMPLE_KEYS = Collections.singletonList("key1"); private static final String TEST_SIMPLE_SCHEMA_STR = "{\"type\":\"record\",\"name\":\"SIMPLE_SCHEMA\",\"namespace\":\"com.linkedin.events.simpleschema\", \"fields\":[" @@ -88,7 +88,7 @@ private static Properties createTestDBReaderProperties(Integer chunkSize, Boolea Properties props = new Properties(); props.setProperty(DB_READER_DOMAIN_CONFIG + "." + QUERY_TIMEOUT_SECS, "10000"); //10 secs props.setProperty(DB_READER_DOMAIN_CONFIG + "." + FETCH_SIZE, "100"); - props.setProperty(DB_READER_DOMAIN_CONFIG + "." + SKIP_BAD_MESSAGE, skipBadMsg.toString()); + //props.setProperty(DB_READER_DOMAIN_CONFIG + "." + SKIP_BAD_MESSAGE, skipBadMsg.toString()); props.setProperty(DB_READER_DOMAIN_CONFIG + "." + ROW_COUNT_LIMIT, chunkSize.toString()); props.setProperty(DB_READER_DOMAIN_CONFIG + "." 
+ DATABASE_QUERY_MANAGER_CLASS_NAME, "com.linkedin.datastream.common.databases.dbreader.MySqlChunkedQueryManager"); @@ -327,7 +327,7 @@ public void testCheckpointedChunkedReader() throws SQLException, SchemaGeneratio new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, "TEST_CHECKPOINT"); Map checkpoint = new HashMap<>(); - checkpoint.put("key1", 99); + checkpoint.put("key1", null); reader.subscribe(Collections.singletonList(0), checkpoint); reader.poll(); // Verify that a call to setObject was done with supplied key value diff --git a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestMysqlChunkedQueryManager.java b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestMysqlChunkedQueryManager.java index 8ae53236b..a93eef969 100644 --- a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestMysqlChunkedQueryManager.java +++ b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestMysqlChunkedQueryManager.java @@ -27,13 +27,13 @@ public void testSimpleKeySinglePartition() { * ( * SELECT * FROM TABLE * ) nestedTab1 - * WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) + * WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) * ORDER BY KEY1 * ) as nestedTab2 LIMIT 10; */ String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; /** * SELECT * FROM @@ -42,12 +42,12 @@ public void testSimpleKeySinglePartition() { * ( * SELECT * FROM TABLE * ) nestedTab1 - * WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? 
) ) + * WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) ) * ORDER BY KEY1 * ) as nestedTab2 LIMIT 10; */ String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEY, CHUNK_SIZE, PARTITION_COUNT, PARTITION); } @@ -64,10 +64,10 @@ public void testSimpleKeyMultiPartition() { * ( * SELECT * FROM TABLE * ) nestedTab1 - * WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) ORDER BY KEY1 + * WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) ORDER BY KEY1 * ) as nestedTab2 LIMIT 10; */ - String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) " + String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) " + "IN ( 2 , 5 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; /** @@ -77,11 +77,11 @@ public void testSimpleKeyMultiPartition() { * ( * SELECT * FROM TABLE * ) nestedTab1 - * WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 + * WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 * ) as nestedTab2 LIMIT 10; */ String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) " + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) " + "AND ( ( KEY1 > ? 
) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10"; testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEY, CHUNK_SIZE, PARTITION_COUNT, PARTITIONS); @@ -96,12 +96,12 @@ public void testCompositeKeySinglePartition() { * SELECT * FROM * ( * SELECT * FROM TABLE - * ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) + * ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) * ORDER BY KEY1 , KEY2 * ) as nestedTab2 LIMIT 10; */ String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1" - + " WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10"; + + " WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10"; /** * SELECT * FROM @@ -110,12 +110,12 @@ public void testCompositeKeySinglePartition() { * ( * SELECT * FROM TABLE * ) nestedTab1 - * WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) + * WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) * ORDER BY KEY1 , KEY2 * ) as nestedTab2 LIMIT 10; */ String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) " + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? 
) ) " + "ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10"; testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEYS, CHUNK_SIZE, PARTITION_COUNT, PARTITION); @@ -130,12 +130,12 @@ public void testCompositeKeyMultiPartition() { * SELECT * FROM * ( * SELECT * FROM TABLE - * ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) + * ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) * ORDER BY KEY1 , KEY2 * ) as nestedTab2 LIMIT 10; */ String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) " + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) " + "ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10"; /** @@ -145,13 +145,13 @@ public void testCompositeKeyMultiPartition() { * SELECT * FROM * ( * SELECT * FROM TABLE - * ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) + * ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) * AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) * ORDER BY KEY1 , KEY2 * ) as nestedTab2 LIMIT 10; */ String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 " - + "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) " + + "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) " + "AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? 
) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10"; testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEYS, CHUNK_SIZE, PARTITION_COUNT, PARTITIONS); diff --git a/gradle/buildscript.gradle b/gradle/buildscript.gradle index 113ff8b48..9c03ef4b9 100644 --- a/gradle/buildscript.gradle +++ b/gradle/buildscript.gradle @@ -8,6 +8,7 @@ repositories { maven { url "https://linkedin.jfrog.io/artifactory/open-source" } + maven { url "https://repo.grails.org/grails/core/" } } } From 24c6f01e2e2409dca2f2f591a68ccf497ce47e98 Mon Sep 17 00:00:00 2001 From: Akshay Rai Date: Wed, 8 Oct 2025 10:27:20 +0530 Subject: [PATCH 2/2] Cleanup --- .../databases/dbreader/TestDatabaseChunkedReader.java | 6 +++--- gradle/buildscript.gradle | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java index 4ac73bb76..98e010d64 100644 --- a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java +++ b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java @@ -66,7 +66,7 @@ public class TestDatabaseChunkedReader { + "],\"meta\":\"dbTableName=TEST_DB_TEST_TABLE;pk=key1,key2,key3;\"}"; private static final Schema TEST_COMPOSITE_KEY_TABLE_SCHEMA = Schema.parse(TEST_COMPOSITE_KEY_TABLE_SCHEMA_STR); private static final String TEST_SIMPLE_KEY_TABLE = "TEST_SIMPLE_SCHEMA"; - private static final String TEST_SIMPLE_QUERY = "SELECT * FROM " + TEST_SIMPLE_KEY_TABLE; + private static final String TEST_SIMPLE_QUERY = "SELECT * FROM " + TEST_SIMPLE_KEY_TABLE + " ORDER BY KEY1"; private static final List TEST_SIMPLE_KEYS = Collections.singletonList("key1"); private static final String TEST_SIMPLE_SCHEMA_STR = 
"{\"type\":\"record\",\"name\":\"SIMPLE_SCHEMA\",\"namespace\":\"com.linkedin.events.simpleschema\", \"fields\":[" @@ -88,7 +88,7 @@ private static Properties createTestDBReaderProperties(Integer chunkSize, Boolea Properties props = new Properties(); props.setProperty(DB_READER_DOMAIN_CONFIG + "." + QUERY_TIMEOUT_SECS, "10000"); //10 secs props.setProperty(DB_READER_DOMAIN_CONFIG + "." + FETCH_SIZE, "100"); - //props.setProperty(DB_READER_DOMAIN_CONFIG + "." + SKIP_BAD_MESSAGE, skipBadMsg.toString()); + props.setProperty(DB_READER_DOMAIN_CONFIG + "." + SKIP_BAD_MESSAGE, skipBadMsg.toString()); props.setProperty(DB_READER_DOMAIN_CONFIG + "." + ROW_COUNT_LIMIT, chunkSize.toString()); props.setProperty(DB_READER_DOMAIN_CONFIG + "." + DATABASE_QUERY_MANAGER_CLASS_NAME, "com.linkedin.datastream.common.databases.dbreader.MySqlChunkedQueryManager"); @@ -327,7 +327,7 @@ public void testCheckpointedChunkedReader() throws SQLException, SchemaGeneratio new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, "TEST_CHECKPOINT"); Map checkpoint = new HashMap<>(); - checkpoint.put("key1", null); + checkpoint.put("key1", 99); reader.subscribe(Collections.singletonList(0), checkpoint); reader.poll(); // Verify that a call to setObject was done with supplied key value diff --git a/gradle/buildscript.gradle b/gradle/buildscript.gradle index 9c03ef4b9..113ff8b48 100644 --- a/gradle/buildscript.gradle +++ b/gradle/buildscript.gradle @@ -8,7 +8,6 @@ repositories { maven { url "https://linkedin.jfrog.io/artifactory/open-source" } - maven { url "https://repo.grails.org/grails/core/" } } }