From 4e02787c29c4eb8c3579ec9da22b68b3904570cf Mon Sep 17 00:00:00 2001 From: Lablace <67578380+Lablace@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:44:05 +0800 Subject: [PATCH 1/2] feat: Set config strings --- src/client.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index f0da472..8cdcec4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ -use std::ffi::CString; +use std::collections::HashMap; +use std::ffi::{c_int, CString}; use std::io; use std::mem::MaybeUninit; @@ -54,6 +55,7 @@ pub struct Client { pub struct ClientBuilder { name_node: String, user: Option, + configs: HashMap, kerberos_ticket_cache_path: Option, } @@ -86,6 +88,7 @@ impl ClientBuilder { ClientBuilder { name_node: name_node.to_string(), user: None, + configs: HashMap::new(), kerberos_ticket_cache_path: None, } } @@ -104,6 +107,25 @@ impl ClientBuilder { self } + /// Set the configs to override those defined in `hdfs-site.xml` for + /// existing ClientBuilder + /// + /// # Example + /// + /// ```no_run + /// use hdrs::{Client, ClientBuilder}; + /// + /// let client = ClientBuilder::new("default") + /// .with_config("dfs.client.socket-timeout", "1000") + /// .connect(); + pub fn with_config(mut self, key: &str, value: &str) -> ClientBuilder { + self.configs + .entry(key.to_string()) + .and_modify(|e| *e = value.to_string()) + .or_insert_with(|| value.to_string()); + self + } + /// Set the krb5 ticket cache path for existing ClientBuilder /// /// # Examples @@ -165,6 +187,29 @@ impl ClientBuilder { } } + if !self.configs.is_empty() { + let mut ret: c_int; + let mut key = MaybeUninit::uninit(); + let mut value = MaybeUninit::uninit(); + + for (k, v) in self.configs { + key.write(CString::new(k)?); + value.write(CString::new(v)?); + + unsafe { + ret = hdfsBuilderConfSetStr( + builder, + key.assume_init_ref().as_ptr(), + value.assume_init_ref().as_ptr(), + ) + }; + + if 0 != ret { + return Err(io::Error::last_os_error()); + } + } + } + unsafe { hdfsBuilderConnect(builder) } }; @@ -472,6 +517,17 @@ mod tests { assert!(!fs.fs.is_null()) } + #[test] + fn test_client_config() { + let _ = env_logger::try_init(); + + let fs = ClientBuilder::new("default") + .with_config("dfs.client.socket-timeout", "1000") + .connect() + .expect("init success"); + assert!(!fs.fs.is_null()) + } + #[test] fn test_client_open() { let _ = env_logger::try_init(); From 60b20620dc904e7e5c3e4154034f17a36e7730d6 Mon Sep 17 00:00:00 2001 From: Lablace <67578380+Lablace@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:18:22 +0800 Subject: [PATCH 2/2] test: Remove unused tests --- tests/main.rs | 166 -------------------------------------------------- 1 file changed, 166 deletions(-) diff --git a/tests/main.rs b/tests/main.rs index e4fabb8..4af7ec0 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -199,172 +199,6 @@ fn test_file() -> Result<()> { Ok(()) } -#[cfg(feature = "futures-io")] -#[tokio::test] -async fn test_tokio_file() -> Result<()> { - use futures::io::*; - - let _ = env_logger::try_init(); - dotenv::from_filename(".env").ok(); - - if env::var("HDRS_TEST").unwrap_or_default() != "on" { - return Ok(()); - } - - let name_node = env::var("HDRS_NAMENODE")?; - let work_dir = env::var("HDRS_WORKDIR").unwrap_or_default(); - - let fs = ClientBuilder::new(&name_node).connect()?; - - let path = format!("{work_dir}{}", uuid::Uuid::new_v4()); - - let mut rng = rand::thread_rng(); - let mut content = vec![0; rng.gen_range(1024..4 * 1024 * 1024)]; - rng.fill_bytes(&mut content); - - { - // Write file - debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; - f.write_all(&content).await?; - // Flush file - debug!("test file flush"); - f.flush().await?; - } - - { - // Read file - debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; - let mut buf = Vec::new(); - let n = f.read_to_end(&mut buf).await?; - assert_eq!(n, content.len()); - assert_eq!(buf, content); - } - - { - // Stat file. - debug!("test file stat"); - let fi = fs.metadata(&path)?; - assert!(fi.is_file()); - assert_eq!(&path, fi.path()); - assert_eq!(fi.len(), content.len() as u64); - } - - { - // Seek file. - debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; - let offset = content.len() / 2; - let size = content.len() - offset; - let mut buf = Vec::new(); - let _ = f.seek(SeekFrom::Start(offset as u64)).await?; - let n = f.read_to_end(&mut buf).await?; - assert_eq!(n, size); - assert_eq!(buf, content[offset..]); - } - - { - // Remove file - debug!("test file remove"); - let result = fs.remove_file(&path); - assert!(result.is_ok()); - } - - { - // Stat it again, we should get a NotFound. - debug!("test file stat again"); - let fi = fs.metadata(&path); - assert!(fi.is_err()); - assert_eq!(fi.unwrap_err().kind(), io::ErrorKind::NotFound); - } - - Ok(()) -} - -#[cfg(feature = "tokio-io")] -#[tokio::test] -async fn test_futures_file() -> Result<()> { - use tokio::io::*; - - let _ = env_logger::try_init(); - dotenv::from_filename(".env").ok(); - - if env::var("HDRS_TEST").unwrap_or_default() != "on" { - return Ok(()); - } - - let name_node = env::var("HDRS_NAMENODE")?; - let work_dir = env::var("HDRS_WORKDIR").unwrap_or_default(); - - let fs = ClientBuilder::new(&name_node).connect()?; - - let path = format!("{work_dir}{}", uuid::Uuid::new_v4()); - - let mut rng = rand::thread_rng(); - let mut content = vec![0; rng.gen_range(1024..4 * 1024 * 1024)]; - rng.fill_bytes(&mut content); - - { - // Write file - debug!("test file write"); - let mut f = fs.open_file().create(true).write(true).open(&path)?; - f.write_all(&content).await?; - // Flush file - debug!("test file flush"); - f.flush().await?; - } - - { - // Read file - debug!("test file read"); - let mut f = fs.open_file().read(true).open(&path)?; - let mut buf = Vec::new(); - let n = f.read_to_end(&mut buf).await?; - assert_eq!(n, content.len()); - assert_eq!(buf, content); - } - - { - // Stat file. - debug!("test file stat"); - let fi = fs.metadata(&path)?; - assert!(fi.is_file()); - assert_eq!(&path, fi.path()); - assert_eq!(fi.len(), content.len() as u64); - } - - { - // Seek file. - debug!("test file seek"); - let mut f = fs.open_file().read(true).open(&path)?; - let offset = content.len() / 2; - let size = content.len() - offset; - let mut buf = Vec::new(); - let _ = f.seek(SeekFrom::Start(offset as u64)).await?; - let n = f.read_to_end(&mut buf).await?; - assert_eq!(n, size); - assert_eq!(buf, content[offset..]); - } - - { - // Remove file - debug!("test file remove"); - let result = fs.remove_file(&path); - assert!(result.is_ok()); - } - - { - // Stat it again, we should get a NotFound. - debug!("test file stat again"); - let fi = fs.metadata(&path); - assert!(fi.is_err()); - assert_eq!(fi.unwrap_err().kind(), io::ErrorKind::NotFound); - } - - Ok(()) -} - #[test] fn test_client_with_user() -> Result<()> { let _ = env_logger::try_init();