diff --git a/server/src/managers/denim/denim_manager.rs b/server/src/managers/denim/denim_manager.rs index ea19a68..2ff715c 100644 --- a/server/src/managers/denim/denim_manager.rs +++ b/server/src/managers/denim/denim_manager.rs @@ -86,45 +86,6 @@ where Ok(payloads) } - /// Store chunks in outgoing chunk buffer - pub async fn enqueue_outgoing_chunk_buffer( - &self, - receiver: &ProtocolAddress, - chunks: Vec, - ) -> Result { - let mut count = 0; - for chunk in chunks { - count += self - .chunk_cache - .insert( - receiver, - Buffer::Receiver, - &chunk, - &Uuid::new_v4().to_string(), - ) - .await? - } - Ok(count) - } - - /// Dequeue all payloads from outgoing chunk buffer - pub async fn flush_outgoing_chunk_buffer( - &self, - receiver: &ProtocolAddress, - ) -> Result> { - let chunks = self.chunk_cache.dequeue_outgoing_chunks(receiver).await?; - let (payloads, pending_chunks) = self.create_deniable_payloads(chunks)?; - - if !pending_chunks.is_empty() { - eprintln!("Error: payloads created but there are still chunks left"); - let _ = self - .enqueue_outgoing_chunk_buffer(receiver, pending_chunks) - .await?; - } - - Ok(payloads) - } - /// Store deniable payloads in outgoing payload buffer pub async fn enqueue_outgoing_payload_buffer( &self, @@ -229,6 +190,47 @@ where Ok((payloads, pending_chunks)) } + /// Store chunks in outgoing chunk buffer + #[cfg(test)] + pub async fn enqueue_outgoing_chunk_buffer( + &self, + receiver: &ProtocolAddress, + chunks: Vec, + ) -> Result { + let mut count = 0; + for chunk in chunks { + count += self + .chunk_cache + .insert( + receiver, + Buffer::Receiver, + &chunk, + &Uuid::new_v4().to_string(), + ) + .await? + } + Ok(count) + } + + /// Dequeue all payloads from outgoing chunk buffer + #[cfg(test)] + pub async fn flush_outgoing_chunk_buffer( + &self, + receiver: &ProtocolAddress, + ) -> Result> { + let chunks = self.chunk_cache.dequeue_outgoing_chunks(receiver).await?; + let (payloads, pending_chunks) = self.create_deniable_payloads(chunks)?; + + if !pending_chunks.is_empty() { + eprintln!("Error: payloads created but there are still chunks left"); + let _ = self + .enqueue_outgoing_chunk_buffer(receiver, pending_chunks) + .await?; + } + + Ok(payloads) + } + #[cfg(test)] pub async fn get_incoming_chunks(&self, sender: &ProtocolAddress) -> Result> { self.chunk_cache diff --git a/server/src/managers/denim/payload_cache.rs b/server/src/managers/denim/payload_cache.rs index 6d981b9..5a1caa0 100644 --- a/server/src/managers/denim/payload_cache.rs +++ b/server/src/managers/denim/payload_cache.rs @@ -118,6 +118,45 @@ where payload_id } + pub async fn dequeue_payload_data( + &self, + address: &ProtocolAddress, + buffer: Buffer, + bytes_amount: usize, + ) -> Result<(Vec<(Vec, i32)>, usize)> { + let queue_key = self.get_queue_key(address, buffer); + let queue_lock_key = self.get_persist_in_progress_key(address, buffer); + let queue_metadata_key: String = self.get_queue_metadata_key(address, buffer); + let queue_total_index_key: String = self.get_queue_index_key(buffer); + + let mut result = Vec::new(); + let mut take = bytes_amount; + while take >= constants::EMPTY_DENIMCHUNK_SIZE { + take -= constants::EMPTY_DENIMCHUNK_SIZE; + let connection = self.pool.get().await?; + let (data, taken, order) = redis::dequeue_bytes( + connection, + queue_key.clone(), + queue_metadata_key.clone(), + queue_total_index_key.clone(), + queue_lock_key.clone(), + take, + ) + .await?; + if taken == 0 { + result.push((vec![0; take], ChunkType::Dummy.into())); + take = 0; + break; + } else { + take -= taken; + result.push((data, order)); + } + } + + Ok((result, take)) + } + + #[cfg(test)] pub async fn remove( &self, address: &ProtocolAddress, @@ -139,6 +178,7 @@ where .await } + #[cfg(test)] pub async fn get_all_payloads( &self, address: &ProtocolAddress, @@ -164,6 +204,7 @@ where Ok(DeniablePayload::decode(values)?) } + #[cfg(test)] pub async fn get_all_payloads_raw( &self, address: &ProtocolAddress, @@ -189,44 +230,7 @@ where Ok(redis::Bytes::decode(values)?) } - pub async fn dequeue_payload_data( - &self, - address: &ProtocolAddress, - buffer: Buffer, - bytes_amount: usize, - ) -> Result<(Vec<(Vec, i32)>, usize)> { - let queue_key = self.get_queue_key(address, buffer); - let queue_lock_key = self.get_persist_in_progress_key(address, buffer); - let queue_metadata_key: String = self.get_queue_metadata_key(address, buffer); - let queue_total_index_key: String = self.get_queue_index_key(buffer); - - let mut result = Vec::new(); - let mut take = bytes_amount; - while take >= constants::EMPTY_DENIMCHUNK_SIZE { - take -= constants::EMPTY_DENIMCHUNK_SIZE; - let connection = self.pool.get().await?; - let (data, taken, order) = redis::dequeue_bytes( - connection, - queue_key.clone(), - queue_metadata_key.clone(), - queue_total_index_key.clone(), - queue_lock_key.clone(), - take, - ) - .await?; - if taken == 0 { - result.push((vec![0; take], ChunkType::Dummy.into())); - take = 0; - break; - } else { - take -= taken; - result.push((data, order)); - } - } - - Ok((result, take)) - } - + #[cfg(test)] pub async fn add_availability_listener( &mut self, address: &ProtocolAddress, @@ -235,6 +239,7 @@ where add(self.listeners.clone(), address, listener).await; } + #[cfg(test)] pub async fn remove_availability_listener(&mut self, address: &ProtocolAddress) { remove(self.listeners.clone(), address).await; } diff --git a/server/src/server/server.rs b/server/src/server/server.rs index 4a6f146..788acbd 100644 --- a/server/src/server/server.rs +++ b/server/src/server/server.rs @@ -193,12 +193,6 @@ pub async fn handle_receiving_chunks< return Ok(()); }; - let receiver_device_id = destination - .devices() - .first() - .ok_or_else(|| anyhow!("Error"))? - .device_id(); - let sender = authenticated_device.get_protocol_address(ServiceIdKind::Aci); let _ = state @@ -229,6 +223,12 @@ pub async fn handle_receiving_chunks< ServiceId::parse_from_service_id_string(&pre_key_request.service_id) .ok_or_else(|| anyhow!("Failed to get service id"))?; + let receiver_device_id = destination + .devices() + .first() + .ok_or_else(|| anyhow!("Error"))? + .device_id(); + let pre_key_response = state .key_manager .handle_get_keys_id_device_id( @@ -242,6 +242,7 @@ pub async fn handle_receiving_chunks< let sender_account = authenticated_device.account(); let payload = DeniablePayload::KeyResponse(pre_key_response); + account_payloads_map .entry(sender_account.clone()) .or_insert_with(Vec::new) @@ -255,6 +256,7 @@ pub async fn handle_receiving_chunks< .expect("Failed to get destination ACI"), ) .expect("Failed to parse string to ServiceId"); + let sender_account = authenticated_device.account(); let sender_device_id = u32::from(authenticated_device.device().device_id()) as u8; @@ -270,7 +272,9 @@ pub async fn handle_receiving_chunks< .account_manager .get_account(&receiver_service_id) .await?; + let payload = DeniablePayload::Envelope(envelope); + account_payloads_map .entry(receiver_account) .or_insert_with(Vec::new)