From d3db3930e4ce073435880ffcb8a052cc88255d4f Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 11:23:31 -0500 Subject: [PATCH 01/19] Implement BigQuery API --- runtime/Cargo.lock | 721 ++++++++++++++++++++++++- runtime/plaid-stl/src/gcp/bigquery.rs | 121 +++++ runtime/plaid-stl/src/gcp/mod.rs | 1 + runtime/plaid/Cargo.toml | 6 +- runtime/plaid/src/apis/gcp/bigquery.rs | 326 +++++++++++ runtime/plaid/src/apis/gcp/mod.rs | 19 +- runtime/plaid/src/apis/mod.rs | 9 + runtime/plaid/src/functions/api.rs | 7 + 8 files changed, 1199 insertions(+), 11 deletions(-) create mode 100644 runtime/plaid-stl/src/gcp/bigquery.rs create mode 100644 runtime/plaid/src/apis/gcp/bigquery.rs diff --git a/runtime/Cargo.lock b/runtime/Cargo.lock index a4217fb4..cee69b40 100644 --- a/runtime/Cargo.lock +++ b/runtime/Cargo.lock @@ -91,6 +91,20 @@ dependencies = [ "byteorder", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.3.4", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -120,6 +134,12 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -168,6 +188,179 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "arrow" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a3ec4fe573f9d1f59d99c085197ef669b00b088ba1d7bb75224732d9357a74" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dcf19f07792d8c7f91086c67b574a79301e367029b17fcf63fb854332246a10" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "num", +] + +[[package]] +name = "arrow-array" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7845c32b41f7053e37a075b3c2f29c6f5ea1b3ca6e5df7a2d325ee6e1b4a63cf" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.15.5", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b5c681a99606f3316f2a99d9c8b6fa3aad0b1d34d8f6d7a1b471893940219d8" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365f8527d4f87b133eeb862f9b8093c009d41a210b8f101f91aa2392f61daac" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd962fc3bf7f60705b25bcaa8eb3318b2545aa1d528656525ebdd6a17a6cd6fb" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3527365b24372f9c948f16e53738eb098720eea2093ae73c7af04ac5e30a39b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-ord" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79af2db0e62a508d34ddf4f76bfd6109b6ecc845257c9cba6f939653668f89ac" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half", + "num", +] + +[[package]] +name = "arrow-row" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da30e9d10e9c52f09ea0cf15086d6d785c11ae8dcc3ea5f16d402221b6ac7735" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35b0f9c0c3582dd55db0f136d3b44bfa0189df07adcf7dc7f2f2e74db0f52eb8" + +[[package]] +name = "arrow-select" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92fc337f01635218493c23da81a364daf38c694b05fc20569c3193c11c561984" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "53.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d596a9fc25dae556672d5069b090331aca8acb93cae426d8b7dcdf1c558fa0ce" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "asn1-rs" version = "0.5.2" @@ -246,6 +439,28 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -257,6 +472,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -820,6 +1044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" dependencies = [ "fastrand", + "tokio", ] [[package]] @@ -883,6 +1108,20 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bigdecimal" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d6867f1565b3aad85681f1015055b087fcfd840d6aeee6eee7f2da317603695" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1055,16 +1294,17 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.44" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ + "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-targets 0.52.6", ] [[package]] @@ -1191,6 +1431,26 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -1531,6 +1791,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -1914,9 +2180,9 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e7f34442dbe69c60fe8eaf58a8cafff81a1f278816d8ab4db255b3bef4ac3c4" dependencies = [ - "getrandom 0.3.3", + "getrandom 0.3.4", "libm", - "rand 0.9.1", + "rand 0.9.2", "siphasher", ] @@ -1953,6 +2219,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "flatbuffers" +version = "24.12.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + [[package]] name = "flate2" version = "1.1.9" @@ -1982,6 +2258,21 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -2191,6 +2482,104 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "google-cloud-auth" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57a13fbacc5e9c41ded3ad8d0373175a6b7a6ad430d99e89d314ac121b7ab06" +dependencies = [ + "async-trait", + "base64 0.21.7", + "google-cloud-metadata", + "google-cloud-token", + "home", + "jsonwebtoken", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", + "time", + "tokio", + "tracing", + "urlencoding", +] + +[[package]] +name = "google-cloud-bigquery" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f09bf057f5a44f4975252492feea3a7b26ca0693cbe12511317504c3c02fca5e" +dependencies = [ + "anyhow", + "arrow", + "async-stream", + "async-trait", + "backon", + "base64 0.21.7", + "bigdecimal", + "google-cloud-auth", + "google-cloud-gax", + "google-cloud-googleapis", + "google-cloud-token", + "num-bigint", + "prost-types", + "reqwest", + "reqwest-middleware", + "serde", + "serde_json", + "thiserror 1.0.69", + "time", + "tokio", + "tracing", +] + +[[package]] +name = "google-cloud-gax" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de13e62d7e0ffc3eb40a0113ddf753cf6ec741be739164442b08893db4f9bfca" +dependencies = [ + "google-cloud-token", + "http 1.4.0", + "thiserror 1.0.69", + "tokio", + "tokio-retry2", + "tonic", + "tower 0.4.13", + "tracing", +] + +[[package]] +name = "google-cloud-googleapis" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "886aa8ec755382a1fdf4651f6e6ec01f2f3bf49f2cb0f068b9a74cafd574a715" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + +[[package]] +name = "google-cloud-metadata" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d901aeb453fd80e51d64df4ee005014f6cf39f2d736dd64f7239c132d9d39a6a" +dependencies = [ + "reqwest", + "thiserror 1.0.69", + "tokio", +] + +[[package]] +name = "google-cloud-token" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c12ba8b21d128a2ce8585955246977fbce4415f680ebf9199b6f9d6d725f" +dependencies = [ + "async-trait", +] + [[package]] name = "group" version = "0.13.0" @@ -2240,6 +2629,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", + "zerocopy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2359,6 +2760,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "http" version = "0.2.12" @@ -2536,7 +2946,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tower-service", - "webpki-roots", + "webpki-roots 1.0.6", ] [[package]] @@ -2552,6 +2962,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -2923,6 +3349,63 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.182" @@ -3228,6 +3711,23 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe 0.2.1", + "openssl-sys", + "schannel", + "security-framework 3.7.0", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -3238,6 +3738,20 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -3264,6 +3778,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.2.0" @@ -3290,6 +3813,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -3392,6 +3926,32 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "openssl-probe" version = "0.1.6" @@ -3404,6 +3964,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "outref" version = "0.5.2" @@ -3647,6 +4219,7 @@ dependencies = [ "fastbloom", "flate2", "futures-util", + "google-cloud-bigquery", "hex", "http 1.4.0", "jsonwebtoken", @@ -3792,6 +4365,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "psl-types" version = "2.0.11" @@ -4182,6 +4787,7 @@ dependencies = [ "bytes", "cookie", "cookie_store 0.22.1", + "encoding_rs", "futures-core", "futures-util", "http 1.4.0", @@ -4189,10 +4795,13 @@ dependencies = [ "http-body-util", "hyper 1.8.1", "hyper-rustls 0.27.7", + "hyper-tls", "hyper-util", "js-sys", "log", + "mime", "mime_guess", + "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -4203,6 +4812,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.3", @@ -4213,7 +4823,22 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots", + "webpki-roots 1.0.6", +] + +[[package]] +name = "reqwest-middleware" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57f17d28a6e6acfe1733fe24bcd30774d13bffa4b8a22535b4c8c98423088d4e" +dependencies = [ + "anyhow", + "async-trait", + "http 1.4.0", + "reqwest", + "serde", + "thiserror 1.0.69", + "tower-service", ] [[package]] @@ -5161,6 +5786,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -5214,6 +5848,26 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-retry2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a0a122635e32bd827df297f311ca5e0292636bbd82f67ff84a4bedeab06dbeb" +dependencies = [ + "pin-project", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -5245,6 +5899,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" @@ -5295,6 +5960,36 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "flate2", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-pemfile", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", + "webpki-roots 0.26.11", +] + [[package]] name = "totp-rs" version = "5.7.0" @@ -5316,8 +6011,11 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", "tokio-util", "tower-layer", @@ -6017,6 +6715,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + [[package]] name = "webpki-roots" version = "1.0.6" diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs new file mode 100644 index 00000000..7bc3256b --- /dev/null +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -0,0 +1,121 @@ +use std::{ + collections::HashMap, + fmt::Display, + ops::{Deref, DerefMut}, +}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::PlaidFunctionError; + +/// Request sent to the runtime to read rows from a BigQuery table. +#[derive(Deserialize, Serialize)] +pub struct ReadTableRequest { + pub dataset: String, + pub table: String, + /// Columns to select. Must be non-empty; the runtime does not support + /// `SELECT *` so that callers are always explicit about what data they + /// need and the runtime can return named results. + pub columns: Vec, +} + +/// Response returned by the runtime for a BigQuery read. +/// +/// Each row is a [`HashMap`] keyed by column name. NULL database values are +/// represented as [`Value::Null`]. +/// +/// `ReadTableResponse` implements [`Deref`] to `[HashMap]` and +/// both consuming and borrowing [`IntoIterator`], so it can be used directly as +/// a collection without accessing the inner field: +/// +/// ```ignore +/// let rows = read_from_table("my_dataset", "events", &["user_id", "count"])?; +/// +/// for row in &rows { +/// let user = &row["user_id"]; // Value::String +/// let count = &row["count"]; // Value::Number (if schema declares integer) +/// } +/// +/// println!("{} rows returned", rows.len()); +/// ``` +#[derive(Deserialize, Serialize)] +pub struct ReadTableResponse { + pub rows: Vec>, +} + +impl Deref for ReadTableResponse { + type Target = [HashMap]; + fn deref(&self) -> &Self::Target { + &self.rows + } +} + +impl DerefMut for ReadTableResponse { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.rows + } +} + +impl IntoIterator for ReadTableResponse { + type Item = HashMap; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.rows.into_iter() + } +} + +impl<'a> IntoIterator for &'a ReadTableResponse { + type Item = &'a HashMap; + type IntoIter = std::slice::Iter<'a, HashMap>; + fn into_iter(self) -> Self::IntoIter { + self.rows.iter() + } +} + +/// Read rows from a BigQuery table. +/// +/// `columns` must be non-empty. Specify exactly which columns you need; +/// the runtime will reject requests that do not name at least one column. +/// +/// Returns a [`ReadTableResponse`] that can be iterated directly or indexed +/// like a slice. Each row is a [`HashMap`] keyed by the column names supplied +/// in `columns`. NULL database values are represented as [`Value::Null`]. +pub fn query_table( + dataset: impl Display, + table: impl Display, + columns: &[impl Display], +) -> Result { + extern "C" { + new_host_function_with_error_buffer!(bigquery, query_table); + } + + let params = ReadTableRequest { + dataset: dataset.to_string(), + table: table.to_string(), + columns: columns.iter().map(|c| c.to_string()).collect(), + }; + + const RETURN_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + let mut return_buffer = vec![0; RETURN_BUFFER_SIZE]; + + let params = serde_json::to_string(¶ms).unwrap(); + let res = unsafe { + bigquery_query_table( + params.as_bytes().as_ptr(), + params.as_bytes().len(), + return_buffer.as_mut_ptr(), + RETURN_BUFFER_SIZE, + ) + }; + + if res < 0 { + return Err(res.into()); + } + + return_buffer.truncate(res as usize); + + let response = String::from_utf8(return_buffer).unwrap(); + + serde_json::from_str(&response).map_err(|_| PlaidFunctionError::InternalApiError) +} diff --git a/runtime/plaid-stl/src/gcp/mod.rs b/runtime/plaid-stl/src/gcp/mod.rs index c77b382d..a39be50e 100644 --- a/runtime/plaid-stl/src/gcp/mod.rs +++ b/runtime/plaid-stl/src/gcp/mod.rs @@ -1 +1,2 @@ +pub mod bigquery; pub mod google_docs; diff --git a/runtime/plaid/Cargo.toml b/runtime/plaid/Cargo.toml index ccd6d138..434dc202 100644 --- a/runtime/plaid/Cargo.toml +++ b/runtime/plaid/Cargo.toml @@ -15,7 +15,7 @@ aws = [ ] cranelift = ["wasmer/cranelift"] llvm = ["wasmer/llvm"] -gcp = [] +gcp = ["google-cloud-bigquery"] [dependencies] aes = "0.7" @@ -30,7 +30,7 @@ aws-sdk-sqs = { version = "1.41.0", optional = true } base64 = "0.13" block-modes = "0.8" fastbloom = "0.14" -chrono = "0.4.41" +chrono = "=0.4.39" clap = { version = "4", default-features = false, features = [ "std", "help", @@ -95,7 +95,7 @@ wasmer-middlewares = "7" x509-parser = "0.18.0" thiserror = "2.0.17" pulldown-cmark = "0.13.0" - +google-cloud-bigquery = { version = "0.15.0", optional = true, default-features = true } [[example]] name = "github-tailer" diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs new file mode 100644 index 00000000..26ea85e1 --- /dev/null +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -0,0 +1,326 @@ +use std::{collections::HashMap, sync::Arc}; + +use google_cloud_bigquery::{ + client::{google_cloud_auth, Client, ClientConfig}, + http::job::query::QueryRequest, + query::row::Row, +}; +use plaid_stl::gcp::bigquery::{ReadTableRequest, ReadTableResponse}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::{apis::ApiError, loader::PlaidModule}; +use log::error; + +const TOKEN_URI: &str = "https://oauth2.googleapis.com/token"; + +#[derive(Error, Debug)] +pub enum BigQueryError { + #[error("Authentication error: {0}")] + Auth(#[from] google_cloud_auth::error::Error), + #[error("Client initialization error: {0}")] + Client(String), + #[error("Query error: {0}")] + QueryError(#[from] google_cloud_bigquery::client::QueryError), + #[error("Row iteration error: {0}")] + IterError(#[from] google_cloud_bigquery::query::Error), + #[error("Row decode error: {0}")] + RowError(#[from] google_cloud_bigquery::query::row::Error), +} + +/// The BigQuery type a column should be decoded into. +/// +/// The BigQuery HTTP API returns all values as strings on the wire, so without +/// explicit type information the runtime would always produce +/// `serde_json::Value::String`. Providing a `ColumnType` in the schema causes +/// the runtime to parse the raw string into the correct JSON primitive before +/// returning it to the module. +#[derive(Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum ColumnType { + /// UTF-8 text. Produces `Value::String`. This is the default when no + /// schema entry is present for a column. + String, + /// 64-bit signed integer. Produces `Value::Number`. + Integer, + /// 64-bit IEEE 754 float. Produces `Value::Number`. + Float, + /// Boolean. Produces `Value::Bool`. + Boolean, +} + +/// Mirrors the fields of a GCP service account JSON key file that are required +/// to authenticate as a service account. Credentials are supplied inline in the +/// Plaid config rather than via a key file on disk. +#[derive(Deserialize)] +pub struct BigQueryConfig { + /// GCP project ID (e.g. "my-gcp-project") + project_id: String, + /// Service account email (e.g. "my-sa@my-project.iam.gserviceaccount.com") + client_email: String, + /// RSA private key in PEM format (the `private_key` field from the JSON key file) + private_key: String, + /// Private key ID (the `private_key_id` field from the JSON key file) + private_key_id: String, + /// Read access map: module name → dataset → list of tables the module may query + r: HashMap>>, + /// Column type schema: dataset → table → column name → [`ColumnType`]. + /// + /// Columns not present in the schema are decoded as `String`. Providing a + /// schema entry ensures numeric and boolean values arrive with the correct + /// JSON type rather than being silently coerced to strings. + #[serde(default)] + schemas: HashMap>>, + /// Timeout (in milliseconds) applied to all queries + #[serde(default = "default_timeout_ms")] + timeout_ms: i64, +} + +/// The default timeout if none is provided. +fn default_timeout_ms() -> i64 { + 50000 +} + +pub struct BigQuery { + client: Client, + project_id: String, + config: BigQueryConfig, +} + +impl BigQuery { + pub async fn new(config: BigQueryConfig) -> Result { + // Build a CredentialsFile JSON from our config fields + let credentials_json = serde_json::json!({ + "type": "service_account", + "project_id": config.project_id, + "client_email": config.client_email, + "private_key_id": config.private_key_id, + "private_key": config.private_key, + "token_uri": TOKEN_URI, + }); + + let credentials = google_cloud_auth::credentials::CredentialsFile::new_from_str( + &credentials_json.to_string(), + ) + .await?; + + let (client_config, resolved_project_id) = + ClientConfig::new_with_credentials(credentials).await?; + + // The project ID resolved from the credentials should match config.project_id, + // but we fall back to config.project_id if the token source didn't return one. + let project_id = resolved_project_id.unwrap_or_else(|| config.project_id.clone()); + + let client = Client::new(client_config) + .await + .map_err(|e| BigQueryError::Client(e.to_string()))?; + + Ok(BigQuery { + client, + project_id, + config, + }) + } + + /// Executes a `SELECT` query against a BigQuery table and returns the + /// results serialised as a [`ReadTableResponse`] JSON string. + /// + /// # Flow + /// + /// 1. Deserializes `params` into a [`ReadTableRequest`] (dataset, table, + /// columns). Returns `BadRequest` if the payload is malformed. + /// 2. Calls [`build_query_request`][Self::build_query_request], which + /// enforces module permissions and validates all identifiers. + /// 3. Issues the query via the BigQuery client and drains the async + /// iterator into a `Vec` of rows. + /// 4. For each cell, looks up the column's [`ColumnType`] from the + /// configured schema (defaulting to `String` when absent) and calls + /// [`decode_column`] to produce the correct `serde_json::Value` variant. + /// 5. Wraps the rows in a [`ReadTableResponse`] and serializes to JSON. + /// + /// # Errors + /// + /// Returns `ApiError::BadRequest` if permissions or identifier validation + /// fails, and `ApiError::BigQueryError` for any network or decoding error. + pub async fn query_table( + &self, + params: &str, + module: Arc, + ) -> Result { + let params = + serde_json::from_str::(params).map_err(|_| ApiError::BadRequest)?; + + let query_request = self.build_query_request(&module, ¶ms)?; + + let mut iter = self + .client + .query::(&self.project_id, query_request) + .await + .map_err(BigQueryError::from)?; + + // Resolve the column type schema once for the requested table so we + // don't repeat the map lookups inside the hot row-iteration loop. + let table_schema = self + .config + .schemas + .get(¶ms.dataset) + .and_then(|d| d.get(¶ms.table)); + + let mut rows = Vec::new(); + while let Some(row) = iter.next().await.map_err(BigQueryError::from)? { + let mut map = HashMap::new(); + for (i, col_name) in params.columns.iter().enumerate() { + let col_type = table_schema + .and_then(|s| s.get(col_name)) + .copied() + .unwrap_or(ColumnType::String); + let value = decode_column(&row, i, col_type).map_err(BigQueryError::from)?; + map.insert(col_name.clone(), value); + } + rows.push(map); + } + + let response = ReadTableResponse { rows }; + serde_json::to_string(&response).map_err(|_| ApiError::ImpossibleError) + } + + /// Returns `Ok(())` if the module has an entry in `r` for the given dataset + /// that includes the requested table. + fn check_module_permission( + &self, + module: &Arc, + dataset: &str, + table: &str, + ) -> Result<(), ApiError> { + let Some(datasets) = self.config.r.get(&module.to_string()) else { + error!("[{module}] attempted to read BigQuery table [{dataset}.{table}] but has no BigQuery permissions configured"); + return Err(ApiError::BadRequest); + }; + + let Some(tables) = datasets.get(dataset) else { + error!("[{module}] attempted to read BigQuery table [{dataset}.{table}] but is not permitted to access dataset [{dataset}]"); + return Err(ApiError::BadRequest); + }; + + if !tables.iter().any(|t| t == table) { + error!("[{module}] attempted to read BigQuery table [{dataset}.{table}] but [{table}] is not in the permitted tables for dataset [{dataset}]"); + return Err(ApiError::BadRequest); + } + + Ok(()) + } + + /// Validates permissions and constructs a [`QueryRequest`] ready to send to + /// BigQuery. + /// + /// Combines the two validation steps that must both pass before a query is + /// issued: + /// + /// 1. **Permission check** — asserts that `module` is allowed to read from + /// `params.dataset` and `params.table` according to the `r` config map. + /// 2. **Query construction** — delegates to [`build_query_string`], which + /// validates all identifiers against a strict allowlist and assembles the + /// `SELECT` statement. + /// + /// Returns `ApiError::BadRequest` if either check fails. + fn build_query_request( + &self, + module: &Arc, + params: &ReadTableRequest, + ) -> Result { + self.check_module_permission(module, ¶ms.dataset, ¶ms.table)?; + let query = build_query_string(¶ms.dataset, ¶ms.table, ¶ms.columns)?; + Ok(QueryRequest { + timeout_ms: Some(self.config.timeout_ms), + query, + ..Default::default() + }) + } +} + +/// Validates that every identifier (dataset, table, column names) contains +/// only ASCII letters, digits, and underscores, then builds the SELECT query +/// with each column wrapped in backticks. +/// +/// Backtick-quoting alone is not sufficient — a column name containing a +/// literal backtick would break out of the quoting. Rejecting any name that +/// is not a clean identifier is the safest approach and the correct thing to +/// do in a controlled API: callers supply column names, not arbitrary SQL. +fn build_query_string(dataset: &str, table: &str, columns: &[String]) -> Result { + if !is_valid_identifier(dataset) || !is_valid_identifier(table) { + return Err(ApiError::BadRequest); + } + + if columns.is_empty() { + return Err(ApiError::BadRequest); + } + + for col in columns { + if !is_valid_identifier(col) { + return Err(ApiError::BadRequest); + } + } + + let column_list = columns + .iter() + .map(|c| format!("`{c}`")) + .collect::>() + .join(", "); + + Ok(format!("SELECT {column_list} FROM `{dataset}`.`{table}`")) +} + +/// Extracts the value at `index` from `row`, parsing it into the +/// `serde_json::Value` variant that corresponds to `col_type`. +/// +/// The BigQuery HTTP API returns every value as a raw string on the wire. +/// Without schema information, `String` is the only safe choice. When a +/// `ColumnType` is provided the raw string is re-parsed into the correct +/// primitive so that modules receive `Value::Number` or `Value::Bool` instead +/// of a stringly-typed number that they would have to parse themselves. +fn decode_column( + row: &Row, + index: usize, + col_type: ColumnType, +) -> Result { + Ok(match col_type { + ColumnType::String => { + let v: Option = row.column(index)?; + v.map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null) + } + ColumnType::Integer => { + let v: Option = row.column(index)?; + v.map(|n| serde_json::Value::Number(n.into())) + .unwrap_or(serde_json::Value::Null) + } + ColumnType::Float => { + let v: Option = row.column(index)?; + v.and_then(serde_json::Number::from_f64) + .map(serde_json::Value::Number) + .unwrap_or(serde_json::Value::Null) + } + ColumnType::Boolean => { + let v: Option = row.column(index)?; + v.map(serde_json::Value::Bool) + .unwrap_or(serde_json::Value::Null) + } + }) +} + +/// Returns `true` if `s` is a valid BigQuery identifier: non-empty, starts +/// with an ASCII letter or underscore, and contains only ASCII letters, digits, +/// and underscores (`[A-Za-z_][A-Za-z0-9_]*`). +/// +/// This is a strict allowlist. Any character outside that set — including +/// spaces, quotes, backticks, semicolons, parentheses, and comment markers +/// (`--`, `/*`) — causes the function to return `false`, making SQL injection +/// via crafted identifier names impossible regardless of the surrounding query +/// structure. +fn is_valid_identifier(s: &str) -> bool { + !s.is_empty() + && s.chars() + .next() + .is_some_and(|c| c.is_ascii_alphabetic() || c == '_') + && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') +} diff --git a/runtime/plaid/src/apis/gcp/mod.rs b/runtime/plaid/src/apis/gcp/mod.rs index 0cbf4784..083330bd 100644 --- a/runtime/plaid/src/apis/gcp/mod.rs +++ b/runtime/plaid/src/apis/gcp/mod.rs @@ -1,18 +1,23 @@ use serde::Deserialize; +use bigquery::{BigQuery, BigQueryConfig}; use google_docs::{GoogleDocs, GoogleDocsConfig}; +pub mod bigquery; pub mod google_docs; #[derive(Deserialize)] pub struct GcpConfig { pub google_docs: Option, + pub bigquery: Option, } /// Contains all GCP services that Plaid implements APIs for pub struct Gcp { /// Google Docs pub google_docs: Option, + /// BigQuery + pub bigquery: Option, } impl Gcp { @@ -22,6 +27,18 @@ impl Gcp { None => None, }; - Gcp { google_docs } + let bigquery = match config.bigquery { + Some(conf) => Some( + BigQuery::new(conf) + .await + .expect("Failed to initialize BigQuery client"), + ), + None => None, + }; + + Gcp { + google_docs, + bigquery, + } } } diff --git a/runtime/plaid/src/apis/mod.rs b/runtime/plaid/src/apis/mod.rs index 40b8b833..e6f0c97f 100644 --- a/runtime/plaid/src/apis/mod.rs +++ b/runtime/plaid/src/apis/mod.rs @@ -129,6 +129,8 @@ pub enum ApiError { S3Error(aws::s3::S3Errors), #[cfg(feature = "gcp")] GoogleDocsError(gcp::google_docs::GoogleDocsError), + #[cfg(feature = "gcp")] + BigQueryError(gcp::bigquery::BigQueryError), #[cfg(feature = "aws")] KmsError(aws::kms::KmsErrors), NetworkError(reqwest::Error), @@ -168,6 +170,13 @@ impl From for ApiError { } } +#[cfg(feature = "gcp")] +impl From for ApiError { + fn from(e: gcp::bigquery::BigQueryError) -> Self { + Self::BigQueryError(e) + } +} + impl Api { pub async fn new( config: ApiConfigs, diff --git a/runtime/plaid/src/functions/api.rs b/runtime/plaid/src/functions/api.rs index c54af192..7266222c 100644 --- a/runtime/plaid/src/functions/api.rs +++ b/runtime/plaid/src/functions/api.rs @@ -528,6 +528,8 @@ impl_new_sub_module_function_with_error_buffer!( create_sheet_from_csv, DISALLOW_IN_TEST_MODE ); +#[cfg(feature = "gcp")] +impl_new_sub_module_function_with_error_buffer!(gcp, bigquery, query_table, DISALLOW_IN_TEST_MODE); // Npm Functions impl_new_function_with_error_buffer!(npm, publish_empty_stub, DISALLOW_IN_TEST_MODE); @@ -989,6 +991,11 @@ pub fn to_api_function( Function::new_typed_with_env(&mut store, &env, gcp_google_docs_create_sheet_from_csv) } + #[cfg(feature = "gcp")] + "gcp_bigquery_query_table" => { + Function::new_typed_with_env(&mut store, &env, gcp_bigquery_query_table) + } + // PagerDuty Calls "pagerduty_trigger_incident" => { Function::new_typed_with_env(&mut store, &env, pagerduty_trigger_incident) From cdb7f199de39787a490b50164e05158f0e530b9b Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 11:27:12 -0500 Subject: [PATCH 02/19] Implement Debug for ReadTableResponse --- runtime/plaid-stl/src/gcp/bigquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index 7bc3256b..1abd2438 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -39,7 +39,7 @@ pub struct ReadTableRequest { /// /// println!("{} rows returned", rows.len()); /// ``` -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] pub struct ReadTableResponse { pub rows: Vec>, } From 3b0ac2798ec8fbaa58b43a513024f6c85ca66d50 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 11:37:44 -0500 Subject: [PATCH 03/19] Update function name in STL --- runtime/plaid-stl/src/gcp/bigquery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index 1abd2438..29dbc053 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -87,7 +87,7 @@ pub fn query_table( columns: &[impl Display], ) -> Result { extern "C" { - new_host_function_with_error_buffer!(bigquery, query_table); + new_host_function_with_error_buffer!(gcp_bigquery, query_table); } let params = ReadTableRequest { @@ -101,7 +101,7 @@ pub fn query_table( let params = serde_json::to_string(¶ms).unwrap(); let res = unsafe { - bigquery_query_table( + gcp_bigquery_query_table( params.as_bytes().as_ptr(), params.as_bytes().len(), return_buffer.as_mut_ptr(), From 0deae02a0e6daa0091e0927abf1b43e18b2b5623 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 11:58:39 -0500 Subject: [PATCH 04/19] Add WHERE clause to query_table API --- runtime/plaid-stl/src/gcp/bigquery.rs | 77 +++++++++++++++ runtime/plaid/src/apis/gcp/bigquery.rs | 129 ++++++++++++++++++++++--- 2 files changed, 195 insertions(+), 11 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index 29dbc053..3331f526 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -18,6 +18,78 @@ pub struct ReadTableRequest { /// `SELECT *` so that callers are always explicit about what data they /// need and the runtime can return named results. pub columns: Vec, + /// Optional WHERE clause. When `None` the query returns all rows. + pub filter: Option, +} + +/// A node in a WHERE clause expression tree. +/// +/// Conditions can be nested arbitrarily using `And` and `Or`. The runtime +/// validates all column names and renders the tree into safe BigQuery SQL — +/// modules never construct raw SQL strings. +/// +/// # Example +/// +/// ```ignore +/// // WHERE (status = 'active' AND login_count > 5) +/// let filter = Filter::And(vec![ +/// Filter::Condition { +/// column: "status".into(), +/// operator: Operator::Eq, +/// value: FilterValue::String("active".into()), +/// }, +/// Filter::Condition { +/// column: "login_count".into(), +/// operator: Operator::Gt, +/// value: FilterValue::Integer(5), +/// }, +/// ]); +/// ``` +#[derive(Serialize, Deserialize)] +pub enum Filter { + /// All child conditions must be true. + And(Vec), + /// At least one child condition must be true. + Or(Vec), + /// A single column comparison. + Condition { + column: String, + operator: Operator, + value: FilterValue, + }, +} + +/// Comparison operator for a [`Filter::Condition`]. +#[derive(Serialize, Deserialize)] +pub enum Operator { + /// `=` + Eq, + /// `!=` + Ne, + /// `<` + Lt, + /// `<=` + Le, + /// `>` + Gt, + /// `>=` + Ge, + /// `LIKE` — use `%` and `_` wildcards in a [`FilterValue::String`]. + Like, + /// `IS NULL` — no value is required; the runtime ignores the `value` field. + IsNull, + /// `IS NOT NULL` — no value is required; the runtime ignores the `value` field. + IsNotNull, +} + +/// The right-hand-side value for a [`Filter::Condition`]. +#[derive(Serialize, Deserialize)] +pub enum FilterValue { + String(String), + Integer(i64), + Float(f64), + Boolean(bool), + Null, } /// Response returned by the runtime for a BigQuery read. @@ -81,10 +153,14 @@ impl<'a> IntoIterator for &'a ReadTableResponse { /// Returns a [`ReadTableResponse`] that can be iterated directly or indexed /// like a slice. Each row is a [`HashMap`] keyed by the column names supplied /// in `columns`. NULL database values are represented as [`Value::Null`]. +/// +/// Pass `filter` to add a WHERE clause. Use [`Filter`] to build the condition +/// tree — the runtime validates all identifiers and renders the SQL safely. pub fn query_table( dataset: impl Display, table: impl Display, columns: &[impl Display], + filter: Option, ) -> Result { extern "C" { new_host_function_with_error_buffer!(gcp_bigquery, query_table); @@ -94,6 +170,7 @@ pub fn query_table( dataset: dataset.to_string(), table: table.to_string(), columns: columns.iter().map(|c| c.to_string()).collect(), + filter, }; const RETURN_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 26ea85e1..9795062c 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -5,7 +5,9 @@ use google_cloud_bigquery::{ http::job::query::QueryRequest, query::row::Row, }; -use plaid_stl::gcp::bigquery::{ReadTableRequest, ReadTableResponse}; +use plaid_stl::gcp::bigquery::{ + Filter, FilterValue, Operator, ReadTableRequest, ReadTableResponse, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -229,7 +231,12 @@ impl BigQuery { params: &ReadTableRequest, ) -> Result { self.check_module_permission(module, ¶ms.dataset, ¶ms.table)?; - let query = build_query_string(¶ms.dataset, ¶ms.table, ¶ms.columns)?; + let query = build_query_string( + ¶ms.dataset, + ¶ms.table, + ¶ms.columns, + params.filter.as_ref(), + )?; Ok(QueryRequest { timeout_ms: Some(self.config.timeout_ms), query, @@ -238,15 +245,18 @@ impl BigQuery { } } -/// Validates that every identifier (dataset, table, column names) contains -/// only ASCII letters, digits, and underscores, then builds the SELECT query -/// with each column wrapped in backticks. +/// Validates every identifier and builds the full `SELECT … FROM … [WHERE …]` +/// query string. /// -/// Backtick-quoting alone is not sufficient — a column name containing a -/// literal backtick would break out of the quoting. Rejecting any name that -/// is not a clean identifier is the safest approach and the correct thing to -/// do in a controlled API: callers supply column names, not arbitrary SQL. -fn build_query_string(dataset: &str, table: &str, columns: &[String]) -> Result { +/// Column names, dataset, and table are validated against a strict identifier +/// allowlist. The optional `filter` is rendered via [`build_filter_sql`], which +/// recursively validates all column names inside the condition tree. +fn build_query_string( + dataset: &str, + table: &str, + columns: &[String], + filter: Option<&Filter>, +) -> Result { if !is_valid_identifier(dataset) || !is_valid_identifier(table) { return Err(ApiError::BadRequest); } @@ -267,7 +277,104 @@ fn build_query_string(dataset: &str, table: &str, columns: &[String]) -> Result< .collect::>() .join(", "); - Ok(format!("SELECT {column_list} FROM `{dataset}`.`{table}`")) + let mut sql = format!("SELECT {column_list} FROM `{dataset}`.`{table}`"); + + if let Some(f) = filter { + sql.push_str(" WHERE "); + sql.push_str(&build_filter_sql(f)?); + } + + Ok(sql) +} + +/// Recursively renders a [`Filter`] tree into a WHERE clause fragment. +/// +/// Column names inside `Condition` nodes are validated with +/// [`is_valid_identifier`] before use. `And` and `Or` nodes must contain at +/// least one child. +fn build_filter_sql(filter: &Filter) -> Result { + match filter { + Filter::And(children) | Filter::Or(children) if children.is_empty() => { + Err(ApiError::BadRequest) + } + Filter::And(children) => { + let parts = children + .iter() + .map(build_filter_sql) + .collect::, _>>()?; + Ok(format!("({})", parts.join(" AND "))) + } + Filter::Or(children) => { + let parts = children + .iter() + .map(build_filter_sql) + .collect::, _>>()?; + Ok(format!("({})", parts.join(" OR "))) + } + Filter::Condition { + column, + operator, + value, + } => { + if !is_valid_identifier(column) { + return Err(ApiError::BadRequest); + } + build_condition_sql(column, operator, value) + } + } +} + +/// Renders a single `column OP value` condition. +/// +/// `IsNull` and `IsNotNull` ignore `value` entirely. +fn build_condition_sql( + column: &str, + op: &Operator, + value: &FilterValue, +) -> Result { + let col = format!("`{column}`"); + match op { + Operator::IsNull => return Ok(format!("{col} IS NULL")), + Operator::IsNotNull => return Ok(format!("{col} IS NOT NULL")), + _ => {} + } + + let op_str = match op { + Operator::Eq => "=", + Operator::Ne => "<>", + Operator::Lt => "<", + Operator::Le => "<=", + Operator::Gt => ">", + Operator::Ge => ">=", + Operator::Like => "LIKE", + Operator::IsNull | Operator::IsNotNull => unreachable!(), // safety: returns above + }; + + Ok(format!("{col} {op_str} {}", build_value_sql(value)?)) +} + +/// Formats a [`FilterValue`] as a safe SQL literal. +/// +/// Strings are wrapped in single quotes with internal single quotes doubled +/// (`'` → `''`), which is the standard SQL escaping mechanism. NaN and +/// infinite floats are rejected because BigQuery has no SQL representation for +/// them. +fn build_value_sql(value: &FilterValue) -> Result { + match value { + FilterValue::String(s) => { + let escaped = s.replace('\'', "''"); + Ok(format!("'{escaped}'")) + } + FilterValue::Integer(n) => Ok(n.to_string()), + FilterValue::Float(f) => { + if f.is_nan() || f.is_infinite() { + return Err(ApiError::BadRequest); + } + Ok(f.to_string()) + } + FilterValue::Boolean(b) => Ok(if *b { "TRUE" } else { "FALSE" }.to_string()), + FilterValue::Null => Ok("NULL".to_string()), + } } /// Extracts the value at `index` from `row`, parsing it into the From 05eb68d5f83173fa508fd12dfb668f4a3698b77b Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:07:33 -0500 Subject: [PATCH 05/19] Add comment --- runtime/plaid/src/apis/gcp/bigquery.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 9795062c..d662fa6f 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -168,6 +168,13 @@ impl BigQuery { .get(¶ms.dataset) .and_then(|d| d.get(¶ms.table)); + // Stream rows from BigQuery one at a time. For each row, walk the + // requested columns in order — BigQuery returns them positionally, so + // the column index `i` is the correct handle into the row. The schema + // lookup drives decode_column so that integers, floats, and booleans + // are preserved as their native JSON types rather than being coerced to + // strings. Columns absent from the schema fall back to String, which is + // always safe to attempt. let mut rows = Vec::new(); while let Some(row) = iter.next().await.map_err(BigQueryError::from)? { let mut map = HashMap::new(); From 24c144f616d262950ca1a09e69d5dcafdcfa2913 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:13:07 -0500 Subject: [PATCH 06/19] Update default timeout for BQ queries --- runtime/plaid/src/apis/gcp/bigquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index d662fa6f..26707899 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -80,7 +80,7 @@ pub struct BigQueryConfig { /// The default timeout if none is provided. fn default_timeout_ms() -> i64 { - 50000 + 5000 } pub struct BigQuery { From 6b98d829c283b8d55c84acdd292b9d18f68e73b5 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:13:47 -0500 Subject: [PATCH 07/19] Implement Clone for STL types + type rename --- runtime/plaid-stl/src/gcp/bigquery.rs | 40 ++++++++++++++------------ runtime/plaid/src/apis/gcp/bigquery.rs | 8 +++--- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index 3331f526..cc5366a3 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -9,10 +9,12 @@ use serde_json::Value; use crate::PlaidFunctionError; -/// Request sent to the runtime to read rows from a BigQuery table. -#[derive(Deserialize, Serialize)] -pub struct ReadTableRequest { +/// Request sent to the runtime to query a BigQuery table. +#[derive(Deserialize, Serialize, Clone)] +pub struct QueryTableRequest { + /// Dataset where `table` lives pub dataset: String, + /// Name of the table to query pub table: String, /// Columns to select. Must be non-empty; the runtime does not support /// `SELECT *` so that callers are always explicit about what data they @@ -45,7 +47,7 @@ pub struct ReadTableRequest { /// }, /// ]); /// ``` -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum Filter { /// All child conditions must be true. And(Vec), @@ -60,7 +62,7 @@ pub enum Filter { } /// Comparison operator for a [`Filter::Condition`]. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum Operator { /// `=` Eq, @@ -83,7 +85,7 @@ pub enum Operator { } /// The right-hand-side value for a [`Filter::Condition`]. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum FilterValue { String(String), Integer(i64), @@ -92,17 +94,17 @@ pub enum FilterValue { Null, } -/// Response returned by the runtime for a BigQuery read. +/// Response returned by the runtime for a BigQuery query. /// /// Each row is a [`HashMap`] keyed by column name. NULL database values are /// represented as [`Value::Null`]. /// -/// `ReadTableResponse` implements [`Deref`] to `[HashMap]` and +/// `QueryTableResponse` implements [`Deref`] to `[HashMap]` and /// both consuming and borrowing [`IntoIterator`], so it can be used directly as /// a collection without accessing the inner field: /// /// ```ignore -/// let rows = read_from_table("my_dataset", "events", &["user_id", "count"])?; +/// let rows = query_table("my_dataset", "events", &["user_id", "count"])?; /// /// for row in &rows { /// let user = &row["user_id"]; // Value::String @@ -111,25 +113,25 @@ pub enum FilterValue { /// /// println!("{} rows returned", rows.len()); /// ``` -#[derive(Deserialize, Serialize, Debug)] -pub struct ReadTableResponse { +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct QueryTableResponse { pub rows: Vec>, } -impl Deref for ReadTableResponse { +impl Deref for QueryTableResponse { type Target = [HashMap]; fn deref(&self) -> &Self::Target { &self.rows } } -impl DerefMut for ReadTableResponse { +impl DerefMut for QueryTableResponse { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.rows } } -impl IntoIterator for ReadTableResponse { +impl IntoIterator for QueryTableResponse { type Item = HashMap; type IntoIter = std::vec::IntoIter; fn into_iter(self) -> Self::IntoIter { @@ -137,7 +139,7 @@ impl IntoIterator for ReadTableResponse { } } -impl<'a> IntoIterator for &'a ReadTableResponse { +impl<'a> IntoIterator for &'a QueryTableResponse { type Item = &'a HashMap; type IntoIter = std::slice::Iter<'a, HashMap>; fn into_iter(self) -> Self::IntoIter { @@ -145,12 +147,12 @@ impl<'a> IntoIterator for &'a ReadTableResponse { } } -/// Read rows from a BigQuery table. +/// Query a BigQuery table. /// /// `columns` must be non-empty. Specify exactly which columns you need; /// the runtime will reject requests that do not name at least one column. /// -/// Returns a [`ReadTableResponse`] that can be iterated directly or indexed +/// Returns a [`QueryTableResponse`] that can be iterated directly or indexed /// like a slice. Each row is a [`HashMap`] keyed by the column names supplied /// in `columns`. NULL database values are represented as [`Value::Null`]. /// @@ -161,12 +163,12 @@ pub fn query_table( table: impl Display, columns: &[impl Display], filter: Option, -) -> Result { +) -> Result { extern "C" { new_host_function_with_error_buffer!(gcp_bigquery, query_table); } - let params = ReadTableRequest { + let params = QueryTableRequest { dataset: dataset.to_string(), table: table.to_string(), columns: columns.iter().map(|c| c.to_string()).collect(), diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 26707899..6db3352f 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -6,7 +6,7 @@ use google_cloud_bigquery::{ query::row::Row, }; use plaid_stl::gcp::bigquery::{ - Filter, FilterValue, Operator, ReadTableRequest, ReadTableResponse, + Filter, FilterValue, Operator, QueryTableRequest, QueryTableResponse, }; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -150,7 +150,7 @@ impl BigQuery { module: Arc, ) -> Result { let params = - serde_json::from_str::(params).map_err(|_| ApiError::BadRequest)?; + serde_json::from_str::(params).map_err(|_| ApiError::BadRequest)?; let query_request = self.build_query_request(&module, ¶ms)?; @@ -189,7 +189,7 @@ impl BigQuery { rows.push(map); } - let response = ReadTableResponse { rows }; + let response = QueryTableResponse { rows }; serde_json::to_string(&response).map_err(|_| ApiError::ImpossibleError) } @@ -235,7 +235,7 @@ impl BigQuery { fn build_query_request( &self, module: &Arc, - params: &ReadTableRequest, + params: &QueryTableRequest, ) -> Result { self.check_module_permission(module, ¶ms.dataset, ¶ms.table)?; let query = build_query_string( From b5522848eb5ef23a362bf2e5a8e16ef19daa25e6 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:21:21 -0500 Subject: [PATCH 08/19] Bump version --- modules/Cargo.lock | 2 +- runtime/Cargo.lock | 4 ++-- runtime/plaid-stl/Cargo.toml | 2 +- runtime/plaid/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/Cargo.lock b/modules/Cargo.lock index c3c0ec18..de4d99d8 100644 --- a/modules/Cargo.lock +++ b/modules/Cargo.lock @@ -215,7 +215,7 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "plaid_stl" -version = "0.36.1" +version = "0.37.0" dependencies = [ "base64 0.13.1", "chrono", diff --git a/runtime/Cargo.lock b/runtime/Cargo.lock index cee69b40..02e638fb 100644 --- a/runtime/Cargo.lock +++ b/runtime/Cargo.lock @@ -4198,7 +4198,7 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] name = "plaid" -version = "0.36.1" +version = "0.37.0" dependencies = [ "aes 0.7.5", "alkali", @@ -4263,7 +4263,7 @@ dependencies = [ [[package]] name = "plaid_stl" -version = "0.36.1" +version = "0.37.0" dependencies = [ "base64 0.13.1", "chrono", diff --git a/runtime/plaid-stl/Cargo.toml b/runtime/plaid-stl/Cargo.toml index 37226f94..f92f8c9d 100644 --- a/runtime/plaid-stl/Cargo.toml +++ b/runtime/plaid-stl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plaid_stl" -version = "0.36.1" +version = "0.37.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/runtime/plaid/Cargo.toml b/runtime/plaid/Cargo.toml index 434dc202..c7caf096 100644 --- a/runtime/plaid/Cargo.toml +++ b/runtime/plaid/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plaid" -version = "0.36.1" +version = "0.37.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From b0fcb43b980f98f35a4c17c3ed2d81d077bb4216 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:31:24 -0500 Subject: [PATCH 09/19] Use != instead of <> for not equals operator --- runtime/plaid/src/apis/gcp/bigquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 6db3352f..861094d4 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -348,7 +348,7 @@ fn build_condition_sql( let op_str = match op { Operator::Eq => "=", - Operator::Ne => "<>", + Operator::Ne => "!=", Operator::Lt => "<", Operator::Le => "<=", Operator::Gt => ">", From 79ff50edb87df7d158c9726760a1ff4f31b2e9b6 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:32:00 -0500 Subject: [PATCH 10/19] Update stale comments --- runtime/plaid/src/apis/gcp/bigquery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 861094d4..f3faeb42 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -125,7 +125,7 @@ impl BigQuery { } /// Executes a `SELECT` query against a BigQuery table and returns the - /// results serialised as a [`ReadTableResponse`] JSON string. + /// results serialised as a [`QueryTableResponse`] JSON string. /// /// # Flow /// @@ -138,7 +138,7 @@ impl BigQuery { /// 4. For each cell, looks up the column's [`ColumnType`] from the /// configured schema (defaulting to `String` when absent) and calls /// [`decode_column`] to produce the correct `serde_json::Value` variant. - /// 5. Wraps the rows in a [`ReadTableResponse`] and serializes to JSON. + /// 5. Wraps the rows in a [`QueryTableResponse`] and serializes to JSON. /// /// # Errors /// From 2010dd7c6b958277c6633b4332acf8216a4efd9d Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 12:48:20 -0500 Subject: [PATCH 11/19] Update return buffer size in STL + add comment regarding memory safety --- runtime/plaid-stl/src/gcp/bigquery.rs | 2 +- runtime/plaid/src/apis/gcp/bigquery.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index cc5366a3..aaef4726 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -175,7 +175,7 @@ pub fn query_table( filter, }; - const RETURN_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + const RETURN_BUFFER_SIZE: usize = 10 * 1000 * 1000; // 10 MB let mut return_buffer = vec![0; RETURN_BUFFER_SIZE]; let params = serde_json::to_string(¶ms).unwrap(); diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index f3faeb42..fdd52129 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -175,7 +175,11 @@ impl BigQuery { // are preserved as their native JSON types rather than being coerced to // strings. Columns absent from the schema fall back to String, which is // always safe to attempt. - let mut rows = Vec::new(); + // + // Memory safety: the BigQuery API enforces a hard 10 MB + // per-response cap server-side, so a malicious module cannot trigger an + // OOM by issuing a query that matches an arbitrarily large result set. + let mut rows: Vec> = Vec::new(); while let Some(row) = iter.next().await.map_err(BigQueryError::from)? { let mut map = HashMap::new(); for (i, col_name) in params.columns.iter().enumerate() { From 28f595e3695e4d4f0aa2cb0adb3ecfb6b22ac397 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 13:01:41 -0500 Subject: [PATCH 12/19] Fix Docker build errors --- runtime/Cargo.lock | 99 ---------------------------------------- runtime/plaid/Cargo.toml | 5 +- 2 files changed, 4 insertions(+), 100 deletions(-) diff --git a/runtime/Cargo.lock b/runtime/Cargo.lock index 02e638fb..9a408a2e 100644 --- a/runtime/Cargo.lock +++ b/runtime/Cargo.lock @@ -2258,21 +2258,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.2" @@ -2962,22 +2947,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.8.1", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.20" @@ -3711,23 +3680,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "native-tls" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe 0.2.1", - "openssl-sys", - "schannel", - "security-framework 3.7.0", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nom" version = "7.1.3" @@ -3926,32 +3878,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl" -version = "0.10.75" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" -dependencies = [ - "bitflags 2.11.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "openssl-probe" version = "0.1.6" @@ -3964,18 +3890,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" -[[package]] -name = "openssl-sys" -version = "0.9.111" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "outref" version = "0.5.2" @@ -4795,13 +4709,11 @@ dependencies = [ "http-body-util", "hyper 1.8.1", "hyper-rustls 0.27.7", - "hyper-tls", "hyper-util", "js-sys", "log", "mime", "mime_guess", - "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -4812,7 +4724,6 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-native-tls", "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.3", @@ -5848,16 +5759,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-retry2" version = "0.5.8" diff --git a/runtime/plaid/Cargo.toml b/runtime/plaid/Cargo.toml index c7caf096..45dc710a 100644 --- a/runtime/plaid/Cargo.toml +++ b/runtime/plaid/Cargo.toml @@ -95,7 +95,10 @@ wasmer-middlewares = "7" x509-parser = "0.18.0" thiserror = "2.0.17" pulldown-cmark = "0.13.0" -google-cloud-bigquery = { version = "0.15.0", optional = true, default-features = true } +google-cloud-bigquery = { version = "0.15.0", optional = true, default-features = false, features = [ + "rustls-tls", + "auth", +] } [[example]] name = "github-tailer" From 928696c2af442b812e38763fa0e4afbd62b33bc4 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Tue, 24 Feb 2026 13:09:18 -0500 Subject: [PATCH 13/19] Add example config --- runtime/plaid/resources/config/apis.toml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/runtime/plaid/resources/config/apis.toml b/runtime/plaid/resources/config/apis.toml index 6c9eb290..f5d89760 100644 --- a/runtime/plaid/resources/config/apis.toml +++ b/runtime/plaid/resources/config/apis.toml @@ -338,3 +338,27 @@ rules_and_actions = { "some_rule.wasm" = ["Encrypt", "Decrypt"] } [apis.bloom_filter] # nothing here + +# ============================================================================= +# BigQuery example configuration +# ============================================================================= +# [apis.gcp.bigquery] +# project_id = "" +# client_email = "" +# private_key_id = "" +# private_key = """ +# +# """ +# timeout_ms = 30000 # optional, defaults to 50000 + +# # Read access map: module → dataset → list of permitted tables +# [apis.gcp.bigquery.r.".wasm"] +# "" = ["table1", "table2"] + +# # Column type schemas: dataset → table → column → type +# # Supported types: "string" | "integer" | "float" | "boolean" +# # Columns not listed here default to "string". +# [apis.gcp.bigquery.schemas.""."table1"] +# column1 = "string" +# colum2 = "float" + From ecf716e74044beb0a6cac8f3b2e2b232ccdba766 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Thu, 26 Feb 2026 10:16:36 -0500 Subject: [PATCH 14/19] Implement max response size checking --- runtime/plaid/src/apis/gcp/bigquery.rs | 49 +++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index fdd52129..d310747d 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, io, sync::Arc}; use google_cloud_bigquery::{ client::{google_cloud_auth, Client, ClientConfig}, @@ -28,6 +28,8 @@ pub enum BigQueryError { IterError(#[from] google_cloud_bigquery::query::Error), #[error("Row decode error: {0}")] RowError(#[from] google_cloud_bigquery::query::row::Error), + #[error("Response too large: accumulated row data exceeded the configured max_response_size")] + ResponseTooLarge, } /// The BigQuery type a column should be decoded into. @@ -76,6 +78,32 @@ pub struct BigQueryConfig { /// Timeout (in milliseconds) applied to all queries #[serde(default = "default_timeout_ms")] timeout_ms: i64, + /// Maximum total response size in bytes across all rows returned by a + /// single query. Defaults to 1 MiB. Queries whose accumulated decoded row + /// data exceeds this limit are aborted with a `ResponseTooLarge` error. + #[serde(default = "default_max_response_size")] + max_response_size: usize, +} + +fn default_max_response_size() -> usize { + 1024 * 1024 // 1 MiB +} + +/// A zero-allocation `Write` sink that counts the bytes written to it. +/// +/// Used with `serde_json::to_writer` to measure the serialized size of a value +/// without allocating a temporary buffer — unlike `serde_json::to_vec`, which +/// builds an owned `Vec` purely so we can call `.len()` on it. +struct CountWriter(usize); + +impl io::Write for CountWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } } /// The default timeout if none is provided. @@ -143,7 +171,9 @@ impl BigQuery { /// # Errors /// /// Returns `ApiError::BadRequest` if permissions or identifier validation - /// fails, and `ApiError::BigQueryError` for any network or decoding error. + /// fails, `ApiError::BigQueryError(BigQueryError::ResponseTooLarge)` if the + /// accumulated decoded row data exceeds `max_response_size`, and + /// `ApiError::BigQueryError` for any other network or decoding error. pub async fn query_table( &self, params: &str, @@ -176,10 +206,13 @@ impl BigQuery { // strings. Columns absent from the schema fall back to String, which is // always safe to attempt. // - // Memory safety: the BigQuery API enforces a hard 10 MB - // per-response cap server-side, so a malicious module cannot trigger an - // OOM by issuing a query that matches an arbitrarily large result set. + // Memory safety: each decoded row is serialised to JSON to measure its + // wire size, and the running total is checked against max_response_size + // before the row is buffered. This gives us a hard cap below BigQuery's + // own 10 MB server-side limit and prevents a large result set from + // exhausting process memory. let mut rows: Vec> = Vec::new(); + let mut bytes_accumulated: usize = 0; while let Some(row) = iter.next().await.map_err(BigQueryError::from)? { let mut map = HashMap::new(); for (i, col_name) in params.columns.iter().enumerate() { @@ -190,6 +223,12 @@ impl BigQuery { let value = decode_column(&row, i, col_type).map_err(BigQueryError::from)?; map.insert(col_name.clone(), value); } + let mut counter = CountWriter(0); + serde_json::to_writer(&mut counter, &map).map_err(|_| ApiError::ImpossibleError)?; + bytes_accumulated += counter.0; + if bytes_accumulated > self.config.max_response_size { + return Err(ApiError::BigQueryError(BigQueryError::ResponseTooLarge)); + } rows.push(map); } From 29f576fc5325a0e7b92dfd041ef0fc4412c8dff4 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Thu, 26 Feb 2026 10:17:42 -0500 Subject: [PATCH 15/19] Require rule to provide return buffer size --- runtime/plaid-stl/src/gcp/bigquery.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index aaef4726..847ea856 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -163,6 +163,7 @@ pub fn query_table( table: impl Display, columns: &[impl Display], filter: Option, + return_buffer_size: usize, ) -> Result { extern "C" { new_host_function_with_error_buffer!(gcp_bigquery, query_table); @@ -175,8 +176,7 @@ pub fn query_table( filter, }; - const RETURN_BUFFER_SIZE: usize = 10 * 1000 * 1000; // 10 MB - let mut return_buffer = vec![0; RETURN_BUFFER_SIZE]; + let mut return_buffer = vec![0; return_buffer_size]; let params = serde_json::to_string(¶ms).unwrap(); let res = unsafe { @@ -184,7 +184,7 @@ pub fn query_table( params.as_bytes().as_ptr(), params.as_bytes().len(), return_buffer.as_mut_ptr(), - RETURN_BUFFER_SIZE, + return_buffer_size, ) }; From 0b9e3c4028235e7617796e6fc3a829a28775be12 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Thu, 26 Feb 2026 10:29:49 -0500 Subject: [PATCH 16/19] Add depth limit to Filter --- runtime/plaid-stl/src/gcp/bigquery.rs | 4 +--- runtime/plaid/src/apis/gcp/bigquery.rs | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/runtime/plaid-stl/src/gcp/bigquery.rs b/runtime/plaid-stl/src/gcp/bigquery.rs index 847ea856..e83d6f79 100644 --- a/runtime/plaid-stl/src/gcp/bigquery.rs +++ b/runtime/plaid-stl/src/gcp/bigquery.rs @@ -26,9 +26,7 @@ pub struct QueryTableRequest { /// A node in a WHERE clause expression tree. /// -/// Conditions can be nested arbitrarily using `And` and `Or`. The runtime -/// validates all column names and renders the tree into safe BigQuery SQL — -/// modules never construct raw SQL strings. +/// Conditions can be nested using `And` and `Or`. /// /// # Example /// diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index d310747d..275e4c99 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -16,6 +16,9 @@ use log::error; const TOKEN_URI: &str = "https://oauth2.googleapis.com/token"; +/// Maximum `And`/`Or` nesting depth for a [`Filter`] tree. +const MAX_FILTER_DEPTH: usize = 4; + #[derive(Error, Debug)] pub enum BigQueryError { #[error("Authentication error: {0}")] @@ -331,7 +334,7 @@ fn build_query_string( if let Some(f) = filter { sql.push_str(" WHERE "); - sql.push_str(&build_filter_sql(f)?); + sql.push_str(&build_filter_sql(f, 0)?); } Ok(sql) @@ -341,8 +344,12 @@ fn build_query_string( /// /// Column names inside `Condition` nodes are validated with /// [`is_valid_identifier`] before use. `And` and `Or` nodes must contain at -/// least one child. -fn build_filter_sql(filter: &Filter) -> Result { +/// least one child. Nesting depth is limited to [`MAX_FILTER_DEPTH`]; trees +/// deeper than that are rejected with `BadRequest`. +fn build_filter_sql(filter: &Filter, depth: usize) -> Result { + if depth > MAX_FILTER_DEPTH { + return Err(ApiError::BadRequest); + } match filter { Filter::And(children) | Filter::Or(children) if children.is_empty() => { Err(ApiError::BadRequest) @@ -350,14 +357,14 @@ fn build_filter_sql(filter: &Filter) -> Result { Filter::And(children) => { let parts = children .iter() - .map(build_filter_sql) + .map(|c| build_filter_sql(c, depth + 1)) .collect::, _>>()?; Ok(format!("({})", parts.join(" AND "))) } Filter::Or(children) => { let parts = children .iter() - .map(build_filter_sql) + .map(|c| build_filter_sql(c, depth + 1)) .collect::, _>>()?; Ok(format!("({})", parts.join(" OR "))) } From 1ffaf26ae5337cd9f2792099a58b80d2acfdaa88 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Thu, 26 Feb 2026 11:31:05 -0500 Subject: [PATCH 17/19] Update WHERE condition escaping, additional checks on filter size, log injection fix --- runtime/plaid/src/apis/gcp/bigquery.rs | 32 ++++++++++++++++++++------ 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 275e4c99..26cafe52 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -19,6 +19,10 @@ const TOKEN_URI: &str = "https://oauth2.googleapis.com/token"; /// Maximum `And`/`Or` nesting depth for a [`Filter`] tree. const MAX_FILTER_DEPTH: usize = 4; +/// Maximum number of sibling conditions allowed inside a single `And`/`Or` +/// node. Prevents unbounded SQL string growth from a flat list of conditions. +const MAX_FILTER_CHILDREN: usize = 16; + #[derive(Error, Debug)] pub enum BigQueryError { #[error("Authentication error: {0}")] @@ -283,6 +287,12 @@ impl BigQuery { module: &Arc, params: &QueryTableRequest, ) -> Result { + // Validate dataset and table identifiers before the permission check so + // that the names logged inside check_module_permission are guaranteed to + // be safe ASCII identifiers + if !is_valid_identifier(¶ms.dataset) || !is_valid_identifier(¶ms.table) { + return Err(ApiError::BadRequest); + } self.check_module_permission(module, ¶ms.dataset, ¶ms.table)?; let query = build_query_string( ¶ms.dataset, @@ -344,8 +354,8 @@ fn build_query_string( /// /// Column names inside `Condition` nodes are validated with /// [`is_valid_identifier`] before use. `And` and `Or` nodes must contain at -/// least one child. Nesting depth is limited to [`MAX_FILTER_DEPTH`]; trees -/// deeper than that are rejected with `BadRequest`. +/// least one child and no more than [`MAX_FILTER_CHILDREN`]. Nesting depth is +/// limited to [`MAX_FILTER_DEPTH`]; exceeding either limit returns `BadRequest`. fn build_filter_sql(filter: &Filter, depth: usize) -> Result { if depth > MAX_FILTER_DEPTH { return Err(ApiError::BadRequest); @@ -355,6 +365,9 @@ fn build_filter_sql(filter: &Filter, depth: usize) -> Result { Err(ApiError::BadRequest) } Filter::And(children) => { + if children.len() > MAX_FILTER_CHILDREN { + return Err(ApiError::BadRequest); + } let parts = children .iter() .map(|c| build_filter_sql(c, depth + 1)) @@ -362,6 +375,9 @@ fn build_filter_sql(filter: &Filter, depth: usize) -> Result { Ok(format!("({})", parts.join(" AND "))) } Filter::Or(children) => { + if children.len() > MAX_FILTER_CHILDREN { + return Err(ApiError::BadRequest); + } let parts = children .iter() .map(|c| build_filter_sql(c, depth + 1)) @@ -412,14 +428,16 @@ fn build_condition_sql( /// Formats a [`FilterValue`] as a safe SQL literal. /// -/// Strings are wrapped in single quotes with internal single quotes doubled -/// (`'` → `''`), which is the standard SQL escaping mechanism. NaN and -/// infinite floats are rejected because BigQuery has no SQL representation for -/// them. +/// Strings are wrapped in single quotes. BigQuery Standard SQL recognises both +/// `''` and `\'` as a literal single quote inside a string, so backslashes must +/// be doubled first — otherwise a trailing `\` would turn our closing `''` into +/// an escape sequence and allow injection. The order is critical: `\` → `\\`, +/// then `'` → `''`. NaN and infinite floats are rejected because BigQuery has +/// no SQL representation for them. fn build_value_sql(value: &FilterValue) -> Result { match value { FilterValue::String(s) => { - let escaped = s.replace('\'', "''"); + let escaped = s.replace('\\', "\\\\").replace('\'', "''"); Ok(format!("'{escaped}'")) } FilterValue::Integer(n) => Ok(n.to_string()), From 09bc509de98c40d1003578ee162d0ade2165a129 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Mon, 2 Mar 2026 09:07:47 -0500 Subject: [PATCH 18/19] Use GoogleSQL named query parameters --- runtime/plaid/src/apis/gcp/bigquery.rs | 102 +++++++++++++++++-------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/runtime/plaid/src/apis/gcp/bigquery.rs b/runtime/plaid/src/apis/gcp/bigquery.rs index 26cafe52..a82da86e 100644 --- a/runtime/plaid/src/apis/gcp/bigquery.rs +++ b/runtime/plaid/src/apis/gcp/bigquery.rs @@ -2,7 +2,10 @@ use std::{collections::HashMap, io, sync::Arc}; use google_cloud_bigquery::{ client::{google_cloud_auth, Client, ClientConfig}, - http::job::query::QueryRequest, + http::{ + job::query::QueryRequest, + types::{QueryParameter, QueryParameterType, QueryParameterValue}, + }, query::row::Row, }; use plaid_stl::gcp::bigquery::{ @@ -294,32 +297,45 @@ impl BigQuery { return Err(ApiError::BadRequest); } self.check_module_permission(module, ¶ms.dataset, ¶ms.table)?; - let query = build_query_string( + let (query, query_parameters) = build_query_string( ¶ms.dataset, ¶ms.table, ¶ms.columns, params.filter.as_ref(), )?; + + info!("{query}"); + info!("{query_parameters:#?}"); + + let parameter_mode = if query_parameters.is_empty() { + None + } else { + Some("NAMED".to_string()) + }; Ok(QueryRequest { timeout_ms: Some(self.config.timeout_ms), query, + parameter_mode, + query_parameters, ..Default::default() }) } } /// Validates every identifier and builds the full `SELECT … FROM … [WHERE …]` -/// query string. +/// query string together with its associated named query parameters. /// /// Column names, dataset, and table are validated against a strict identifier /// allowlist. The optional `filter` is rendered via [`build_filter_sql`], which -/// recursively validates all column names inside the condition tree. +/// recursively validates all column names inside the condition tree. Filter +/// values are not inlined — they are returned as [`QueryParameter`] entries to +/// be bound by the BigQuery API. fn build_query_string( dataset: &str, table: &str, columns: &[String], filter: Option<&Filter>, -) -> Result { +) -> Result<(String, Vec), ApiError> { if !is_valid_identifier(dataset) || !is_valid_identifier(table) { return Err(ApiError::BadRequest); } @@ -341,13 +357,14 @@ fn build_query_string( .join(", "); let mut sql = format!("SELECT {column_list} FROM `{dataset}`.`{table}`"); + let mut params: Vec = Vec::new(); if let Some(f) = filter { sql.push_str(" WHERE "); - sql.push_str(&build_filter_sql(f, 0)?); + sql.push_str(&build_filter_sql(f, 0, &mut params)?); } - Ok(sql) + Ok((sql, params)) } /// Recursively renders a [`Filter`] tree into a WHERE clause fragment. @@ -356,7 +373,14 @@ fn build_query_string( /// [`is_valid_identifier`] before use. `And` and `Or` nodes must contain at /// least one child and no more than [`MAX_FILTER_CHILDREN`]. Nesting depth is /// limited to [`MAX_FILTER_DEPTH`]; exceeding either limit returns `BadRequest`. -fn build_filter_sql(filter: &Filter, depth: usize) -> Result { +/// +/// Leaf values are not inlined as SQL literals — they are appended to `params` +/// as typed [`QueryParameter`] entries and referenced via `@pN` placeholders. +fn build_filter_sql( + filter: &Filter, + depth: usize, + params: &mut Vec, +) -> Result { if depth > MAX_FILTER_DEPTH { return Err(ApiError::BadRequest); } @@ -370,7 +394,7 @@ fn build_filter_sql(filter: &Filter, depth: usize) -> Result { } let parts = children .iter() - .map(|c| build_filter_sql(c, depth + 1)) + .map(|c| build_filter_sql(c, depth + 1, params)) .collect::, _>>()?; Ok(format!("({})", parts.join(" AND "))) } @@ -380,7 +404,7 @@ fn build_filter_sql(filter: &Filter, depth: usize) -> Result { } let parts = children .iter() - .map(|c| build_filter_sql(c, depth + 1)) + .map(|c| build_filter_sql(c, depth + 1, params)) .collect::, _>>()?; Ok(format!("({})", parts.join(" OR "))) } @@ -392,18 +416,21 @@ fn build_filter_sql(filter: &Filter, depth: usize) -> Result { if !is_valid_identifier(column) { return Err(ApiError::BadRequest); } - build_condition_sql(column, operator, value) + build_condition_sql(column, operator, value, params) } } } /// Renders a single `column OP value` condition. /// -/// `IsNull` and `IsNotNull` ignore `value` entirely. +/// `IsNull` and `IsNotNull` emit `IS NULL` / `IS NOT NULL` directly and leave +/// `params` unchanged. All other operators append a typed named parameter to +/// `params` and emit `column OP @pN`. fn build_condition_sql( column: &str, op: &Operator, value: &FilterValue, + params: &mut Vec, ) -> Result { let col = format!("`{column}`"); match op { @@ -423,33 +450,46 @@ fn build_condition_sql( Operator::IsNull | Operator::IsNotNull => unreachable!(), // safety: returns above }; - Ok(format!("{col} {op_str} {}", build_value_sql(value)?)) + let placeholder = push_value_param(value, params)?; + Ok(format!("{col} {op_str} {placeholder}")) } -/// Formats a [`FilterValue`] as a safe SQL literal. +/// Appends a typed [`QueryParameter`] to `params` and returns the corresponding +/// `@pN` placeholder string. /// -/// Strings are wrapped in single quotes. BigQuery Standard SQL recognises both -/// `''` and `\'` as a literal single quote inside a string, so backslashes must -/// be doubled first — otherwise a trailing `\` would turn our closing `''` into -/// an escape sequence and allow injection. The order is critical: `\` → `\\`, -/// then `'` → `''`. NaN and infinite floats are rejected because BigQuery has -/// no SQL representation for them. -fn build_value_sql(value: &FilterValue) -> Result { - match value { - FilterValue::String(s) => { - let escaped = s.replace('\\', "\\\\").replace('\'', "''"); - Ok(format!("'{escaped}'")) - } - FilterValue::Integer(n) => Ok(n.to_string()), +/// NaN and infinite floats are rejected because BigQuery has no wire representation for them. +fn push_value_param( + value: &FilterValue, + params: &mut Vec, +) -> Result { + let name = format!("p{}", params.len()); + + let (type_str, scalar_value) = match value { + FilterValue::String(s) => ("STRING", Some(s.clone())), + FilterValue::Integer(n) => ("INT64", Some(n.to_string())), FilterValue::Float(f) => { if f.is_nan() || f.is_infinite() { return Err(ApiError::BadRequest); } - Ok(f.to_string()) + ("FLOAT64", Some(f.to_string())) } - FilterValue::Boolean(b) => Ok(if *b { "TRUE" } else { "FALSE" }.to_string()), - FilterValue::Null => Ok("NULL".to_string()), - } + FilterValue::Boolean(b) => ("BOOL", Some(b.to_string())), + FilterValue::Null => ("STRING", None), + }; + + params.push(QueryParameter { + name: Some(name.clone()), + parameter_type: QueryParameterType { + parameter_type: type_str.to_string(), + ..Default::default() + }, + parameter_value: QueryParameterValue { + value: scalar_value, + ..Default::default() + }, + }); + + Ok(format!("@{name}")) } /// Extracts the value at `index` from `row`, parsing it into the From 266a8a52e3a0ba8af0ad1cbc1351133bda85e175 Mon Sep 17 00:00:00 2001 From: Will Spencer Date: Thu, 5 Mar 2026 09:26:45 -0500 Subject: [PATCH 19/19] Bump version --- modules/Cargo.lock | 2 +- runtime/Cargo.lock | 4 ++-- runtime/plaid-stl/Cargo.toml | 2 +- runtime/plaid/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/Cargo.lock b/modules/Cargo.lock index de4d99d8..50e89fb2 100644 --- a/modules/Cargo.lock +++ b/modules/Cargo.lock @@ -215,7 +215,7 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "plaid_stl" -version = "0.37.0" +version = "0.38.0" dependencies = [ "base64 0.13.1", "chrono", diff --git a/runtime/Cargo.lock b/runtime/Cargo.lock index 9a408a2e..6aa2be7c 100644 --- a/runtime/Cargo.lock +++ b/runtime/Cargo.lock @@ -4112,7 +4112,7 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] name = "plaid" -version = "0.37.0" +version = "0.38.0" dependencies = [ "aes 0.7.5", "alkali", @@ -4177,7 +4177,7 @@ dependencies = [ [[package]] name = "plaid_stl" -version = "0.37.0" +version = "0.38.0" dependencies = [ "base64 0.13.1", "chrono", diff --git a/runtime/plaid-stl/Cargo.toml b/runtime/plaid-stl/Cargo.toml index f92f8c9d..4d6d50b6 100644 --- a/runtime/plaid-stl/Cargo.toml +++ b/runtime/plaid-stl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plaid_stl" -version = "0.37.0" +version = "0.38.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/runtime/plaid/Cargo.toml b/runtime/plaid/Cargo.toml index 45dc710a..2e02feda 100644 --- a/runtime/plaid/Cargo.toml +++ b/runtime/plaid/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plaid" -version = "0.37.0" +version = "0.38.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html