Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions server/src/managers/denim/denim_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,45 +86,6 @@ where
Ok(payloads)
}

/// Store chunks in outgoing chunk buffer
pub async fn enqueue_outgoing_chunk_buffer(
&self,
receiver: &ProtocolAddress,
chunks: Vec<DenimChunk>,
) -> Result<u64> {
let mut count = 0;
for chunk in chunks {
count += self
.chunk_cache
.insert(
receiver,
Buffer::Receiver,
&chunk,
&Uuid::new_v4().to_string(),
)
.await?
}
Ok(count)
}

/// Dequeue all payloads from outgoing chunk buffer
pub async fn flush_outgoing_chunk_buffer(
&self,
receiver: &ProtocolAddress,
) -> Result<Vec<DeniablePayload>> {
let chunks = self.chunk_cache.dequeue_outgoing_chunks(receiver).await?;
let (payloads, pending_chunks) = self.create_deniable_payloads(chunks)?;

if !pending_chunks.is_empty() {
eprintln!("Error: payloads created but there are still chunks left");
let _ = self
.enqueue_outgoing_chunk_buffer(receiver, pending_chunks)
.await?;
}

Ok(payloads)
}

/// Store deniable payloads in outgoing payload buffer
pub async fn enqueue_outgoing_payload_buffer(
&self,
Expand Down Expand Up @@ -229,6 +190,47 @@ where
Ok((payloads, pending_chunks))
}

/// Store chunks in outgoing chunk buffer
#[cfg(test)]
pub async fn enqueue_outgoing_chunk_buffer(
&self,
receiver: &ProtocolAddress,
chunks: Vec<DenimChunk>,
) -> Result<u64> {
let mut count = 0;
for chunk in chunks {
count += self
.chunk_cache
.insert(
receiver,
Buffer::Receiver,
&chunk,
&Uuid::new_v4().to_string(),
)
.await?
}
Ok(count)
}

/// Dequeue all payloads from outgoing chunk buffer
#[cfg(test)]
pub async fn flush_outgoing_chunk_buffer(
&self,
receiver: &ProtocolAddress,
) -> Result<Vec<DeniablePayload>> {
let chunks = self.chunk_cache.dequeue_outgoing_chunks(receiver).await?;
let (payloads, pending_chunks) = self.create_deniable_payloads(chunks)?;

if !pending_chunks.is_empty() {
eprintln!("Error: payloads created but there are still chunks left");
let _ = self
.enqueue_outgoing_chunk_buffer(receiver, pending_chunks)
.await?;
}

Ok(payloads)
}

#[cfg(test)]
pub async fn get_incoming_chunks(&self, sender: &ProtocolAddress) -> Result<Vec<DenimChunk>> {
self.chunk_cache
Expand Down
81 changes: 43 additions & 38 deletions server/src/managers/denim/payload_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,45 @@ where
payload_id
}

pub async fn dequeue_payload_data(
&self,
address: &ProtocolAddress,
buffer: Buffer,
bytes_amount: usize,
) -> Result<(Vec<(Vec<u8>, i32)>, usize)> {
let queue_key = self.get_queue_key(address, buffer);
let queue_lock_key = self.get_persist_in_progress_key(address, buffer);
let queue_metadata_key: String = self.get_queue_metadata_key(address, buffer);
let queue_total_index_key: String = self.get_queue_index_key(buffer);

let mut result = Vec::new();
let mut take = bytes_amount;
while take >= constants::EMPTY_DENIMCHUNK_SIZE {
take -= constants::EMPTY_DENIMCHUNK_SIZE;
let connection = self.pool.get().await?;
let (data, taken, order) = redis::dequeue_bytes(
connection,
queue_key.clone(),
queue_metadata_key.clone(),
queue_total_index_key.clone(),
queue_lock_key.clone(),
take,
)
.await?;
if taken == 0 {
result.push((vec![0; take], ChunkType::Dummy.into()));
take = 0;
break;
} else {
take -= taken;
result.push((data, order));
}
}

Ok((result, take))
}

#[cfg(test)]
pub async fn remove(
&self,
address: &ProtocolAddress,
Expand All @@ -139,6 +178,7 @@ where
.await
}

#[cfg(test)]
pub async fn get_all_payloads(
&self,
address: &ProtocolAddress,
Expand All @@ -164,6 +204,7 @@ where
Ok(DeniablePayload::decode(values)?)
}

#[cfg(test)]
pub async fn get_all_payloads_raw(
&self,
address: &ProtocolAddress,
Expand All @@ -189,44 +230,7 @@ where
Ok(redis::Bytes::decode(values)?)
}

pub async fn dequeue_payload_data(
&self,
address: &ProtocolAddress,
buffer: Buffer,
bytes_amount: usize,
) -> Result<(Vec<(Vec<u8>, i32)>, usize)> {
let queue_key = self.get_queue_key(address, buffer);
let queue_lock_key = self.get_persist_in_progress_key(address, buffer);
let queue_metadata_key: String = self.get_queue_metadata_key(address, buffer);
let queue_total_index_key: String = self.get_queue_index_key(buffer);

let mut result = Vec::new();
let mut take = bytes_amount;
while take >= constants::EMPTY_DENIMCHUNK_SIZE {
take -= constants::EMPTY_DENIMCHUNK_SIZE;
let connection = self.pool.get().await?;
let (data, taken, order) = redis::dequeue_bytes(
connection,
queue_key.clone(),
queue_metadata_key.clone(),
queue_total_index_key.clone(),
queue_lock_key.clone(),
take,
)
.await?;
if taken == 0 {
result.push((vec![0; take], ChunkType::Dummy.into()));
take = 0;
break;
} else {
take -= taken;
result.push((data, order));
}
}

Ok((result, take))
}

#[cfg(test)]
pub async fn add_availability_listener(
&mut self,
address: &ProtocolAddress,
Expand All @@ -235,6 +239,7 @@ where
add(self.listeners.clone(), address, listener).await;
}

#[cfg(test)]
pub async fn remove_availability_listener(&mut self, address: &ProtocolAddress) {
remove(self.listeners.clone(), address).await;
}
Expand Down
16 changes: 10 additions & 6 deletions server/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,6 @@ pub async fn handle_receiving_chunks<
return Ok(());
};

let receiver_device_id = destination
.devices()
.first()
.ok_or_else(|| anyhow!("Error"))?
.device_id();

let sender = authenticated_device.get_protocol_address(ServiceIdKind::Aci);

let _ = state
Expand Down Expand Up @@ -229,6 +223,12 @@ pub async fn handle_receiving_chunks<
ServiceId::parse_from_service_id_string(&pre_key_request.service_id)
.ok_or_else(|| anyhow!("Failed to get service id"))?;

let receiver_device_id = destination
.devices()
.first()
.ok_or_else(|| anyhow!("Error"))?
.device_id();

let pre_key_response = state
.key_manager
.handle_get_keys_id_device_id(
Expand All @@ -242,6 +242,7 @@ pub async fn handle_receiving_chunks<

let sender_account = authenticated_device.account();
let payload = DeniablePayload::KeyResponse(pre_key_response);

account_payloads_map
.entry(sender_account.clone())
.or_insert_with(Vec::new)
Expand All @@ -255,6 +256,7 @@ pub async fn handle_receiving_chunks<
.expect("Failed to get destination ACI"),
)
.expect("Failed to parse string to ServiceId");

let sender_account = authenticated_device.account();
let sender_device_id = u32::from(authenticated_device.device().device_id()) as u8;

Expand All @@ -270,7 +272,9 @@ pub async fn handle_receiving_chunks<
.account_manager
.get_account(&receiver_service_id)
.await?;

let payload = DeniablePayload::Envelope(envelope);

account_payloads_map
.entry(receiver_account)
.or_insert_with(Vec::new)
Expand Down