feat: Add workspace locking middleware using PostgreSQL advisory locks #817
base: main
Conversation
Walkthrough

A new workspace locking middleware using PostgreSQL advisory locks is introduced to serialize write operations on a per-workspace basis. The middleware is exported as a public module and integrated across multiple API route scopes in the main application to enforce workspace-level concurrency control.
Sequence Diagram

```mermaid
sequenceDiagram
participant Client
participant Middleware as WorkspaceLock<br/>Middleware
participant DBPool as DB Pool
participant PG as PostgreSQL
participant Service as Inner Service
Client->>Middleware: Request (POST/PUT/DELETE/PATCH)
activate Middleware
Note over Middleware: Extract org_id &<br/>workspace_id
Middleware->>Middleware: compute_lock_keys()<br/>(org_key, workspace_key)
Middleware->>DBPool: Get PgConnection
activate DBPool
DBPool-->>Middleware: Connection
deactivate DBPool
rect rgb(200, 220, 255)
Note over Middleware,PG: Retry loop (up to 10 attempts)
Middleware->>PG: pg_try_advisory_lock<br/>(org_key, workspace_key)
alt Lock acquired
PG-->>Middleware: Success
Middleware->>Service: Forward request
activate Service
Service-->>Middleware: Response
deactivate Service
Middleware->>PG: pg_advisory_unlock<br/>(org_key, workspace_key)
PG-->>Middleware: Released
else Lock unavailable
Note over Middleware: Exponential backoff retry
PG-->>Middleware: Failed
end
end
Middleware-->>Client: ServiceResponse
deactivate Middleware
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ✅ 3 checks passed
Actionable comments posted: 3
♻️ Duplicate comments (2)
crates/superposition/src/main.rs (2)
173-175: Same middleware ordering issue applies here. All these route scopes have the same incorrect middleware order. After fixing `/context`, apply the same fix consistently across `/dimension`, `/default-config`, `/config`, `/audit`, `/function`, and `/types`.

Also applies to: 180-182, 187-189, 194-196, 201-203, 208-210
215-217: Same middleware ordering issue applies here. Apply the same middleware order fix to `/experiments`, `/experiment-groups`, `/webhook`, `/variables`, `/resolve`, and `/auth`.

Also applies to: 221-223, 237-239, 244-246, 251-253, 258-260
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- crates/service_utils/src/middlewares.rs
- crates/service_utils/src/middlewares/workspace_lock.rs
- crates/superposition/src/main.rs
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2026-01-03T13:27:14.072Z
Learnt from: ayushjain17
Repo: juspay/superposition PR: 816
File: crates/frontend/src/pages/type_template.rs:82-87
Timestamp: 2026-01-03T13:27:14.072Z
Learning: In the frontend crate, both `Workspace` and `OrganisationId` types implement `Deref` trait (via `#[derive(Deref)]`), allowing automatic coercion from `&Workspace` to `&str` and `&OrganisationId` to `&str`. When passing these types to functions expecting `&str`, use `&workspace` or `&org` directly instead of `&workspace.0` or `&org.0`.
Applied to files:
crates/superposition/src/main.rs
📚 Learning: 2026-01-03T13:25:40.584Z
Learnt from: ayushjain17
Repo: juspay/superposition PR: 816
File: crates/frontend/src/pages/webhook.rs:136-137
Timestamp: 2026-01-03T13:25:40.584Z
Learning: In the superposition codebase (Rust frontend), the `Workspace` and `OrganisationId` newtype wrappers implement `Deref`, which allows `&Workspace` and `&OrganisationId` to be automatically coerced to `&str` when passed to functions expecting `&str` parameters. Manual `.0` dereferencing is not needed.
Applied to files:
crates/superposition/src/main.rs
📚 Learning: 2026-01-02T20:59:01.233Z
Learnt from: ayushjain17
Repo: juspay/superposition PR: 543
File: crates/service_utils/src/middlewares/auth_z.rs:141-152
Timestamp: 2026-01-02T20:59:01.233Z
Learning: In crates/service_utils/src/middlewares/auth_z.rs, the AuthZHandler::init function is intentionally designed to panic on startup if AUTH_Z_PROVIDER environment variable is missing or set to an unknown value. This fail-fast behavior is expected and preferred for this critical authorization configuration.
Applied to files:
crates/superposition/src/main.rs
🧬 Code graph analysis (1)
crates/superposition/src/main.rs (2)
crates/service_utils/src/middlewares/workspace_lock.rs (1)
new (26-28)
crates/service_utils/src/middlewares/workspace_context.rs (1)
new (28-33)
🔇 Additional comments (8)
crates/service_utils/src/middlewares.rs (1)
5-5: LGTM! The new module export follows the existing pattern in this file.
crates/superposition/src/main.rs (1)
33-33: LGTM! Import follows the existing import pattern for middleware factories.
crates/service_utils/src/middlewares/workspace_lock.rs (6)
1-54: LGTM! The factory and `Transform` implementation follow standard actix-web middleware patterns correctly.
72-86: LGTM! Write-operation detection and the early return for reads are well structured.
161-173: Hash stability note: `DefaultHasher` is not guaranteed to be stable across Rust versions. For advisory locks within a single running process, this is fine. However, be aware that `DefaultHasher` can change between Rust versions. If you ever need cross-process or persistent lock-key consistency (e.g., multiple service instances coordinating), consider a stable hash like `xxhash` or `fnv`; a minimal sketch follows.
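A stable 32-bit hash is small enough to inline without new dependencies. The following FNV-1a sketch is illustrative only and not part of this PR:

```rust
// Minimal FNV-1a (32-bit) sketch: deterministic across builds and Rust
// versions, unlike DefaultHasher. Illustrative only, not PR code.
fn stable_lock_key(input: &str) -> i32 {
    let mut hash: u32 = 0x811c_9dc5; // FNV-1a offset basis
    for byte in input.bytes() {
        hash ^= u32::from(byte);
        hash = hash.wrapping_mul(0x0100_0193); // FNV prime
    }
    hash as i32 // reinterpret into PostgreSQL's int4 lock-key space
}
```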
230-235: LGTM! The helper struct correctly deserializes the `pg_try_advisory_lock` result.
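For readers following along, such a helper typically looks like this diesel sketch (the field name must match the SQL result column; the PR's actual code may differ in detail):

```rust
use diesel::prelude::*;
use diesel::sql_types::Bool;

// Sketch: deserializes the single boolean column returned by
// `SELECT pg_try_advisory_lock($1, $2)`.
#[derive(QueryableByName)]
struct LockAcquired {
    #[diesel(sql_type = Bool)]
    pg_try_advisory_lock: bool,
}
```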
237-248: LGTM! The release function is straightforward. Ignoring the return value of `pg_advisory_unlock` is acceptable since we're just ensuring cleanup.
250-292: LGTM! Good unit test coverage for `compute_lock_keys`. The uniqueness assertions have a theoretical (but negligible) chance of hash collision. Consider adding integration tests for the actual advisory lock acquire/release behavior if a test database is available; a sketch follows.
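A hedged sketch of such an integration test, assuming a Postgres instance reachable via a `TEST_DATABASE_URL` environment variable (variable name and key values are assumptions, not PR code):

```rust
use diesel::prelude::*;
use diesel::sql_types::{Bool, Integer};

#[derive(QueryableByName)]
struct LockAcquired {
    #[diesel(sql_type = Bool)]
    pg_try_advisory_lock: bool,
}

// Attempt a non-blocking advisory lock on the given (org, workspace) keys.
fn try_lock(conn: &mut PgConnection, org_key: i32, ws_key: i32) -> bool {
    diesel::sql_query("SELECT pg_try_advisory_lock($1, $2)")
        .bind::<Integer, _>(org_key)
        .bind::<Integer, _>(ws_key)
        .get_result::<LockAcquired>(conn)
        .map(|row| row.pg_try_advisory_lock)
        .unwrap_or(false)
}

#[test]
fn advisory_lock_is_exclusive_across_connections() {
    let url = std::env::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set");
    let mut owner = PgConnection::establish(&url).unwrap();
    let mut other = PgConnection::establish(&url).unwrap();

    assert!(try_lock(&mut owner, 1, 2), "first connection should acquire the lock");
    assert!(!try_lock(&mut other, 1, 2), "second connection should be refused");

    // Release on the owning connection; the other connection can now acquire it.
    diesel::sql_query("SELECT pg_advisory_unlock($1, $2)")
        .bind::<Integer, _>(1)
        .bind::<Integer, _>(2)
        .execute(&mut owner)
        .unwrap();
    assert!(try_lock(&mut other, 1, 2));
}
```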
```rust
// Call the actual handler
let result = srv.call(req).await;

// Release advisory lock if we acquired one
if let Some((org_key, workspace_key)) = lock_keys {
    if let Err(e) = release_advisory_lock(&mut db_conn, org_key, workspace_key) {
        log::error!(
            "failed to release advisory lock for org_key: {}, workspace_key: {}: {}",
            org_key, workspace_key, e
        );
        // Continue even if unlock fails - PostgreSQL will auto-release on connection close
    }
    log::debug!(
        "released advisory lock for workspace (org_key: {}, workspace_key: {})",
        org_key, workspace_key
    );
}

// Return the result
result.map(|r| r.map_into_left_body())
```
Lock release may be skipped if the inner handler panics.

If `srv.call(req).await` panics, the code at lines 141–153 never runs, leaving the advisory lock held on the pooled connection. Consider using a drop guard (RAII pattern) to ensure the lock is released even on panics or early returns.
🔎 Suggested approach
Create a guard struct that releases the lock on drop:
```rust
struct AdvisoryLockGuard<'a> {
    conn: &'a mut PgConnection,
    org_key: i32,
    workspace_key: i32,
}

impl Drop for AdvisoryLockGuard<'_> {
    fn drop(&mut self) {
        if let Err(e) = release_advisory_lock(self.conn, self.org_key, self.workspace_key) {
            log::error!("failed to release advisory lock in guard: {}", e);
        }
    }
}
```

Then use it after acquiring the lock to ensure cleanup.
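A sketch of how the guard would slot into the middleware body; names like `db_conn`, `srv`, and `req` are assumed from the surrounding middleware code, not quoted from the PR:

```rust
// Hypothetical usage inside the middleware future (names assumed):
acquire_advisory_lock(&mut db_conn, org_key, workspace_key)?;
let guard = AdvisoryLockGuard {
    conn: &mut db_conn,
    org_key,
    workspace_key,
};
// Even if this call panics or the future is dropped mid-flight,
// `guard` is dropped and the lock is released.
let result = srv.call(req).await;
drop(guard); // explicit; falling out of scope has the same effect
result.map(|r| r.map_into_left_body())
```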
🤖 Prompt for AI Agents
In crates/service_utils/src/middlewares/workspace_lock.rs around lines 137 to
157, the advisory lock release logic runs after awaiting the inner handler so it
will be skipped if the handler panics or unwinds; create an RAII drop guard that
holds a mutable reference to the pooled PgConnection plus org_key and
workspace_key and calls release_advisory_lock in its Drop impl (logging any
error) immediately after acquiring the lock, then let that guard live for the
duration of the handler call so it always releases the lock on normal return,
early return, or panic.
| "lock contention detected, retrying in {}ms (attempt {}/{}, org_key: {}, workspace_key: {})", | ||
| backoff_ms, attempt + 1, MAX_RETRIES, org_key, workspace_key | ||
| ); | ||
| std::thread::sleep(std::time::Duration::from_millis(backoff_ms)); |
Blocking `std::thread::sleep` in async context will starve other tasks.

`std::thread::sleep` blocks the entire Actix worker thread, preventing other requests from being processed during the retry backoff. Use an async-compatible sleep instead.
🔎 Proposed fix
The `acquire_advisory_lock` function needs to become async:

```diff
-fn acquire_advisory_lock(
+async fn acquire_advisory_lock(
     conn: &mut PgConnection,
     org_key: i32,
     workspace_key: i32,
 ) -> Result<(), diesel::result::Error> {
     // ... retry loop ...
-    std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
+    actix_web::rt::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
     // ...
 }
```

Then update the call site at line 122:

```diff
-if let Err(e) = acquire_advisory_lock(&mut db_conn, org_key, workspace_key) {
+if let Err(e) = acquire_advisory_lock(&mut db_conn, org_key, workspace_key).await {
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/service_utils/src/middlewares/workspace_lock.rs around line 216, the
code uses std::thread::sleep which blocks the Actix worker thread; make the
retry backoff async by changing the acquire_advisory_lock function to async,
replace std::thread::sleep(...) with an awaitable sleep such as
tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await (or
actix_rt::time::sleep), remove the blocking import, and update the function’s
call site at line 122 to await the call (e.g.,
acquire_advisory_lock(...).await).
crates/superposition/src/main.rs (Outdated)
```rust
.wrap(OrgWorkspaceMiddlewareFactory::new(true, true))
.wrap(WorkspaceLockMiddlewareFactory::new())
.service(context::endpoints()),
```
Critical: Middleware order causes locking to be bypassed.

In actix-web, the last `.wrap()` runs first on incoming requests. Currently `WorkspaceLockMiddlewareFactory` is wrapped after `OrgWorkspaceMiddlewareFactory`, meaning the lock middleware runs before the org/workspace IDs are extracted and set in request extensions. The lock middleware finds no IDs and proceeds without acquiring a lock.

Swap the middleware order so `OrgWorkspaceMiddlewareFactory` runs first:
🔎 Proposed fix
```diff
 .service(
     scope("/context")
         .app_data(Resource::Context)
-        .wrap(OrgWorkspaceMiddlewareFactory::new(true, true))
-        .wrap(WorkspaceLockMiddlewareFactory::new())
+        .wrap(WorkspaceLockMiddlewareFactory::new())
+        .wrap(OrgWorkspaceMiddlewareFactory::new(true, true))
         .service(context::endpoints()),
 )
```

Apply the same fix to all other route scopes using both middlewares.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-            .wrap(OrgWorkspaceMiddlewareFactory::new(true, true))
-            .wrap(WorkspaceLockMiddlewareFactory::new())
+            .wrap(WorkspaceLockMiddlewareFactory::new())
+            .wrap(OrgWorkspaceMiddlewareFactory::new(true, true))
             .service(context::endpoints()),
```
🤖 Prompt for AI Agents
In crates/superposition/src/main.rs around lines 166 to 168, the middleware
order is wrong: WorkspaceLockMiddlewareFactory is wrapped after
OrgWorkspaceMiddlewareFactory so it runs before IDs are extracted, bypassing
locking; swap the wraps so OrgWorkspaceMiddlewareFactory is wrapped first and
WorkspaceLockMiddlewareFactory second (i.e., ensure
OrgWorkspaceMiddlewareFactory executes before the lock middleware on incoming
requests) and apply the same swap to all other route scopes that use both
middlewares so locks run after IDs have been set in request extensions.
This commit introduces a new middleware that serializes all write operations (POST, PUT, DELETE, PATCH) per workspace using PostgreSQL advisory locks.

Changes:
- Created WorkspaceLockMiddleware that:
  - Extracts org_id and workspace_id from requests
  - Computes a unique lock key using a hash of org_id:workspace_id
  - Acquires a PostgreSQL advisory lock before processing write operations
  - Ensures the lock is released after request completion
  - Skips locking for read operations (GET, etc.)
- Registered the middleware on all workspace-scoped endpoints: /context, /dimension, /default-config, /config, /audit, /function, /types, /experiments, /experiment-groups, /webhook, /variables, /resolve, /auth

This ensures write operations to the same workspace are serialized, preventing race conditions and maintaining data consistency.
…lization

Changed from the single-argument pg_advisory_lock(bigint) to the two-argument pg_advisory_lock(int, int) form for workspace locking.

Benefits:
- More natural mapping: org_id and workspace_id get separate hash spaces
- Better lock space utilization: each ID gets the full 32-bit space
- Lower collision probability: separate hashing reduces conflicts
- Easier debugging: both components visible in the pg_locks table

Implementation:
- compute_lock_keys() now returns an (i32, i32) tuple
- org_id and workspace_id are hashed independently
- Updated acquire/release functions to use the two-argument SQL
- Enhanced tests to verify component separation
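A minimal sketch of the key derivation this commit describes; the real compute_lock_keys in workspace_lock.rs may differ in detail:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of the two-key derivation: each ID is hashed independently and
// truncated to i32 for the pg_try_advisory_lock(int, int) form.
fn compute_lock_keys(org_id: &str, workspace_id: &str) -> (i32, i32) {
    fn key(s: &str) -> i32 {
        let mut hasher = DefaultHasher::new();
        s.hash(&mut hasher);
        hasher.finish() as i32 // truncate u64 -> i32
    }
    (key(org_id), key(workspace_id))
}
```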
Changed from blocking pg_advisory_lock() to non-blocking pg_try_advisory_lock() with intelligent retry logic to prevent indefinite request blocking.

**Previous Behavior:**
- pg_advisory_lock() blocks indefinitely until the lock is available
- Requests could hang for extended periods during high contention
- No visibility into lock acquisition delays
- Risk of cascading timeouts

**New Behavior:**
- pg_try_advisory_lock() returns immediately with success/failure
- Exponential backoff retry: 10ms, 20ms, 40ms, 80ms... up to 500ms max
- Maximum 10 attempts (total ~5 seconds max wait)
- Clear error message after exhausting retries
- Logs retry attempts for observability

**Retry Configuration:**
- MAX_RETRIES: 10 attempts
- INITIAL_BACKOFF_MS: 10ms
- MAX_BACKOFF_MS: 500ms (cap to prevent excessive delays)

**Benefits:**
- Predictable maximum wait time (~5 seconds)
- Better user experience with faster failures
- Reduced risk of cascading timeouts
- Visibility into lock contention via logs
- Graceful degradation under high load
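The schedule implied by these constants can be sketched as follows (illustrative only; the actual retry loop lives in workspace_lock.rs):

```rust
// Illustrative backoff computation matching the constants above:
// 10, 20, 40, 80, 160, 320 ms, then capped at 500 ms between attempts.
const MAX_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 500;

fn backoff_ms(attempt: u32) -> u64 {
    INITIAL_BACKOFF_MS
        .saturating_mul(1u64 << attempt.min(32)) // double each attempt
        .min(MAX_BACKOFF_MS)                     // cap the delay
}

fn main() {
    for attempt in 0..MAX_RETRIES {
        println!("attempt {}: wait {} ms", attempt + 1, backoff_ms(attempt));
    }
}
```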
Fixed critical async/blocking issues flagged by code review:

**Issue 1: Blocking sleep in async context**
- Changed std::thread::sleep() to actix_web::rt::time::sleep().await
- Using a blocking sleep in async middleware would block the entire worker thread
- This prevented other requests from being processed on that thread
- Now properly yields control back to the async executor during backoff

**Issue 2: Made acquire_advisory_lock async**
- Function signature changed from sync to async
- Properly propagates async behavior through the call chain
- Maintains non-blocking execution throughout retry attempts

**Impact:**
- Before: Worker threads would be blocked during lock retry delays
- After: Worker threads can process other requests while waiting
- Much better concurrency and throughput under lock contention
Implemented AdvisoryLockGuard using RAII pattern to guarantee lock
release in all code paths, including when handlers panic.
**Problem:**
Previous implementation would skip lock release if the handler panicked:
```rust
acquire_lock()
handler() // <-- If this panics...
release_lock() // <-- ...this never runs!
```
This would leave locks held until DB connection closes, potentially
causing deadlocks or severe contention.
**Solution:**
Created AdvisoryLockGuard struct that implements Drop:
```rust
struct AdvisoryLockGuard<'a> {
conn: &'a mut PgConnection,
org_key: i32,
workspace_key: i32,
}
impl Drop for AdvisoryLockGuard<'_> {
fn drop(&mut self) {
// Always releases lock, even on panic
release_advisory_lock(...)
}
}
```
**How it works:**
1. Acquire lock
2. Create guard (holds mutable reference to connection)
3. Call handler
4. Guard is automatically dropped when scope ends
- On normal return: guard drops, lock released
- On panic: guard drops during unwinding, lock released
- On early return: guard drops, lock released
**Benefits:**
- Guaranteed lock cleanup in all code paths
- Panic-safe resource management
- Prevents lock leaks that could cause deadlocks
- Follows Rust RAII best practices
CRITICAL FIX: Swapped middleware order to fix locking bypass vulnerability.

**The Bug:**
In actix-web, middlewares execute in REVERSE order of .wrap() calls. The previous ordering caused the lock middleware to run BEFORE workspace validation:

```rust
.wrap(OrgWorkspaceMiddlewareFactory::new(true, true))  // Ran 2nd ❌
.wrap(WorkspaceLockMiddlewareFactory::new())           // Ran 1st ❌
```

**Execution flow (BROKEN):**
1. WorkspaceLockMiddleware runs → tries to extract IDs
2. IDs not yet validated → lock_keys = None
3. Proceeds WITHOUT acquiring lock 🔓💥
4. OrgWorkspaceMiddleware runs → validates IDs
5. Handler executes → NO LOCK HELD!

**The Fix:**
Swapped the order so validation runs before locking:

```rust
.wrap(WorkspaceLockMiddlewareFactory::new())           // Runs 2nd ✓
.wrap(OrgWorkspaceMiddlewareFactory::new(true, true))  // Runs 1st ✓
```

**Execution flow (CORRECT):**
1. OrgWorkspaceMiddleware runs → validates and extracts IDs
2. WorkspaceLockMiddleware runs → gets validated IDs
3. Acquires advisory lock 🔒 ✓
4. Handler executes → WITH LOCK HELD!
5. Guard releases lock on completion/panic

**Impact:**
- Before: ALL write operations were running WITHOUT locks (race conditions!)
- After: All write operations are properly serialized per workspace

This was a complete bypass of the locking mechanism. Without this fix, the entire workspace locking feature was non-functional.
Force-pushed from 0c41930 to 46764cd
Problem
Currently, changes within a workspace are not serialized, leading to potential race conditions in version_state generation. We need to serialize them.
Solution
This commit introduces a new middleware that serializes all write operations
(POST, PUT, DELETE, PATCH) per workspace using PostgreSQL advisory locks.
Changes:
- Created WorkspaceLockMiddleware that extracts org_id and workspace_id, acquires a PostgreSQL advisory lock before processing write operations, releases it after request completion, and skips locking for reads (details in the commit messages above)
- Registered the middleware on all workspace-scoped endpoints:
/context, /dimension, /default-config, /config, /audit, /function,
/types, /experiments, /experiment-groups, /webhook, /variables,
/resolve, /auth
This ensures write operations to the same workspace are serialized,
preventing race conditions and maintaining data consistency.
Environment variable changes
NA
Pre-deployment activity
NA
Post-deployment activity
NA
API changes
All workspace write endpoints
Possible Issues in the future
Higher latency for write APIs