Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ffi::CString;
use std::collections::HashMap;
use std::ffi::{c_int, CString};
use std::io;
use std::mem::MaybeUninit;

Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct Client {
pub struct ClientBuilder {
name_node: String,
user: Option<String>,
configs: HashMap<String, String>,
kerberos_ticket_cache_path: Option<String>,
}

Expand Down Expand Up @@ -86,6 +88,7 @@ impl ClientBuilder {
ClientBuilder {
name_node: name_node.to_string(),
user: None,
configs: HashMap::new(),
kerberos_ticket_cache_path: None,
}
}
Expand All @@ -104,6 +107,25 @@ impl ClientBuilder {
self
}

/// Set the configs to override those defined in `hdfs-site.xml` for
/// existing ClientBuilder
///
/// # Example
///
/// ```no_run
/// use hdrs::{Client, ClientBuilder};
///
/// let client = ClientBuilder::new("default")
/// .with_config("dfs.client.socket-timeout", "1000")
/// .connect();
pub fn with_config(mut self, key: &str, value: &str) -> ClientBuilder {
self.configs
.entry(key.to_string())
.and_modify(|e| *e = value.to_string())
.or_insert_with(|| value.to_string());
self
}

/// Set the krb5 ticket cache path for existing ClientBuilder
///
/// # Examples
Expand Down Expand Up @@ -165,6 +187,29 @@ impl ClientBuilder {
}
}

if !self.configs.is_empty() {
let mut ret: c_int;
let mut key = MaybeUninit::uninit();
let mut value = MaybeUninit::uninit();

for (k, v) in self.configs {
key.write(CString::new(k)?);
value.write(CString::new(v)?);

unsafe {
ret = hdfsBuilderConfSetStr(
builder,
key.assume_init_ref().as_ptr(),
value.assume_init_ref().as_ptr(),
)
};

if 0 != ret {
return Err(io::Error::last_os_error());
}
}
}

unsafe { hdfsBuilderConnect(builder) }
};

Expand Down Expand Up @@ -472,6 +517,17 @@ mod tests {
assert!(!fs.fs.is_null())
}

#[test]
fn test_client_config() {
let _ = env_logger::try_init();

let fs = ClientBuilder::new("default")
.with_config("dfs.client.socket-timeout", "1000")
.connect()
.expect("init success");
assert!(!fs.fs.is_null())
}

#[test]
fn test_client_open() {
let _ = env_logger::try_init();
Expand Down
166 changes: 0 additions & 166 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,172 +199,6 @@ fn test_file() -> Result<()> {
Ok(())
}

#[cfg(feature = "futures-io")]
#[tokio::test]
async fn test_tokio_file() -> Result<()> {
use futures::io::*;

let _ = env_logger::try_init();
dotenv::from_filename(".env").ok();

if env::var("HDRS_TEST").unwrap_or_default() != "on" {
return Ok(());
}

let name_node = env::var("HDRS_NAMENODE")?;
let work_dir = env::var("HDRS_WORKDIR").unwrap_or_default();

let fs = ClientBuilder::new(&name_node).connect()?;

let path = format!("{work_dir}{}", uuid::Uuid::new_v4());

let mut rng = rand::thread_rng();
let mut content = vec![0; rng.gen_range(1024..4 * 1024 * 1024)];
rng.fill_bytes(&mut content);

{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
f.flush().await?;
}

{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
assert_eq!(buf, content);
}

{
// Stat file.
debug!("test file stat");
let fi = fs.metadata(&path)?;
assert!(fi.is_file());
assert_eq!(&path, fi.path());
assert_eq!(fi.len(), content.len() as u64);
}

{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
let _ = f.seek(SeekFrom::Start(offset as u64)).await?;
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, size);
assert_eq!(buf, content[offset..]);
}

{
// Remove file
debug!("test file remove");
let result = fs.remove_file(&path);
assert!(result.is_ok());
}

{
// Stat it again, we should get a NotFound.
debug!("test file stat again");
let fi = fs.metadata(&path);
assert!(fi.is_err());
assert_eq!(fi.unwrap_err().kind(), io::ErrorKind::NotFound);
}

Ok(())
}

#[cfg(feature = "tokio-io")]
#[tokio::test]
async fn test_futures_file() -> Result<()> {
use tokio::io::*;

let _ = env_logger::try_init();
dotenv::from_filename(".env").ok();

if env::var("HDRS_TEST").unwrap_or_default() != "on" {
return Ok(());
}

let name_node = env::var("HDRS_NAMENODE")?;
let work_dir = env::var("HDRS_WORKDIR").unwrap_or_default();

let fs = ClientBuilder::new(&name_node).connect()?;

let path = format!("{work_dir}{}", uuid::Uuid::new_v4());

let mut rng = rand::thread_rng();
let mut content = vec![0; rng.gen_range(1024..4 * 1024 * 1024)];
rng.fill_bytes(&mut content);

{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
f.flush().await?;
}

{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
assert_eq!(buf, content);
}

{
// Stat file.
debug!("test file stat");
let fi = fs.metadata(&path)?;
assert!(fi.is_file());
assert_eq!(&path, fi.path());
assert_eq!(fi.len(), content.len() as u64);
}

{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
let _ = f.seek(SeekFrom::Start(offset as u64)).await?;
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, size);
assert_eq!(buf, content[offset..]);
}

{
// Remove file
debug!("test file remove");
let result = fs.remove_file(&path);
assert!(result.is_ok());
}

{
// Stat it again, we should get a NotFound.
debug!("test file stat again");
let fi = fs.metadata(&path);
assert!(fi.is_err());
assert_eq!(fi.unwrap_err().kind(), io::ErrorKind::NotFound);
}

Ok(())
}

#[test]
fn test_client_with_user() -> Result<()> {
let _ = env_logger::try_init();
Expand Down