Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions runtime/lite/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use dotenv::dotenv;
#[tokio::main]
async fn main() {
dotenv().ok();
let db = CoreStorage::new("runtime").unwrap();
let db = CoreStorage::new("runtime-db").unwrap();
let logger = CoreLogger::new(Some("./ssb-consumer.log"));
let state_manager = GlobalState::new(logger.clone());

Expand Down Expand Up @@ -47,9 +47,6 @@ async fn main() {
.await
.unwrap();

let logger = ssb_context.clone().lock().unwrap().get_logger().clone();
logger.info("consumer successfully started✅");

client
.live_feed_with_execution_of_workflow(true, ssb_context)
.await
Expand Down Expand Up @@ -89,7 +86,7 @@ fn handle_client(mut stream: TcpStream, ctx: Arc<Mutex<dyn Ctx>>) {
logger.info("Data Deserialized");
let db = ctx.get_db();

db.insert(&body.pub_id.clone(), body).unwrap();
db.insert_request_body(&body.pub_id.clone(), body).unwrap();
logger.info("Data inserted successfully");

// Respond to the client (optional)
Expand Down
4 changes: 2 additions & 2 deletions runtime/lite/src/modules/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub struct RequestBody {
pub wasm: Vec<u8>,
pub invite: String,
pub pub_id: String,
pub allowed_hosts : Option<Vec<String>>,
pub allowed_hosts: Option<Vec<String>>,
pub input: Value,
}

Expand All @@ -24,4 +24,4 @@ pub fn combine_values(dest: &mut serde_json::Value, src: &serde_json::Value) {
}
(_, _) => panic!("update_with only works with two serde_json::Value::Object s"),
}
}
}
38 changes: 28 additions & 10 deletions runtime/lite/src/modules/kuska_ssb_client/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::{Arc, Mutex};

use crate::{
modules::logger::Logger, modules::storage::Storage, modules::wasmtime_wasi_module, Ctx,
modules::{
logger::Logger, state_manager::GlobalStateManager, storage::Storage, wasmtime_wasi_module,
},
Ctx,
};

use super::*;
Expand Down Expand Up @@ -80,9 +83,9 @@ impl Client {

let server_pk =
ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key");
let server_ipport = format!("{}:{}", ip, port);
let server_ip_port = format!("{}:{}", ip, port);

let mut socket = TcpStream::connect(server_ipport).await?;
let mut socket = TcpStream::connect(server_ip_port).await?;

let handshake =
handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), server_pk).await?;
Expand Down Expand Up @@ -187,7 +190,13 @@ impl Client {
Ok(())
}

pub async fn publish_event(&mut self, event: &str, section: &str, content: &str, mentions: Option<Vec<Mention>>) -> Result<()> {
pub async fn publish_event(
&mut self,
event: &str,
section: &str,
content: &str,
mentions: Option<Vec<Mention>>,
) -> Result<()> {
let _req_id = self
.api
.publish_req_send(TypedMessage::Event {
Expand Down Expand Up @@ -217,7 +226,8 @@ impl Client {
let (id, msg) = self.rpc_reader.recv().await?;
let ctx = ctx.lock().unwrap();
let db = ctx.get_db();
let logger = ctx.get_logger();
let mut logger = ctx.get_logger();
let mut state_manager = ctx.get_state_manager();

if id == req_no {
match msg {
Expand All @@ -234,23 +244,31 @@ impl Client {
match serde_json::from_str::<serde_json::Value>(&x.text)
{
Ok(mut event) => {


logger.info(&format!("Event: {:#?}", event));

match db.get(&x.mentions.unwrap()[0].link) {
match db.get_request_body(
&x.mentions.unwrap()[0].link,
) {
Ok(body) => {
let data = serde_json::json!({
"data" : crate::common::combine_values(&mut event, &body.input),
"allowed_hosts": body.allowed_hosts
});

let workflow_index = ctx
.get_state_manager()
.new_workflow(0, "hello");

let _ =
wasmtime_wasi_module::run_workflow(
&mut state_manager,
&mut logger,
serde_json::to_value(data)
.unwrap(),
body.wasm,
0,
"hello",
workflow_index,
false,
None
);
}
Err(e) => logger.error(&e.to_string()),
Expand Down
2 changes: 1 addition & 1 deletion runtime/lite/src/modules/kuska_ssb_client/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod tests {
use serde_json::{json, Value};

// ssb-server should keep running for testing
/* configure the env variables such as ssb-sercret file path, ip and port where
/* configure the env variables such as ssb-secret file path, ip and port where
ssb-server is running in .env file */
// use `cargo test -- --ignored` command for testing

Expand Down
2 changes: 0 additions & 2 deletions runtime/lite/src/modules/logger/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ impl CoreLogger {
let file = match log_file {
Some(file) => OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(file)
.unwrap(),
None => OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open("./workflows.log")
.unwrap(),
};
Expand Down
4 changes: 2 additions & 2 deletions runtime/lite/src/modules/logger/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn test_writing_to_log_file() {
}

#[test]
fn test_logger_in_multi_threads(){
fn test_logger_in_multi_threads() {
let logger = CoreLogger::new(Some("test3.log"));
let mut handles = vec![];

Expand All @@ -44,4 +44,4 @@ fn test_logger_in_multi_threads(){
}

fs::remove_file("test3.log").unwrap();
}
}
Loading