Skip to content

Commit d2dde1f

Browse files
committed
Call parameterized views from sql #3489, plan and execution
1 parent 13110e8 commit d2dde1f

File tree

21 files changed

+746
-232
lines changed

21 files changed

+746
-232
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,10 @@ impl RelationalDB {
11221122
Ok(tx.create_view(module_def, view_def)?)
11231123
}
11241124

1125+
pub fn create_or_get_params(&self, tx: &mut MutTx, params: &ProductValue) -> Result<ArgId, DBError> {
1126+
Ok(tx.create_or_get_params(params)?)
1127+
}
1128+
11251129
pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> {
11261130
Ok(tx.drop_view(view_id)?)
11271131
}
@@ -2217,6 +2221,7 @@ pub mod tests_utils {
22172221
db: &RelationalDB,
22182222
name: &str,
22192223
schema: &[(&str, AlgebraicType)],
2224+
params: ProductType,
22202225
is_anonymous: bool,
22212226
) -> Result<(ViewId, TableId), DBError> {
22222227
let mut builder = RawModuleDefV9Builder::new();
@@ -2234,7 +2239,7 @@ pub mod tests_utils {
22342239
0,
22352240
true,
22362241
is_anonymous,
2237-
ProductType::unit(),
2242+
params,
22382243
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
22392244
);
22402245

crates/core/src/sql/ast.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ use anyhow::Context;
44
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
55
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
66
use spacetimedb_datastore::system_tables::{StRowLevelSecurityFields, ST_ROW_LEVEL_SECURITY_ID};
7-
use spacetimedb_expr::check::SchemaView;
7+
use spacetimedb_expr::check::{SchemaView, TypingResult};
88
use spacetimedb_expr::statement::compile_sql_stmt;
99
use spacetimedb_lib::identity::AuthCtx;
10-
use spacetimedb_primitives::{ColId, TableId};
11-
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
10+
use spacetimedb_primitives::{ArgId, ColId, TableId};
11+
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
1212
use spacetimedb_schema::def::error::RelationError;
1313
use spacetimedb_schema::relation::{ColExpr, FieldName};
1414
use spacetimedb_schema::schema::{ColumnSchema, TableOrViewSchema, TableSchema};
@@ -477,7 +477,7 @@ fn compile_where(table: &From, filter: Option<SqlExpr>) -> Result<Option<Selecti
477477
}
478478

479479
pub struct SchemaViewer<'a, T> {
480-
tx: &'a T,
480+
pub(crate) tx: &'a T,
481481
auth: &'a AuthCtx,
482482
}
483483

crates/core/src/sql/execute.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
use std::sync::Arc;
12
use std::time::Duration;
23

34
use super::ast::SchemaViewer;
4-
use crate::db::relational_db::{RelationalDB, Tx};
5+
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
56
use crate::energy::EnergyQuanta;
67
use crate::error::DBError;
78
use crate::estimation::estimate_rows_scanned;
@@ -19,13 +20,18 @@ use anyhow::anyhow;
1920
use spacetimedb_datastore::execution_context::Workload;
2021
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
2122
use spacetimedb_datastore::traits::IsolationLevel;
23+
use spacetimedb_expr::check::SchemaView;
24+
use spacetimedb_expr::errors::TypingError;
25+
use spacetimedb_expr::expr::CallParams;
2226
use spacetimedb_expr::statement::Statement;
2327
use spacetimedb_lib::identity::AuthCtx;
2428
use spacetimedb_lib::metrics::ExecutionMetrics;
2529
use spacetimedb_lib::Timestamp;
2630
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
31+
use spacetimedb_primitives::{ArgId, TableId};
2732
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
2833
use spacetimedb_schema::relation::FieldName;
34+
use spacetimedb_schema::schema::TableOrViewSchema;
2935
use spacetimedb_vm::eval::run_ast;
3036
use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr};
3137
use spacetimedb_vm::relation::MemTable;
@@ -185,6 +191,19 @@ pub struct SqlResult {
185191
pub metrics: ExecutionMetrics,
186192
}
187193

194+
struct DbParams<'a> {
195+
db: &'a RelationalDB,
196+
tx: &'a mut MutTx,
197+
}
198+
199+
impl CallParams for DbParams<'_> {
200+
fn create_or_get_param(&mut self, param: &ProductValue) -> Result<ArgId, TypingError> {
201+
self.db
202+
.create_or_get_params(self.tx, &param)
203+
.map_err(|err| TypingError::Other(err.into()))
204+
}
205+
}
206+
188207
/// Run the `SQL` string using the `auth` credentials
189208
pub async fn run(
190209
db: &RelationalDB,
@@ -196,9 +215,20 @@ pub async fn run(
196215
) -> Result<SqlResult, DBError> {
197216
// We parse the sql statement in a mutable transaction.
198217
// If it turns out to be a query, we downgrade the tx.
199-
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
200-
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
201-
})?;
218+
let (tx, stmt) =
219+
db.with_auto_rollback(
220+
db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql),
221+
|tx| match compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth) {
222+
Ok(Statement::Select(mut stmt)) => {
223+
stmt.for_each_fun_call(&mut |param| {
224+
db.create_or_get_params(tx, &param)
225+
.map_err(|err| TypingError::Other(err.into()))
226+
})?;
227+
Ok(Statement::Select(stmt))
228+
}
229+
result => result,
230+
},
231+
)?;
202232

203233
let mut metrics = ExecutionMetrics::default();
204234

@@ -345,7 +375,8 @@ pub(crate) mod tests {
345375
use itertools::Itertools;
346376
use pretty_assertions::assert_eq;
347377
use spacetimedb_datastore::system_tables::{
348-
StRowLevelSecurityRow, StTableFields, ST_ROW_LEVEL_SECURITY_ID, ST_TABLE_ID, ST_TABLE_NAME,
378+
StRowLevelSecurityRow, StTableFields, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, ST_TABLE_ID,
379+
ST_TABLE_NAME,
349380
};
350381
use spacetimedb_lib::bsatn::ToBsatn;
351382
use spacetimedb_lib::db::auth::{StAccess, StTableType};
@@ -958,7 +989,7 @@ pub(crate) mod tests {
958989
let db = TestDB::in_memory()?;
959990

960991
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
961-
let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?;
992+
let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, ProductType::unit(), false)?;
962993

963994
with_auto_commit(&db, |tx| -> Result<_, DBError> {
964995
tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
@@ -979,7 +1010,7 @@ pub(crate) mod tests {
9791010
let db = TestDB::in_memory()?;
9801011

9811012
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
982-
let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?;
1013+
let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, ProductType::unit(), true)?;
9831014

9841015
with_auto_commit(&db, |tx| -> Result<_, DBError> {
9851016
tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 1u8])?;
@@ -1000,7 +1031,7 @@ pub(crate) mod tests {
10001031
let db = TestDB::in_memory()?;
10011032

10021033
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
1003-
let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, false)?;
1034+
let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, ProductType::unit(), false)?;
10041035

10051036
let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
10061037
let t_id = db.create_table_for_test("t", &schema, &[0.into()])?;
@@ -1060,10 +1091,10 @@ pub(crate) mod tests {
10601091
let db = TestDB::in_memory()?;
10611092

10621093
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
1063-
let (_, u_id) = tests_utils::create_view_for_test(&db, "u", &schema, false)?;
1094+
let (_, u_id) = tests_utils::create_view_for_test(&db, "u", &schema, ProductType::unit(), false)?;
10641095

10651096
let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
1066-
let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, false)?;
1097+
let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, ProductType::unit(), false)?;
10671098

10681099
with_auto_commit(&db, |tx| -> Result<_, DBError> {
10691100
tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
@@ -1574,4 +1605,37 @@ pub(crate) mod tests {
15741605

15751606
Ok(())
15761607
}
1608+
1609+
// Verify calling views with params
1610+
#[test]
1611+
fn test_view_params() -> ResultTest<()> {
1612+
let db = TestDB::durable()?;
1613+
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::I64)];
1614+
let (_view_id, table_id) = tests_utils::create_view_for_test(
1615+
&db,
1616+
"my_view",
1617+
&schema,
1618+
ProductType::from([("x", AlgebraicType::U8)]),
1619+
true,
1620+
)?;
1621+
let arg_id = ST_RESERVED_SEQUENCE_RANGE as u64;
1622+
1623+
with_auto_commit(&db, |tx| -> Result<_, DBError> {
1624+
tests_utils::insert_into_view(&db, tx, table_id, None, product![arg_id + 1, 0u8, 1i64])?;
1625+
tests_utils::insert_into_view(&db, tx, table_id, None, product![arg_id, 1u8, 2i64])?;
1626+
Ok(())
1627+
})?;
1628+
1629+
assert_eq!(
1630+
run_for_testing(&db, "select * from my_view(1)")?,
1631+
vec![product![1u8, 2i64]]
1632+
);
1633+
1634+
assert_eq!(
1635+
run_for_testing(&db, "select * from my_view(2)")?,
1636+
vec![product![0u8, 1i64]]
1637+
);
1638+
1639+
Ok(())
1640+
}
15771641
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ impl ModuleSubscriptions {
327327
let view_info = plans
328328
.first()
329329
.and_then(|plan| plan.return_table())
330-
.and_then(|schema| schema.view_info);
330+
.and_then(|schema| schema.view_info.clone());
331331

332332
let num_cols = plans
333333
.first()

crates/core/src/subscription/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ impl AuthAccess for ExecutionSet {
609609
}
610610
}
611611

612-
/// Querieshttps://github.com/clockworklabs/SpacetimeDBPrivate/pull/2207 all the [`StTableType::User`] tables *right now*
612+
/// Queries https://github.com/clockworklabs/SpacetimeDBPrivate/pull/2207 all the [`StTableType::User`] tables *right now*
613613
/// and turns them into [`QueryExpr`],
614614
/// the moral equivalent of `SELECT * FROM table`.
615615
pub(crate) fn get_all<T, F, I>(

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use super::{
88
tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion},
99
SharedMutexGuard, SharedWriteGuard,
1010
};
11+
use crate::error::DatastoreError;
12+
use crate::system_tables::{StViewArgFields, StViewArgRow, ST_VIEW_ARG_ID};
1113
use crate::{
1214
error::ViewError,
1315
locking_tx_datastore::state_view::EqOnColumn,
@@ -48,6 +50,7 @@ use spacetimedb_lib::{
4850
use spacetimedb_primitives::{
4951
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId,
5052
};
53+
use spacetimedb_sats::bsatn::ToBsatn;
5154
use spacetimedb_sats::{
5255
bsatn::{self, to_writer, DecodeError, Deserializer},
5356
de::{DeserializeSeed, WithBound},
@@ -589,6 +592,33 @@ impl MutTxId {
589592
Ok(())
590593
}
591594

595+
pub fn get_params(&self, params: ProductValue) -> Result<Option<ArgId>> {
596+
self.iter_by_col_eq(ST_VIEW_ARG_ID, StViewArgFields::Bytes, &AlgebraicValue::Product(params))?
597+
.next()
598+
.map(|row| Ok(ArgId(StViewArgRow::try_from(row)?.id)))
599+
.transpose()
600+
}
601+
602+
/// Create parameters for a view, storing the values in `st_view_arg`.
603+
pub fn create_or_get_params(&mut self, params: &ProductValue) -> Result<ArgId> {
604+
if let Some(arg_id) = self.get_params(params.clone())? {
605+
return Ok(arg_id);
606+
}
607+
let row = StViewArgRow {
608+
id: ArgId::SENTINEL.0,
609+
bytes: params
610+
.to_bsatn_vec()
611+
.map_err(|err| DatastoreError::ReadViaBsatnError(err.into()))?
612+
.into(),
613+
};
614+
let arg_id = self
615+
.insert_via_serialize_bsatn(ST_VIEW_ARG_ID, &row)?
616+
.1
617+
.collapse()
618+
.read_col::<u64>(StViewArgFields::Id)?;
619+
Ok(ArgId(arg_id))
620+
}
621+
592622
/// Create a table.
593623
///
594624
/// Requires:

crates/datastore/src/locking_tx_datastore/state_view.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@ use crate::locking_tx_datastore::mut_tx::{IndexScanPoint, IndexScanRanged};
55
use crate::system_tables::{
66
ConnectionIdViaU128, StColumnFields, StColumnRow, StConnectionCredentialsFields, StConnectionCredentialsRow,
77
StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow,
8-
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewRow,
9-
SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID,
8+
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewParamRow,
9+
StViewRow, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID,
1010
ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
1111
};
1212
use anyhow::anyhow;
1313
use core::ops::RangeBounds;
1414
use spacetimedb_lib::ConnectionId;
1515
use spacetimedb_primitives::{ColList, TableId};
1616
use spacetimedb_sats::AlgebraicValue;
17+
use spacetimedb_schema::def::ViewParamDefSimple;
18+
use spacetimedb_schema::identifier::Identifier;
1719
use spacetimedb_schema::schema::{ColumnSchema, TableSchema, ViewDefInfo};
1820
use spacetimedb_table::table::IndexScanPointIter;
1921
use spacetimedb_table::{
@@ -133,14 +135,20 @@ pub trait StateView {
133135
.map(|mut iter| {
134136
iter.next().map(|row| -> Result<_> {
135137
let row = StViewRow::try_from(row)?;
136-
let has_args = self
138+
let args: Vec<_> = self
137139
.iter_by_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId, &row.view_id.into())?
138-
.next()
139-
.is_some();
140+
.map(|param_row| {
141+
let param_row = StViewParamRow::try_from(param_row)?;
142+
Ok(ViewParamDefSimple {
143+
name: Identifier::new(param_row.param_name).expect("valid identifier"),
144+
ty: param_row.param_type.0,
145+
})
146+
})
147+
.collect::<Result<Vec<_>>>()?;
140148

141149
Ok(ViewDefInfo {
142150
view_id: row.view_id,
143-
has_args,
151+
args,
144152
is_anonymous: row.is_anonymous,
145153
})
146154
})

crates/datastore/src/system_tables.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,13 @@ pub struct StViewParamRow {
991991
pub param_type: AlgebraicTypeViaBytes,
992992
}
993993

994+
impl TryFrom<RowRef<'_>> for StViewParamRow {
995+
type Error = DatastoreError;
996+
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
997+
read_via_bsatn(row)
998+
}
999+
}
1000+
9941001
/// System table [ST_VIEW_SUB_NAME]
9951002
///
9961003
/// | view_id | arg_id | identity | num_subscribers | has_subscribers | last_called |

0 commit comments

Comments
 (0)