Skip to content

Commit 4289cd8

Browse files
committed
Making tx mut so we can auto-create params on compilation
1 parent d2dde1f commit 4289cd8

File tree

21 files changed

+379
-324
lines changed

21 files changed

+379
-324
lines changed

crates/bench/benches/subscription.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ fn eval(c: &mut Criterion) {
126126
// A benchmark runner for the new query engine
127127
let bench_query = |c: &mut Criterion, name, sql| {
128128
c.bench_function(name, |b| {
129-
let tx = raw.db.begin_tx(Workload::Subscribe);
129+
let mut tx = raw.db.begin_tx(Workload::Subscribe);
130130
let auth = AuthCtx::for_testing();
131-
let schema_viewer = &SchemaViewer::new(&tx, &auth);
132-
let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap();
131+
let mut schema_viewer = SchemaViewer::new(&mut tx, &auth);
132+
let (plans, table_id, table_name, _) = compile_subscription(sql, &mut schema_viewer, &auth).unwrap();
133133
let plans = plans
134134
.into_iter()
135135
.map(|plan| plan.optimize(&auth).unwrap())
@@ -155,8 +155,8 @@ fn eval(c: &mut Criterion) {
155155

156156
let bench_eval = |c: &mut Criterion, name, sql| {
157157
c.bench_function(name, |b| {
158-
let tx = raw.db.begin_tx(Workload::Update);
159-
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
158+
let mut tx = raw.db.begin_tx(Workload::Update);
159+
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, sql).unwrap();
160160
let query: ExecutionSet = query.into();
161161

162162
b.iter(|| {
@@ -207,11 +207,11 @@ fn eval(c: &mut Criterion) {
207207
// A passthru executed independently of the database.
208208
let select_lhs = "select * from footprint";
209209
let select_rhs = "select * from location";
210-
let tx = &raw.db.begin_tx(Workload::Update);
211-
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
212-
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
210+
let mut tx = raw.db.begin_tx(Workload::Update);
211+
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, select_lhs).unwrap();
212+
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, select_rhs).unwrap();
213213
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
214-
let tx = &tx.into();
214+
let tx = &(&mut tx).into();
215215

216216
b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))))
217217
});
@@ -226,10 +226,10 @@ fn eval(c: &mut Criterion) {
226226
from footprint join location on footprint.entity_id = location.entity_id \
227227
where location.chunk_index = {chunk_index}"
228228
);
229-
let tx = &raw.db.begin_tx(Workload::Update);
230-
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
229+
let mut tx = raw.db.begin_tx(Workload::Update);
230+
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, &join).unwrap();
231231
let query: ExecutionSet = query.into();
232-
let tx = &tx.into();
232+
let tx = &(&mut tx).into();
233233

234234
b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))));
235235
});

crates/core/src/estimation.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ mod tests {
181181
}
182182

183183
fn num_rows_for(db: &RelationalDB, sql: &str) -> u64 {
184-
let tx = begin_tx(db);
185-
match &*compile_sql(db, &AuthCtx::for_testing(), &tx, sql).expect("Failed to compile sql") {
184+
let mut tx = begin_tx(db);
185+
match &*compile_sql(db, &AuthCtx::for_testing(), &mut tx, sql).expect("Failed to compile sql") {
186186
[CrudExpr::Query(expr)] => num_rows(&tx, expr),
187187
exprs => panic!("unexpected result from compilation: {exprs:#?}"),
188188
}
@@ -191,10 +191,10 @@ mod tests {
191191
/// Using the new query plan
192192
fn new_row_estimate(db: &RelationalDB, sql: &str) -> u64 {
193193
let auth = AuthCtx::for_testing();
194-
let tx = begin_tx(db);
195-
let tx = SchemaViewer::new(&tx, &auth);
194+
let mut tx = begin_tx(db);
195+
let mut tx = SchemaViewer::new(&mut tx, &auth);
196196

197-
compile_subscription(sql, &tx, &auth)
197+
compile_subscription(sql, &mut tx, &auth)
198198
.map(|(plans, ..)| plans)
199199
.expect("failed to compile sql query")
200200
.into_iter()

crates/core/src/host/module_host.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1869,7 +1869,7 @@ impl ModuleHost {
18691869
let metrics = self
18701870
.on_module_thread("one_off_query", move || {
18711871
let (tx_offset_sender, tx_offset_receiver) = oneshot::channel();
1872-
let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
1872+
let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
18731873
let (tx_offset, tx_metrics, reducer) = db.release_tx(tx);
18741874
let _ = tx_offset_sender.send(tx_offset);
18751875
db.report_read_tx_metrics(reducer, tx_metrics);
@@ -1878,7 +1878,7 @@ impl ModuleHost {
18781878
// We wrap the actual query in a closure so we can use ? to handle errors without making
18791879
// the entire transaction abort with an error.
18801880
let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
1881-
let tx = SchemaViewer::new(&*tx, &auth);
1881+
let mut tx = SchemaViewer::new(&mut *tx, &auth);
18821882

18831883
let (
18841884
// A query may compile down to several plans.
@@ -1888,7 +1888,7 @@ impl ModuleHost {
18881888
_,
18891889
table_name,
18901890
_,
1891-
) = compile_subscription(&query, &tx, &auth)?;
1891+
) = compile_subscription(&query, &mut tx, &auth)?;
18921892

18931893
// Optimize each fragment
18941894
let optimized = plans

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,10 +1126,10 @@ impl InstanceCommon {
11261126

11271127
// Views bypass RLS, since views should enforce their own access control procedurally.
11281128
let auth = AuthCtx::for_current(self.info.database_identity);
1129-
let schema_view = SchemaViewer::new(&*tx, &auth);
1129+
let mut schema_view = SchemaViewer::new(&mut *tx, &auth);
11301130

11311131
// Compile to subscription plans.
1132-
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
1132+
let (plans, has_params) = SubscriptionPlan::compile(the_query, &mut schema_view, &auth)?;
11331133
ensure!(
11341134
!has_params,
11351135
"parameterized SQL is not supported for view materialization yet"

crates/core/src/sql/ast.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
55
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
66
use spacetimedb_datastore::system_tables::{StRowLevelSecurityFields, ST_ROW_LEVEL_SECURITY_ID};
77
use spacetimedb_expr::check::{SchemaView, TypingResult};
8+
use spacetimedb_expr::errors::TypingError;
89
use spacetimedb_expr::statement::compile_sql_stmt;
910
use spacetimedb_lib::identity::AuthCtx;
1011
use spacetimedb_primitives::{ArgId, ColId, TableId};
@@ -22,7 +23,7 @@ use sqlparser::ast::{
2223
};
2324
use sqlparser::dialect::PostgreSqlDialect;
2425
use sqlparser::parser::Parser;
25-
use std::ops::Deref;
26+
use std::ops::{Deref, DerefMut};
2627
use std::sync::Arc;
2728

2829
/// Simplify to detect features of the syntax we don't support yet
@@ -477,7 +478,7 @@ fn compile_where(table: &From, filter: Option<SqlExpr>) -> Result<Option<Selecti
477478
}
478479

479480
pub struct SchemaViewer<'a, T> {
480-
pub(crate) tx: &'a T,
481+
tx: &'a mut T,
481482
auth: &'a AuthCtx,
482483
}
483484

@@ -489,6 +490,12 @@ impl<T> Deref for SchemaViewer<'_, T> {
489490
}
490491
}
491492

493+
impl<T> DerefMut for SchemaViewer<'_, T> {
494+
fn deref_mut(&mut self) -> &mut Self::Target {
495+
self.tx
496+
}
497+
}
498+
492499
impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
493500
fn table_id(&self, name: &str) -> Option<TableId> {
494501
// Get the schema from the in-memory state instead of fetching from the database for speed
@@ -536,10 +543,15 @@ impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
536543
})
537544
.collect::<anyhow::Result<_>>()
538545
}
546+
547+
fn get_or_create_params(&mut self, _params: ProductValue) -> TypingResult<ArgId> {
548+
// Caller should have used `SchemaViewerMut` on crate `core`
549+
Err(TypingError::ParamsReadOnly)
550+
}
539551
}
540552

541553
impl<'a, T> SchemaViewer<'a, T> {
542-
pub fn new(tx: &'a T, auth: &'a AuthCtx) -> Self {
554+
pub fn new(tx: &'a mut T, auth: &'a AuthCtx) -> Self {
543555
Self { tx, auth }
544556
}
545557
}
@@ -1000,13 +1012,12 @@ fn compile_statement<T: TableSchemaView + StateView>(
10001012
pub(crate) fn compile_to_ast<T: TableSchemaView + StateView>(
10011013
db: &RelationalDB,
10021014
auth: &AuthCtx,
1003-
tx: &T,
1015+
tx: &mut T,
10041016
sql_text: &str,
10051017
) -> Result<Vec<SqlAst>, DBError> {
10061018
// NOTE: The following ensures compliance with the 1.0 sql api.
10071019
// Come 1.0, it will have replaced the current compilation stack.
1008-
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, auth), auth)?;
1009-
1020+
compile_sql_stmt(sql_text, &mut SchemaViewer::new(tx, auth), auth)?;
10101021
let dialect = PostgreSqlDialect {};
10111022
let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser {
10121023
sql: sql_text.to_string(),

0 commit comments

Comments
 (0)