diff --git a/Cargo.toml b/Cargo.toml index 738953d..753c2c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,17 @@ exclude = [ "travis_test.sh", "tests-ios/**", ] + +[package.metadata.docs.rs] +default-target = "stable-x86_64-apple-darwin" + +[dependencies] +log = "0.4" +bitflags = "1.0" +libc = "0.2" +block = "0.1" + +[dev-dependencies] +pretty_env_logger = "0.2" +tempfile = "3.0" + diff --git a/src/blk.rs b/src/blk.rs new file mode 100644 index 0000000..d980e07 --- /dev/null +++ b/src/blk.rs @@ -0,0 +1,530 @@ +use std::cell::RefCell; +use std::fmt; +use std::os::raw::c_void; +use std::sync::Arc; + +use ffi::*; +use {Group, IntoTimeout, QosClass, Queue, WaitTimeout}; + +bitflags! { + /// Flags to pass to the `DispatchBlock::create()` functions. + pub struct BlockFlags: dispatch_block_flags_t { + /// Flag indicating that a dispatch block object should act as a barrier block + /// when submitted to a DISPATCH_QUEUE_CONCURRENT queue. + const BARRIER = 0x1; + /// Flag indicating that a dispatch block object should execute disassociated + /// from current execution context attributes such as QOS class, os_activity_t + /// and properties of the current IPC request (if any). + const DETACHED = 0x2; + /// Flag indicating that a dispatch block object should be assigned the execution + /// context attributes that are current at the time the block object is created. + const ASSIGN_CURRENT = 0x4; + /// Flag indicating that a dispatch block object should be not be assigned a QOS class. + const NO_QOS_CLASS = 0x8; + /// Flag indicating that execution of a dispatch block object submitted to + /// a queue should prefer the QOS class assigned to the queue over the QOS class + /// assigned to the block (resp. associated with the block at the time of submission). + const INHERIT_QOS_CLASS = 0x10; + /// Flag indicating that execution of a dispatch block object submitted to + /// a queue should prefer the QOS class assigned to the block (resp. associated + /// with the block at the time of submission) over the QOS class assigned to + /// the queue, as long as doing so will not result in a lower QOS class. + const ENFORCE_QOS_CLASS = 0x20; + } +} + +/// Creates, synchronously executes, and releases a dispatch block from the specified block and flags. +pub fn perform(flags: BlockFlags, closure: F) +where + F: 'static + Fn(), +{ + unsafe { dispatch_block_perform(flags.bits(), block(closure)) } +} + +/// Dispatch blocks allow you to configure properties of individual units of work on a queue directly. +/// +/// They also allow you to address individual work units for the purposes of waiting for their completion, +/// getting notified about their completion, and/or canceling them. +pub struct DispatchBlock { + ptr: dispatch_block_t, +} + +impl DispatchBlock { + /// Creates a new dispatch block on the heap using a closure. + pub fn new(closure: F) -> Self + where + F: 'static + Fn(), + { + Self::create(BlockFlags::INHERIT_QOS_CLASS, closure) + } + + /// Creates a new dispatch block on the heap using a closure and the given flags. + pub fn create(flags: BlockFlags, closure: F) -> Self + where + F: 'static + Fn(), + { + let ptr = unsafe { dispatch_block_create(flags.bits(), block(closure)) }; + + DispatchBlock { ptr } + } + + /// Creates a new dispatch block on the heap from a closure and the given flags, + /// and assigns it the specified QoS class and relative priority. + pub fn create_with_qos_class( + flags: BlockFlags, + qos_class: QosClass, + relative_priority: i32, + closure: F, + ) -> Self + where + F: 'static + Fn(), + { + let ptr = unsafe { + dispatch_block_create_with_qos_class( + flags.bits(), + qos_class as u32, + relative_priority, + block(closure), + ) + }; + + DispatchBlock { ptr } + } + + /// Extracts the raw `dispatch_block_t`. + pub fn as_raw(&self) -> dispatch_block_t { + self.ptr + } + + /// Consumes the `DispatchBlock`, returning the wrapped `dispatch_block_t`. + pub fn into_raw(self) -> dispatch_block_t { + self.ptr + } + + /// Waits synchronously until execution of the specified dispatch block has completed or until the specified timeout has elapsed. + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.into_raw(); + + if unsafe { dispatch_block_wait(self.ptr, when) } == 0 { + Ok(()) + } else { + Err(WaitTimeout) + } + } + + /// Waits synchronously until execution of the specified dispatch block has completed or forever. + pub fn wait(&self) -> Result<(), WaitTimeout> { + if unsafe { dispatch_block_wait(self.ptr, DISPATCH_TIME_FOREVER) } == 0 { + Ok(()) + } else { + Err(WaitTimeout) + } + } + + /// The dispatch block object completed. + pub fn done(&self) -> bool { + unsafe { dispatch_block_wait(self.ptr, DISPATCH_TIME_NOW) == 0 } + } + + /// Schedules a notification block to be submitted to a queue when the execution of a specified dispatch block has completed. + pub fn notify(&self, queue: &Queue, notification_block: Self) { + unsafe { dispatch_block_notify(self.ptr, queue.as_raw(), notification_block.ptr) } + } + + /// Asynchronously cancels the specified dispatch block. + pub fn cancel(&self) { + unsafe { dispatch_block_cancel(self.ptr) } + } + + /// Tests whether the given dispatch block has been canceled. + pub fn canceled(&self) -> bool { + unsafe { dispatch_block_testcancel(self.ptr) != 0 } + } +} + +unsafe impl Sync for DispatchBlock {} +unsafe impl Send for DispatchBlock {} + +impl Clone for DispatchBlock { + fn clone(&self) -> Self { + let ptr = unsafe { _Block_copy(self.ptr as *const c_void) as dispatch_block_t }; + + DispatchBlock { ptr } + } +} + +impl Drop for DispatchBlock { + fn drop(&mut self) { + unsafe { + _Block_release(self.ptr as *mut c_void); + } + } +} + +impl From for DispatchBlock { + fn from(closure: F) -> Self { + DispatchBlock::new(closure) + } +} + +impl Queue { + /// Submits a closure for execution on self and waits until it completes. + pub fn sync_block(&self, flags: BlockFlags, work: F) -> T + where + F: 'static + Send + Fn() -> T, + T: 'static + Send + fmt::Debug, + { + let result = Arc::new(RefCell::new(None)); + { + let result_ref = result.clone(); + let work = move || { + *result_ref.borrow_mut() = Some(work()); + }; + let block = DispatchBlock::create(flags, work); + + unsafe { + dispatch_sync(self.ptr, block.clone().into_raw()); + } + } + // This was set so it's safe to unwrap + let result = result.borrow_mut().take(); + result.unwrap() + } + + /// Submits a closure for asynchronous execution on self and returns dispatch block immediately. + pub fn async_block(&self, work: B) -> DispatchBlock + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_async(self.ptr, block.clone().into_raw()); + } + + block + } + + /// After the specified delay, submits a closure for asynchronous execution + /// on self and returns dispatch block immediately. + pub fn after_block(&self, delay: T, work: B) -> DispatchBlock + where + T: IntoTimeout, + B: Into, + { + let when = delay.into_raw(); + let block = work.into(); + + unsafe { + dispatch_after(when, self.ptr, block.clone().into_raw()); + } + + block + } + + /// Submits a closure to be executed on self the given number of iterations + /// and waits until it completes. + pub fn apply_block(&self, iterations: usize, work: F) + where + F: 'static + Sync + Fn(usize), + { + let block = block(work); + + unsafe { + dispatch_apply(iterations, self.ptr, block); + } + } + + /// Submits a closure to be executed on self as a barrier and waits until + /// it completes. + /// + /// Barriers create synchronization points within a concurrent queue. + /// If self is concurrent, when it encounters a barrier it delays execution + /// of the closure (and any further ones) until all closures submitted + /// before the barrier finish executing. + /// At that point, the barrier closure executes by itself. + /// Upon completion, self resumes its normal execution behavior. + /// + /// If self is a serial queue or one of the global concurrent queues, + /// this method behaves like the normal `sync` method. + pub fn barrier_sync_block(&self, flags: BlockFlags, work: F) -> T + where + F: 'static + Send + Fn() -> T, + T: 'static + Send + fmt::Debug, + { + let result = Arc::new(RefCell::new(None)); + { + let result_ref = result.clone(); + let work = move || { + *result_ref.borrow_mut() = Some(work()); + }; + let block = DispatchBlock::create(flags, work); + + unsafe { + dispatch_barrier_sync(self.ptr, block.clone().into_raw()); + } + } + // This was set so it's safe to unwrap + let result = result.borrow_mut().take(); + result.unwrap() + } + + /// Submits a closure to be executed on self as a barrier and returns + /// a `DispatchBlock` immediately. + /// + /// Barriers create synchronization points within a concurrent queue. + /// If self is concurrent, when it encounters a barrier it delays execution + /// of the closure (and any further ones) until all closures submitted + /// before the barrier finish executing. + /// At that point, the barrier closure executes by itself. + /// Upon completion, self resumes its normal execution behavior. + /// + /// If self is a serial queue or one of the global concurrent queues, + /// this method behaves like the normal `async` method. + pub fn barrier_async_block(&self, work: B) -> DispatchBlock + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_barrier_async(self.ptr, block.clone().into_raw()); + } + + block + } +} + +impl Group { + /// Submits a closure asynchronously to the given `Queue` and associates it + /// with self and returns a `DispatchBlock` immediately. + pub fn async_block(&self, queue: &Queue, work: B) + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_group_async(self.ptr, queue.ptr, block.clone().into_raw()); + } + } + + /// Schedules a closure to be submitted to the given `Queue` when all tasks + /// associated with self have completed and returns a `DispatchBlock` immediately. + /// If self is empty, the closure is submitted immediately. + pub fn notify_block(&self, queue: &Queue, work: B) + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_group_notify(self.ptr, queue.ptr, block.clone().into_raw()); + } + } +} + +#[cfg(test)] +mod tests { + use std::cell::Cell; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; + + use super::*; + use QueueAttribute; + + fn async_increment_block(queue: &Queue, num: &Arc>) { + let num = num.clone(); + queue.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + }, + )); + } + + #[test] + fn test_perform_block() { + let n = Arc::new(Cell::new(0)); + let c = n.clone(); + + perform(BlockFlags::NO_QOS_CLASS, move || c.set(123)); + + assert_eq!(n.get(), 123); + } + + #[test] + fn test_block_block() { + let n = Arc::new(Cell::new(0)); + let c = n.clone(); + let block = DispatchBlock::create(BlockFlags::NO_QOS_CLASS, move || c.set(123)); + + assert!(!block.canceled()); + + assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout)); + + block.cancel(); + + assert!(block.canceled()); + + assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout)); + + assert!(!block.done()); + } + + #[test] + fn test_serial_queue_block() { + let q = Queue::create("", QueueAttribute::Serial); + let mut num = 0; + + q.sync(|| num = 1); + assert_eq!(num, 1); + assert_eq!(q.qos_class(), (QosClass::Unspecified, 0)); + + assert_eq!(q.sync_block(BlockFlags::ASSIGN_CURRENT, move || num), 1); + } + + #[test] + fn test_serial_queue_async_block() { + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + + // Sync an empty block to ensure the async one finishes + q.sync(|| ()); + assert_eq!(*num.lock().unwrap(), 1); + } + + #[test] + fn test_after_block() { + let q = Queue::create("", QueueAttribute::Serial); + let group = Group::create(); + let num = Arc::new(Mutex::new(0)); + + let delay = Duration::from_millis(0); + let num2 = num.clone(); + let start = Instant::now(); + { + let group = group.clone(); + let guard = RefCell::new(Some(group.enter())); + q.after_block(delay, move || { + let mut num = num2.lock().unwrap(); + *num = 1; + guard.borrow_mut().take().unwrap().leave(); + }); + } + + // Wait for the previous block to complete + assert!(group.wait_timeout(Duration::from_millis(5000))); + assert!(start.elapsed() >= delay); + assert_eq!(*num.lock().unwrap(), 1); + } + + #[test] + fn test_apply_block() { + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + { + let num = num.clone(); + q.apply_block(5, move |_| *num.lock().unwrap() += 1); + } + assert_eq!(*num.lock().unwrap(), 5); + } + + #[test] + fn test_barrier_sync_block() { + let q = Queue::create("", QueueAttribute::Concurrent); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + let num2 = num.clone(); + let result = q.barrier_sync_block(BlockFlags::ASSIGN_CURRENT, move || { + let mut num = num2.lock().unwrap(); + if *num == 2 { + *num = 10; + } + *num + }); + assert_eq!(result, 10); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + q.barrier_sync(|| ()); + assert_eq!(*num.lock().unwrap(), 12); + } + + #[test] + fn test_barrier_async_block() { + let q = Queue::create("", QueueAttribute::Concurrent); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + let num2 = num.clone(); + q.barrier_async_block(move || { + let mut num = num2.lock().unwrap(); + if *num == 2 { + *num = 10; + } + }); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + q.barrier_sync(|| ()); + assert_eq!(*num.lock().unwrap(), 12); + } + + #[test] + fn test_group_block() { + let group = Group::create(); + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + { + let num = num.clone(); + group.async_block(&q, move || { + let mut num = num.lock().unwrap(); + *num += 1; + }); + } + + { + let group = group.clone(); + let guard = RefCell::new(Some(group.enter())); + let num = num.clone(); + q.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + guard.borrow_mut().take().unwrap().leave(); + }, + )); + } + + let notify_group = Group::create(); + + { + let guard = RefCell::new(Some(notify_group.enter())); + let num = num.clone(); + group.notify_block(&q, move || { + let mut num = num.lock().unwrap(); + *num *= 5; + guard.borrow_mut().take().unwrap().leave(); + }); + } + + // Wait for the notify block to finish + notify_group.wait(); + // If the notify ran, the group should be empty + assert!(group.is_empty()); + // The notify must have run after the two blocks of the group + assert_eq!(*num.lock().unwrap(), 10); + } +} diff --git a/src/data.rs b/src/data.rs new file mode 100644 index 0000000..4ce762e --- /dev/null +++ b/src/data.rs @@ -0,0 +1,324 @@ +use std::ops::Add; +use std::os::raw::c_void; +use std::ptr; +use std::slice; + +use ffi::*; +use Queue; + +/// The destructor responsible for freeing the data when it is no longer needed. +pub trait IntoDestructor { + /// Consumes the `Destructor`, returning the raw `dispatch_block_t`. + fn into_raw(self) -> dispatch_block_t; +} + +impl IntoDestructor for dispatch_block_t { + fn into_raw(self) -> dispatch_block_t { + self + } +} + +impl IntoDestructor for F { + fn into_raw(self) -> dispatch_block_t { + block(self) + } +} + +/// The build-in destructor responsible for freeing the data when it is no longer needed. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Destructor { + /// The default destructor for dispatch data objects. + /// Used at data object creation to indicate that the supplied buffer + /// should be copied into internal storage managed by the system. + Default, + /// The destructor for dispatch data objects created from a malloc'd buffer. + /// Used at data object creation to indicate that the supplied buffer + /// was allocated by the malloc() family and should be destroyed with free(3). + Free, + /// The destructor for dispatch data objects that have been created + /// from buffers that require deallocation with munmap(2). + Munmap, +} + +impl Default for Destructor { + fn default() -> Self { + Destructor::Default + } +} + +impl IntoDestructor for Destructor { + fn into_raw(self) -> dispatch_block_t { + match self { + Destructor::Default => ptr::null(), + Destructor::Free => unsafe { _dispatch_data_destructor_free }, + Destructor::Munmap => unsafe { _dispatch_data_destructor_free }, + } + } +} + +/// An immutable object representing a contiguous or sparse region of memory. +#[derive(Debug)] +pub struct Data { + ptr: dispatch_data_t, +} + +impl Data { + pub(crate) fn borrow(ptr: dispatch_data_t) -> Data { + if ptr.is_null() { + Data::empty() + } else { + unsafe { dispatch_retain(ptr) }; + + Data { ptr } + } + } + + /// Creates a new dispatch data object with the specified memory buffer. + pub fn create(queue: &Queue, buf: &[u8]) -> Self { + Self::create_with_destructor(queue, buf.as_ptr() as *const c_void, buf.len(), ptr::null()) + } + + /// Creates a new dispatch data object with the specified memory buffer and destructor. + pub fn create_with_destructor( + queue: &Queue, + buffer: *const c_void, + size: usize, + destructor: F, + ) -> Self { + let ptr = + unsafe { dispatch_data_create(buffer, size, queue.as_raw(), destructor.into_raw()) }; + + debug!("create data with {} bytes, data: {:?}", size, ptr); + + Data { ptr } + } + + /// Extracts the raw `dispatch_data_t`. + pub fn as_raw(&self) -> dispatch_data_t { + self.ptr + } + + /// The singleton dispatch data object representing a zero-length memory region. + pub fn empty() -> Self { + Data { + ptr: unsafe { &_dispatch_data_empty as *const dispatch_object_s as dispatch_data_t }, + } + } + + /// Returns the logical size of the memory managed by a dispatch data object + pub fn len(&self) -> usize { + unsafe { dispatch_data_get_size(self.ptr) } + } + + /// Returns `true` if the data has a length of 0. + pub fn is_empty(&self) -> bool { + self.ptr == unsafe { &_dispatch_data_empty as *const dispatch_object_s as dispatch_data_t } + || self.len() == 0 + } + + /// Returns a new dispatch data object containing a contiguous representation of the specified object’s memory. + pub fn map(&self) -> (Self, &[u8]) { + let mut buf = ptr::null_mut(); + let mut len = 0; + + let ptr = unsafe { dispatch_data_create_map(self.ptr, &mut buf, &mut len) }; + let data = Data { ptr }; + let buf = unsafe { slice::from_raw_parts(buf as *const u8, len) }; + + debug!( + "map data to {} bytes contiguous memory, data: {:?}", + buf.len(), + ptr + ); + + (data, buf) + } + + /// Returns a new dispatch data object consisting of the concatenated data from two other data objects. + pub fn concat(&self, other: &Data) -> Self { + let ptr = unsafe { dispatch_data_create_concat(self.ptr, other.ptr) }; + + debug!( + "concat data: {:?} with data: {:?} to data: {:?}", + self.ptr, other.ptr, ptr + ); + + Data { ptr } + } + + /// Returns a new dispatch data object whose contents consist of a portion of another object’s memory region. + pub fn subrange(&self, offset: usize, length: usize) -> Self { + let ptr = unsafe { dispatch_data_create_subrange(self.ptr, offset, length) }; + + Data { ptr } + } + + /// Traverses the memory of a dispatch data object and executes custom code on each region. + pub fn apply(&self, applier: F) -> bool + where + F: 'static + Fn(&Self, usize, &[u8]) -> bool, + { + let data = self.clone(); + let applier = block( + move |ptr: dispatch_data_t, offset: usize, buffer: *const c_void, size: usize| { + assert_eq!(data.ptr, ptr); + + let buf = unsafe { slice::from_raw_parts(buffer as *const u8, size) }; + + applier(&data, offset, buf) + }, + ); + + unsafe { dispatch_data_apply(self.ptr, applier) } + } + + /// Returns a data object containing a portion of the data in another data object. + pub fn copy_region(&self, location: usize) -> (Self, usize) { + let mut p = 0; + + let ptr = unsafe { dispatch_data_copy_region(self.ptr, location, &mut p) }; + let data = Data { ptr }; + + (data, p) + } +} + +impl From for Data { + fn from(ptr: dispatch_data_t) -> Self { + if ptr.is_null() { + Data::empty() + } else { + Data { ptr } + } + } +} + +unsafe impl Sync for Data {} +unsafe impl Send for Data {} + +impl Clone for Data { + fn clone(&self) -> Self { + unsafe { + dispatch_retain(self.ptr); + } + Data { ptr: self.ptr } + } +} + +impl Drop for Data { + fn drop(&mut self) { + unsafe { + dispatch_release(self.ptr); + } + } +} + +impl Add for Data { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + self.concat(&rhs) + } +} + +impl Queue { + /// Returns an immutable object representing a contiguous or sparse region of memory. + pub fn data(&self, buf: &[u8]) -> Data { + Data::create(self, buf) + } +} + +#[cfg(test)] +mod tests { + use pretty_env_logger; + + use std::cell::Cell; + use std::mem; + use std::sync::Arc; + + use libc; + + use super::*; + use QueueAttribute; + + #[test] + fn test_data() { + let queue = Queue::main(); + + assert!(!Data::empty().ptr.is_null()); + assert_eq!(Data::empty().len(), 0); + + let data = queue.data(b"hello world"); + + assert_eq!(data.len(), 11); + + let data = queue.data(b"hello") + queue.data(b"world"); + + assert_eq!(data.len(), 10); + + let (left, off) = data.copy_region(2); + + assert_eq!(left.len(), 5); + assert_eq!(off, 0); + + let (right, off) = data.copy_region(8); + + assert_eq!(right.len(), 5); + assert_eq!(off, 5); + + let (data, buf) = data.map(); + + assert_eq!(data.len(), 10); + assert_eq!(buf, b"helloworld"); + + let data = data.subrange(3, 5); + + assert_eq!(data.len(), 5); + + let (data, buf) = data.map(); + + assert_eq!(data.len(), 5); + assert_eq!(buf, b"lowor"); + + let n = Arc::new(Cell::new(0)); + let c = n.clone(); + + assert!(data.apply(move |_data: &Data, offset: usize, buf: &[u8]| { + assert_eq!(offset, 0); + assert_eq!(buf, b"lowor"); + + c.set(buf.len()); + + true + })); + + assert_eq!(n.get(), 5); + } + + #[test] + fn test_free_destructor() { + let queue = Queue::main(); + let p = unsafe { libc::malloc(8) } as *const c_void; + + let data = Data::create_with_destructor(&queue, p, 8, Destructor::Free); + + assert_eq!(data.len(), 8); + + mem::drop(data); + } + + #[test] + fn test_func_destructor() { + let _ = pretty_env_logger::try_init(); + + let q = Queue::create("test", QueueAttribute::Serial); + let buf = "foo"; + + let data = + Data::create_with_destructor(&q, buf.as_ptr() as *const c_void, buf.len(), move || { + trace!("data destructed"); + }); + + assert_eq!(data.len(), 3); + } +} diff --git a/src/ffi.rs b/src/ffi.rs index a40cb0d..d34ac16 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1,12 +1,27 @@ -#![allow(missing_docs)] -#![allow(non_camel_case_types)] +#![allow(missing_docs, non_camel_case_types, improper_ctypes)] -use std::os::raw::{c_char, c_long, c_ulong, c_void}; +use std::os::raw::{c_char, c_int, c_long, c_uint, c_ulong, c_void}; -pub enum dispatch_object_s { } +use block::{Block, BlockArguments, ConcreteBlock, IntoConcreteBlock}; +use libc::{mode_t, off_t, timespec}; -// dispatch_block_t -pub type dispatch_function_t = extern fn(*mut c_void); +pub fn block(closure: F) -> *const Block +where + A: BlockArguments, + F: 'static + IntoConcreteBlock, +{ + let block = ConcreteBlock::new(closure); + let block = block.copy(); + let ptr = (&*block) as *const _; + let ptr = unsafe { _Block_copy(ptr as *const c_void) }; + + ptr as *const Block +} + +pub enum dispatch_object_s {} + +pub type dispatch_block_t = *const Block<(), ()>; +pub type dispatch_function_t = extern "C" fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; pub type dispatch_group_t = *mut dispatch_object_s; pub type dispatch_object_t = *mut dispatch_object_s; @@ -14,49 +29,122 @@ pub type dispatch_once_t = c_long; pub type dispatch_queue_t = *mut dispatch_object_s; pub type dispatch_time_t = u64; // dispatch_source_type_t -// dispatch_fd_t -// dispatch_data_t -// dispatch_data_applier_t -// dispatch_io_t -// dispatch_io_handler_t -// dispatch_io_type_t -// dispatch_io_close_flags_t -// dispatch_io_interval_flags_t +pub type dispatch_fd_t = c_int; +pub type dispatch_data_handler_t = *const Block<(dispatch_data_t, c_int), ()>; +pub type dispatch_data_t = *mut dispatch_object_s; +pub type dispatch_data_applier_t = + *const Block<(dispatch_data_t, usize, *const c_void, usize), bool>; +pub type dispatch_io_t = *mut dispatch_object_s; +pub type dispatch_io_handler_t = *const Block<(bool, dispatch_data_t, c_int), ()>; +pub type dispatch_cleanup_handler_t = *const Block<(c_int,), ()>; +pub type dispatch_io_type_t = c_ulong; +pub type dispatch_io_close_flags_t = c_ulong; +pub type dispatch_io_interval_flags_t = c_ulong; pub type dispatch_queue_attr_t = *const dispatch_object_s; +pub type dispatch_block_flags_t = c_ulong; +pub type dispatch_qos_class_t = c_uint; -#[cfg_attr(any(target_os = "macos", target_os = "ios"), - link(name = "System", kind = "dylib"))] -#[cfg_attr(not(any(target_os = "macos", target_os = "ios")), - link(name = "dispatch", kind = "dylib"))] -extern { +#[cfg_attr(any(target_os = "macos", target_os = "ios"), link(name = "System", kind = "dylib"))] +#[cfg_attr( + not(any(target_os = "macos", target_os = "ios")), link(name = "dispatch", kind = "dylib") +)] +extern "C" { static _dispatch_main_q: dispatch_object_s; static _dispatch_queue_attr_concurrent: dispatch_object_s; + pub static _dispatch_data_destructor_free: dispatch_block_t; + pub static _dispatch_data_destructor_munmap: dispatch_block_t; + pub static _dispatch_data_empty: dispatch_object_s; + + pub fn _Block_copy(block: *const c_void) -> *mut c_void; + pub fn _Block_release(block: *mut c_void); + + pub fn qos_class_self() -> dispatch_qos_class_t; + pub fn qos_class_main() -> dispatch_qos_class_t; + + pub fn dispatch_queue_attr_make_initially_inactive( + attr: dispatch_queue_attr_t, + ) -> dispatch_queue_attr_t; + pub fn dispatch_queue_attr_make_with_qos_class( + attr: dispatch_queue_attr_t, + qos_class: dispatch_qos_class_t, + relative_priority: c_int, + ) -> dispatch_queue_attr_t; pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t; - pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t; - // dispatch_queue_attr_t dispatch_queue_attr_make_with_qos_class ( dispatch_queue_attr_t attr, dispatch_qos_class_t qos_class, int relative_priority ); + pub fn dispatch_queue_create( + label: *const c_char, + attr: dispatch_queue_attr_t, + ) -> dispatch_queue_t; pub fn dispatch_queue_get_label(queue: dispatch_queue_t) -> *const c_char; + pub fn dispatch_queue_get_qos_class( + queue: dispatch_queue_t, + relative_priority_ptr: *mut c_int, + ) -> dispatch_qos_class_t; pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t); pub fn dispatch_main(); - // void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - // void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - // void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_after_f(when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - // void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) ); - pub fn dispatch_apply_f(iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern fn(*mut c_void, usize)); - // void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block ); - pub fn dispatch_once_f(predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t); - - // void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_async_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_async(queue: dispatch_queue_t, block: dispatch_block_t); + pub fn dispatch_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); + pub fn dispatch_sync(queue: dispatch_queue_t, block: dispatch_block_t); + pub fn dispatch_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); + pub fn dispatch_after(when: dispatch_time_t, queue: dispatch_queue_t, block: dispatch_block_t); + pub fn dispatch_after_f( + when: dispatch_time_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); + pub fn dispatch_apply( + iterations: usize, + queue: dispatch_queue_t, + block: *const Block<(usize,), ()>, + ); + pub fn dispatch_apply_f( + iterations: usize, + queue: dispatch_queue_t, + context: *mut c_void, + work: extern "C" fn(*mut c_void, usize), + ); + pub fn dispatch_once(predicate: *mut dispatch_once_t, block: dispatch_block_t); + pub fn dispatch_once_f( + predicate: *mut dispatch_once_t, + context: *mut c_void, + function: dispatch_function_t, + ); + + pub fn dispatch_group_async( + group: dispatch_group_t, + queue: dispatch_queue_t, + block: dispatch_block_t, + ); + pub fn dispatch_group_async_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_create() -> dispatch_group_t; pub fn dispatch_group_enter(group: dispatch_group_t); pub fn dispatch_group_leave(group: dispatch_group_t); - // void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_notify_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_notify( + group: dispatch_group_t, + queue: dispatch_queue_t, + block: dispatch_block_t, + ); + pub fn dispatch_group_notify_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_wait(group: dispatch_group_t, timeout: dispatch_time_t) -> c_long; pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void; @@ -69,12 +157,21 @@ extern { pub fn dispatch_semaphore_create(value: c_long) -> dispatch_semaphore_t; pub fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> c_long; - pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long; + pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) + -> c_long; - // void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_async(queue: dispatch_queue_t, block: dispatch_block_t); + pub fn dispatch_barrier_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); + pub fn dispatch_barrier_sync(queue: dispatch_queue_t, block: dispatch_block_t); + pub fn dispatch_barrier_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_source_cancel ( dispatch_source_t source ); // dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue ); @@ -91,60 +188,208 @@ extern { // void dispatch_source_set_timer ( dispatch_source_t source, dispatch_time_t start, uint64_t interval, uint64_t leeway ); // long dispatch_source_testcancel ( dispatch_source_t source ); - // void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); - // void dispatch_write ( dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); - - // dispatch_io_t dispatch_io_create ( dispatch_io_type_t type, dispatch_fd_t fd, dispatch_queue_t queue, void (^cleanup_handler)(int error) ); - // dispatch_io_t dispatch_io_create_with_path ( dispatch_io_type_t type, const char *path, int oflag, mode_t mode, dispatch_queue_t queue, void (^cleanup_handler)(int error) ); - // dispatch_io_t dispatch_io_create_with_io ( dispatch_io_type_t type, dispatch_io_t io, dispatch_queue_t queue, void (^cleanup_handler)(int error) ); - // void dispatch_io_read ( dispatch_io_t channel, off_t offset, size_t length, dispatch_queue_t queue, dispatch_io_handler_t io_handler ); - // void dispatch_io_write ( dispatch_io_t channel, off_t offset, dispatch_data_t data, dispatch_queue_t queue, dispatch_io_handler_t io_handler ); - // void dispatch_io_close ( dispatch_io_t channel, dispatch_io_close_flags_t flags ); - // void dispatch_io_barrier ( dispatch_io_t channel, dispatch_block_t barrier ); - // void dispatch_io_set_high_water ( dispatch_io_t channel, size_t high_water ); - // void dispatch_io_set_low_water ( dispatch_io_t channel, size_t low_water ); - // void dispatch_io_set_interval ( dispatch_io_t channel, uint64_t interval, dispatch_io_interval_flags_t flags ); - // dispatch_fd_t dispatch_io_get_descriptor ( dispatch_io_t channel ); - - // dispatch_data_t dispatch_data_create ( const void *buffer, size_t size, dispatch_queue_t queue, dispatch_block_t destructor ); - // size_t dispatch_data_get_size ( dispatch_data_t data ); - // dispatch_data_t dispatch_data_create_map ( dispatch_data_t data, const void **buffer_ptr, size_t *size_ptr ); - // dispatch_data_t dispatch_data_create_concat ( dispatch_data_t data1, dispatch_data_t data2 ); - // dispatch_data_t dispatch_data_create_subrange ( dispatch_data_t data, size_t offset, size_t length ); - // bool dispatch_data_apply ( dispatch_data_t data, dispatch_data_applier_t applier ); - // dispatch_data_t dispatch_data_copy_region ( dispatch_data_t data, size_t location, size_t *offset_ptr ); + pub fn dispatch_read( + fd: dispatch_fd_t, + length: usize, + queue: dispatch_queue_t, + handler: dispatch_data_handler_t, + ); + pub fn dispatch_write( + fd: dispatch_fd_t, + data: dispatch_data_t, + queue: dispatch_queue_t, + handler: dispatch_data_handler_t, + ); + + pub fn dispatch_io_create( + io_type: dispatch_io_type_t, + fd: dispatch_fd_t, + queue: dispatch_queue_t, + cleanup_handler: dispatch_cleanup_handler_t, + ) -> dispatch_io_t; + pub fn dispatch_io_create_with_path( + io_type: dispatch_io_type_t, + path: *const c_char, + oflag: c_int, + mode: mode_t, + queue: dispatch_queue_t, + cleanup_handler: dispatch_cleanup_handler_t, + ) -> dispatch_io_t; + pub fn dispatch_io_create_with_io( + io_type: dispatch_io_type_t, + io: dispatch_io_t, + queue: dispatch_queue_t, + cleanup_handler: dispatch_cleanup_handler_t, + ) -> dispatch_io_t; + + pub fn dispatch_io_read( + channel: dispatch_io_t, + offset: off_t, + length: usize, + queue: dispatch_queue_t, + io_handler: dispatch_io_handler_t, + ); + pub fn dispatch_io_write( + channel: dispatch_io_t, + offset: off_t, + data: dispatch_data_t, + queue: dispatch_queue_t, + io_handler: dispatch_io_handler_t, + ); + pub fn dispatch_io_close(channel: dispatch_io_t, flags: dispatch_io_close_flags_t); + pub fn dispatch_io_barrier(channel: dispatch_io_t, barrier: dispatch_block_t); + pub fn dispatch_io_set_high_water(channel: dispatch_io_t, high_water: usize); + pub fn dispatch_io_set_low_water(channel: dispatch_io_t, low_water: usize); + pub fn dispatch_io_set_interval( + channel: dispatch_io_t, + interval: u64, + flags: dispatch_io_interval_flags_t, + ); + pub fn dispatch_io_get_descriptor(channel: dispatch_io_t) -> dispatch_fd_t; + + pub fn dispatch_data_create( + buffer: *const c_void, + size: usize, + queue: dispatch_queue_t, + destructor: dispatch_block_t, + ) -> dispatch_data_t; + pub fn dispatch_data_get_size(data: dispatch_data_t) -> usize; + pub fn dispatch_data_create_map( + data: dispatch_data_t, + buffer_ptr: *const *mut c_void, + size_ptr: *mut usize, + ) -> dispatch_data_t; + pub fn dispatch_data_create_concat( + data1: dispatch_data_t, + data2: dispatch_data_t, + ) -> dispatch_data_t; + pub fn dispatch_data_create_subrange( + data: dispatch_data_t, + offset: usize, + length: usize, + ) -> dispatch_data_t; + pub fn dispatch_data_apply(data: dispatch_data_t, applier: dispatch_data_applier_t) -> bool; + pub fn dispatch_data_copy_region( + data: dispatch_data_t, + location: usize, + offset_ptr: *mut usize, + ) -> dispatch_data_t; pub fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t; - // dispatch_time_t dispatch_walltime( const struct timespec *when, int64_t delta); + pub fn dispatch_walltime(when: *const timespec, delta: i64) -> dispatch_time_t; // void dispatch_queue_set_specific ( dispatch_queue_t queue, const void *key, void *context, dispatch_function_t destructor ); // void * dispatch_queue_get_specific ( dispatch_queue_t queue, const void *key ); // void * dispatch_get_specific ( const void *key ); - // dispatch_block_t dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block); - // dispatch_block_t dispatch_block_create_with_qos_class(dispatch_block_flags_t flags, dispatch_qos_class_t qos_class, int relative_priority, dispatch_block_t block); - // void dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block); - // long dispatch_block_wait(dispatch_block_t block, dispatch_time_t timeout); - // dispatch_block_notify(dispatch_block_t block, dispatch_queue_t queue, dispatch_block_t notification_block); - // void dispatch_block_cancel(dispatch_block_t block); - // long dispatch_block_testcancel(dispatch_block_t block); + pub fn dispatch_block_create( + flags: dispatch_block_flags_t, + block: dispatch_block_t, + ) -> dispatch_block_t; + pub fn dispatch_block_create_with_qos_class( + flags: dispatch_block_flags_t, + qos_class: dispatch_qos_class_t, + relative_priority: c_int, + block: dispatch_block_t, + ) -> dispatch_block_t; + pub fn dispatch_block_perform(flags: dispatch_block_flags_t, block: dispatch_block_t); + pub fn dispatch_block_wait(block: dispatch_block_t, timeout: dispatch_time_t) -> c_long; + pub fn dispatch_block_notify( + block: dispatch_block_t, + queue: dispatch_queue_t, + notification_block: dispatch_block_t, + ); + pub fn dispatch_block_cancel(block: dispatch_block_t); + pub fn dispatch_block_testcancel(block: dispatch_block_t) -> c_long; } pub fn dispatch_get_main_queue() -> dispatch_queue_t { unsafe { &_dispatch_main_q as *const _ as dispatch_queue_t } } +/// A QOS class which indicates work performed by this thread is interactive with the user. +pub const QOS_CLASS_USER_INTERACTIVE: dispatch_qos_class_t = 0x21; +/// A QOS class which indicates work performed by this thread was initiated by the user +/// and that the user is likely waiting for the results. +pub const QOS_CLASS_USER_INITIATED: dispatch_qos_class_t = 0x19; +/// A default QOS class used by the system in cases where more specific QOS class information is not available. +pub const QOS_CLASS_DEFAULT: dispatch_qos_class_t = 0x15; +/// A QOS class which indicates work performed by this thread may or may not be initiated by the user +/// and that the user is unlikely to be immediately waiting for the results. +pub const QOS_CLASS_UTILITY: dispatch_qos_class_t = 0x11; +/// A QOS class which indicates work performed by this thread was not initiated by the user +/// and that the user may be unaware of the results. +pub const QOS_CLASS_BACKGROUND: dispatch_qos_class_t = 0x09; +/// A QOS class value which indicates the absence or removal of QOS class information. +pub const QOS_CLASS_UNSPECIFIED: dispatch_qos_class_t = 0x00; + +/// The queue executes blocks serially in FIFO order. pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; -pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; +/// The queue executes blocks concurrently. +pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = + unsafe { &_dispatch_queue_attr_concurrent }; -pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; -pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; -pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +/// Items dispatched to the queue will run at high priority, +/// i.e. the queue will be scheduled for execution +/// before any default priority or low priority queue. +pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; +/// Items dispatched to the queue will run at the default priority, +/// i.e. the queue will be scheduled for execution +/// after all high priority queues have been scheduled, +/// but before any low priority queues have been scheduled. +pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; +/// Items dispatched to the queue will run at low priority, +/// i.e. the queue will be scheduled for execution +/// after all default priority and high priority queues have been scheduled. +pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +/// Items dispatched to the queue will run at background priority, +/// i.e. the queue will be scheduled for execution +/// after all higher priority queues have been scheduled +/// and the system will run items on this queue on a thread +/// with background status as per setpriority(2) +/// (i.e. disk I/O is throttled and the thread's scheduling priority is set to lowest value). pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; -pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +/// A `dispatch_time_t` corresponding to the current time. +pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +/// A `dispatch_time_t` corresponding to the maximum time. pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; +/// A dispatch I/O channel representing a stream of bytes. +pub const DISPATCH_IO_STREAM: dispatch_io_type_t = 0; +/// A dispatch I/O channel representing a random access file. +pub const DISPATCH_IO_RANDOM: dispatch_io_type_t = 1; + +/// Stop outstanding operations on a channel when the channel is closed. +pub const DISPATCH_IO_STOP: dispatch_io_close_flags_t = 0x1; + +/// Enqueue I/O handlers at a channel's interval setting +/// even if the amount of data ready to be delivered +/// is inferior to the low water mark (or zero). +pub const DISPATCH_IO_STRICT_INTERVAL: dispatch_io_interval_flags_t = 0x1; + +/// Flag indicating that a dispatch block object should act as a barrier block +/// when submitted to a DISPATCH_QUEUE_CONCURRENT queue. +pub const DISPATCH_BLOCK_BARRIER: dispatch_block_flags_t = 0x1; +/// Flag indicating that a dispatch block object should execute disassociated +/// from current execution context attributes such as QOS class, os_activity_t +/// and properties of the current IPC request (if any). +pub const DISPATCH_BLOCK_DETACHED: dispatch_block_flags_t = 0x2; +/// Flag indicating that a dispatch block object should be assigned the execution +/// context attributes that are current at the time the block object is created. +pub const DISPATCH_BLOCK_ASSIGN_CURRENT: dispatch_block_flags_t = 0x4; +/// Flag indicating that a dispatch block object should be not be assigned a QOS class. +pub const DISPATCH_BLOCK_NO_QOS_CLASS: dispatch_block_flags_t = 0x8; +/// Flag indicating that execution of a dispatch block object submitted to +/// a queue should prefer the QOS class assigned to the queue over the QOS class +/// assigned to the block (resp. associated with the block at the time of submission). +pub const DISPATCH_BLOCK_INHERIT_QOS_CLASS: dispatch_block_flags_t = 0x10; +/// Flag indicating that execution of a dispatch block object submitted to +/// a queue should prefer the QOS class assigned to the block (resp. associated +/// with the block at the time of submission) over the QOS class assigned to +/// the queue, as long as doing so will not result in a lower QOS class. +pub const DISPATCH_BLOCK_ENFORCE_QOS_CLASS: dispatch_block_flags_t = 0x20; + #[cfg(test)] mod tests { use super::*; @@ -154,7 +399,7 @@ mod tests { use std::os::raw::c_void; use std::ptr; - extern fn serial_queue_test_add(num: *mut c_void) { + extern "C" fn serial_queue_test_add(num: *mut c_void) { unsafe { *(num as *mut u32) = 1; } diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..b99b733 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,541 @@ +use std::ffi::CStr; +use std::io; +use std::os::raw::c_int; +use std::os::unix::ffi::OsStrExt; +use std::os::unix::io::{AsRawFd, IntoRawFd}; +use std::path::Path; +use std::ptr; +use std::time::Duration; + +use libc::{mode_t, off_t}; + +use ffi::*; +use {Data, Queue}; + +bitflags! { + /// The type of flags you can set on a `Channel::close()` call + pub struct CloseFlags: dispatch_io_close_flags_t { + /// Stop outstanding operations on a channel when the channel is closed. + const STOP = DISPATCH_IO_STOP; + } +} + +bitflags! { + /// Type of flags to set on `Channel::set_interval()` call + pub struct IntervalFlags: dispatch_io_interval_flags_t { + /// Enqueue I/O handlers at a channel's interval setting + /// even if the amount of data ready to be delivered is inferior to + /// the low water mark (or zero). + const STRICT_INTERVAL = DISPATCH_IO_STRICT_INTERVAL; + } +} + +/// The type of a dispatch I/O channel. +#[cfg_attr(target_os = "macos", repr(u64))] +#[cfg_attr(target_os = "ios", repr(u32))] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ChannelType { + /// A dispatch I/O channel representing a stream of bytes. + Stream = DISPATCH_IO_STREAM, + /// A dispatch I/O channel representing a random access file. + Random = DISPATCH_IO_RANDOM, +} + +/// A dispatch I/O channel represents the asynchronous I/O policy applied to a file descriptor. +pub struct Channel { + ptr: dispatch_io_t, +} + +impl Channel { + /// Create a dispatch I/O channel associated with a file descriptor. + /// + /// The system takes control of the file descriptor until the channel is closed, + /// an error occurs on the file descriptor or all references to the channel are released. + /// At that time the specified cleanup handler will be enqueued + /// and control over the file descriptor relinquished. + pub fn open( + channel_type: ChannelType, + f: F, + queue: &Queue, + cleanup_handler: Option, + ) -> Self + where + F: IntoRawFd, + H: 'static + Fn(i32), + { + let fd = f.into_raw_fd() as dispatch_fd_t; + + let ptr = unsafe { + dispatch_io_create( + channel_type as dispatch_io_type_t, + fd, + queue.ptr, + cleanup_handler.map_or(ptr::null(), block), + ) + }; + + debug!( + "create {:?} channel on queue `{}` with fd: {:?}, channel: {:?}", + channel_type, + queue.label(), + fd, + ptr + ); + + Channel { ptr } + } + + /// Create a dispatch I/O channel associated with a path name. + /// + /// The specified path, oflag and mode parameters will be passed to open(2) + /// when the first I/O operation on the channel is ready to execute + /// and the resulting file descriptor will remain open + /// and under the control of the system until the channel is closed, + /// an error occurs on the file descriptor or all references to the channel are released. + /// At that time the file descriptor will be closed + /// and the specified cleanup handler will be enqueued. + pub fn create( + channel_type: ChannelType, + path: P, + flags: c_int, + mode: mode_t, + queue: &Queue, + cleanup_handler: Option, + ) -> Self + where + P: AsRef, + H: 'static + Fn(i32), + { + let path = path.as_ref(); + let mut v = path.as_os_str().as_bytes().to_vec(); + v.push(0); + let s = unsafe { CStr::from_bytes_with_nul_unchecked(&v) }; + let ptr = unsafe { + dispatch_io_create_with_path( + channel_type as dispatch_io_type_t, + s.as_ptr(), + flags, + mode, + queue.ptr, + cleanup_handler.map_or(ptr::null(), block), + ) + }; + + debug!( + "create {:?} channel on queue `{}` with path name {:?}, channel: {:?}", + channel_type, + queue.label(), + path, + ptr + ); + + Channel { ptr } + } + + /// Create a new stream dispatch I/O channel from an existing dispatch I/O channel. + /// + /// The new channel inherits the file descriptor or path name associated with the existing channel, + /// but not its channel type or policies. + pub fn open_as_stream(&self, queue: &Queue, cleanup_handler: Option) -> Self + where + H: 'static + Fn(i32), + { + let ptr = unsafe { + dispatch_io_create_with_io( + DISPATCH_IO_STREAM, + self.ptr, + queue.as_raw(), + cleanup_handler.map_or(ptr::null(), block), + ) + }; + + debug!( + "create stream channel on queue `{}` with fd: {:?}, channel: {:?}", + queue.label(), + self.descriptor(), + ptr + ); + + Channel { ptr } + } + + /// Create a new random dispatch I/O channel from an existing dispatch I/O channel. + /// + /// The new channel inherits the file descriptor or path name associated with the existing channel, + /// but not its channel type or policies. + pub fn open_as_file(&self, queue: &Queue, cleanup_handler: Option) -> Self + where + H: 'static + Fn(i32), + { + let ptr = unsafe { + dispatch_io_create_with_io( + DISPATCH_IO_RANDOM, + self.ptr, + queue.as_raw(), + cleanup_handler.map_or(ptr::null(), block), + ) + }; + + debug!( + "create random channel on queue `{}` with fd: {:?}, channel: {:?}", + queue.label(), + self.descriptor(), + ptr, + ); + + Channel { ptr } + } + + /// Schedule a read operation for asynchronous execution on the specified I/O channel. + /// The I/O handler is enqueued one or more times depending on the general load of the system + /// and the policy specified on the I/O channel. + pub fn async_read(&self, offset: isize, length: usize, queue: &Queue, io_handler: H) + where + H: 'static + Fn(io::Result<(Data, bool)>), + { + debug!( + "read {} bytes at offset {}, channel: {:?}", + length, offset, self.ptr + ); + + let fd = self.descriptor(); + let handler = block(move |done: bool, ptr: dispatch_data_t, error: i32| { + let result = if error == 0 { + unsafe { dispatch_retain(ptr) }; + + let data = Data::borrow(ptr); + + trace!("read {} bytes with fd: {:?}", data.len(), fd); + + Ok((data, done)) + } else { + trace!("read failed, error: {}", error); + + Err(io::Error::from_raw_os_error(error)) + }; + + io_handler(result) + }); + + unsafe { dispatch_io_read(self.ptr, offset as off_t, length, queue.as_raw(), handler) } + } + + /// Schedule a write operation for asynchronous execution on the specified I/O channel. + /// The I/O handler is enqueued one or more times depending on the general load of the system + /// and the policy specified on the I/O channel. + pub fn async_write(&self, offset: isize, data: &Data, queue: &Queue, io_handler: H) + where + H: 'static + Fn(io::Result<(Data, bool)>), + { + debug!( + "write {} bytes at offset {}, channel: {:?}", + data.len(), + offset, + self.ptr + ); + + let fd = self.descriptor(); + let handler = block(move |done: bool, ptr: dispatch_data_t, error: i32| { + let result = if done { + let data = Data::borrow(ptr); + + trace!( + "write finished with fd: {:?}, remaining {} bytes", + fd, + data.len() + ); + + Ok((data, done)) + } else { + trace!("write failed, error: {}", error); + + Err(io::Error::from_raw_os_error(error)) + }; + + io_handler(result) + }); + + unsafe { + dispatch_io_write( + self.ptr, + offset as off_t, + data.as_raw(), + queue.as_raw(), + handler, + ) + } + } + + /// Close the specified I/O channel to new read or write operations; + /// scheduling operations on a closed channel results in their handler returning an error. + pub fn close(&self, flags: CloseFlags) { + debug!("close channel: {:?}", self.ptr); + + unsafe { dispatch_io_close(self.ptr, flags.bits()) } + } + + /// Schedule a barrier operation on the specified I/O channel; + /// all previously scheduled operations on the channel will complete + /// before the provided barrier block is enqueued onto the global queue + /// determined by the channel's target queue, + /// and no subsequently scheduled operations will start + /// until the barrier block has returned. + pub fn barrier_async(&self, barrier: F) + where + F: 'static + Fn(), + { + debug!("schedule barrier, channel: {:?}", self.ptr); + + unsafe { dispatch_io_barrier(self.ptr, block(barrier)) } + } + + /// Returns the file descriptor underlying a dispatch I/O channel. + pub fn descriptor(&self) -> dispatch_fd_t { + unsafe { dispatch_io_get_descriptor(self.ptr) } + } + + /// Set a high water mark on the I/O channel for all operations. + pub fn set_high_water(&self, high_water: usize) { + debug!( + "set high water mark to {}, channel: {:?}", + high_water, self.ptr + ); + + unsafe { dispatch_io_set_high_water(self.ptr, high_water) } + } + + /// Set a low water mark on the I/O channel for all operations. + pub fn set_low_water(&self, low_water: usize) { + debug!( + "set low water mark to {}, channel: {:?}", + low_water, self.ptr + ); + + unsafe { dispatch_io_set_low_water(self.ptr, low_water) } + } + + /// Set a interval at which I/O handlers are to be enqueued + /// on the I/O channel for all operations. + pub fn set_interval(&self, internal: Duration, flags: IntervalFlags) { + debug!("set internal to {:?}, channel: {:?}", internal, self.ptr); + + unsafe { + dispatch_io_set_interval( + self.ptr, + internal + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|dur| dur.checked_add(internal.subsec_nanos() as u64)) + .unwrap_or(u64::max_value()), + flags.bits(), + ) + } + } +} + +unsafe impl Sync for Channel {} +unsafe impl Send for Channel {} + +impl Clone for Channel { + fn clone(&self) -> Self { + unsafe { + dispatch_retain(self.ptr); + } + Channel { ptr: self.ptr } + } +} + +impl Drop for Channel { + fn drop(&mut self) { + unsafe { + dispatch_release(self.ptr); + } + } +} + +impl Queue { + /// A dispatch I/O channel representing a stream of bytes. + /// + /// Read and write operations on a channel of this type + /// are performed serially (in order of creation) and + /// read/write data at the file pointer position + /// that is current at the time the operation starts executing. + /// Operations of different type (read vs. write) may be performed simultaneously. + /// Offsets passed to operations on a channel of this type are ignored. + pub fn open_stream(&self, f: F, cleanup_handler: Option) -> Channel + where + F: IntoRawFd, + H: 'static + Fn(i32), + { + Channel::open(ChannelType::Stream, f, self, cleanup_handler) + } + + /// A dispatch I/O channel representing a random access file. + /// + /// Read and write operations on a channel of this type + /// may be performed concurrently and read/write data at the specified offset. + /// Offsets are interpreted relative to the file pointer + /// position current at the time the I/O channel is created. + /// Attempting to create a channel of this type for a file descriptor + /// that is not seekable will result in an error. + pub fn open_file(&self, f: F, cleanup_handler: Option) -> Channel + where + F: IntoRawFd, + H: 'static + Fn(i32), + { + Channel::open(ChannelType::Random, f, self, cleanup_handler) + } + + /// Schedule a read operation for asynchronous execution on the specified file descriptor. + /// The specified handler is enqueued with the data read from the file descriptor + /// when the operation has completed or an error occurs. + pub fn async_read(&self, f: &F, length: usize, handler: H) + where + F: AsRawFd, + H: 'static + Fn(io::Result), + { + let fd = f.as_raw_fd() as dispatch_fd_t; + + debug!("read {} bytes with fd: {:?}", length, fd); + + let handler = block(move |ptr: dispatch_data_t, error: i32| { + let result = if error == 0 { + let data = Data::borrow(ptr); + + trace!("read {} bytes with fd: {:?}", data.len(), fd); + + Ok(data) + } else { + trace!("read failed, error: {}", error); + + Err(io::Error::from_raw_os_error(error)) + }; + + handler(result) + }); + + unsafe { dispatch_read(fd, length, self.as_raw(), handler) } + } + + /// Schedule a write operation for asynchronous execution on the specified file descriptor. + /// The specified handler is enqueued when the operation has completed or an error occurs. + pub fn async_write(&self, f: &F, data: &Data, handler: H) + where + F: AsRawFd, + H: 'static + Fn(io::Result), + { + let fd = f.as_raw_fd() as dispatch_fd_t; + + debug!("write {} bytes with fd: {:?}", data.len(), fd); + + let handler = block(move |ptr: dispatch_data_t, error: i32| { + let result = if error == 0 { + let data = Data::borrow(ptr); + + trace!( + "write finished with fd: {:?}, remaining {} bytes", + fd, + data.len() + ); + + Ok(data) + } else { + trace!("write failed, error: {}", error); + + Err(io::Error::from_raw_os_error(error)) + }; + + handler(result) + }); + + unsafe { dispatch_write(fd, data.as_raw(), self.as_raw(), handler) } + } +} + +#[cfg(test)] +mod tests { + use std::io::{Seek, SeekFrom, Write}; + use std::mem; + use std::sync::{Arc, Barrier}; + + use pretty_env_logger; + use tempfile::tempfile; + + use super::*; + use QueueAttribute; + + #[test] + fn test_stream_channel() { + let _ = pretty_env_logger::try_init(); + + let f = tempfile().unwrap(); + let q = Queue::create("test", QueueAttribute::Serial); + let c = q.open_file(f, Some(|error| trace!("cleanup channel, error: {}", error))); + + let data = q.data(b"hello world"); + + c.async_write(0, &data, &q, move |result| match result { + Ok((data, done)) => { + assert!(data.is_empty()); + assert!(done); + } + Err(err) => warn!("write channel failed, {}", err), + }); + + c.async_read(0, 5, &q, move |result| match result { + Ok((data, done)) => { + assert_eq!(data.len(), 5); + assert!(done); + } + Err(err) => warn!("read channel failed, {}", err), + }); + + let barrier = Arc::new(Barrier::new(2)); + let b = barrier.clone(); + c.barrier_async(move || { + trace!("force sync up channel"); + + b.wait(); + }); + + barrier.wait(); + + trace!("sync up channel finished"); + + c.close(CloseFlags::empty()); + } + + #[test] + fn test_stream_operations() { + let _ = pretty_env_logger::try_init(); + + let mut f = tempfile().unwrap(); + let q = Queue::create("test", QueueAttribute::Concurrent); + f.write_all(b"hello world").unwrap(); + f.seek(SeekFrom::Start(0)).unwrap(); + f.sync_all().unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let b = barrier.clone(); + q.async_read(&f, 5, move |result| { + match result { + Ok(data) => { + assert_eq!(data.len(), 5); + } + Err(err) => warn!("read file failed, {}", err), + } + + b.wait(); + }); + + q.barrier_async(move || { + trace!("force sync up queue"); + }); + + barrier.wait(); + + trace!("sync up queue finished"); + + mem::drop(q); + } +} diff --git a/src/lib.rs b/src/lib.rs index e3be8dc..19726d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,18 @@ assert!(nums[0] == "2"); #![warn(missing_docs)] +extern crate block; +extern crate libc; +#[macro_use] +extern crate log; +#[macro_use] +extern crate bitflags; + +#[cfg(test)] +extern crate pretty_env_logger; +#[cfg(test)] +extern crate tempfile; + use std::cell::UnsafeCell; use std::ffi::{CStr, CString}; use std::mem; @@ -57,13 +69,59 @@ use ffi::*; /// Raw foreign function interface for libdispatch. pub mod ffi; +#[cfg(any(target_os = "macos", target_os = "ios"))] +mod blk; +#[cfg(any(target_os = "macos", target_os = "ios"))] +mod data; +#[cfg(any(target_os = "macos", target_os = "ios"))] +mod io; +mod qos; +#[cfg(any(target_os = "macos", target_os = "ios"))] +mod sem; +mod time; + +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use blk::{perform, BlockFlags, DispatchBlock}; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use data::{Data, Destructor, IntoDestructor}; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use io::{Channel, ChannelType, CloseFlags, IntervalFlags}; +pub use qos::QosClass; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use sem::Semaphore; +pub use time::{after, at, IntoTimeout, WaitTimeout, FOREVER, NOW}; + /// The type of a dispatch queue. -#[derive(Clone, Debug, Hash, PartialEq)] +#[derive(Debug, Hash, PartialEq)] pub enum QueueAttribute { /// The queue executes blocks serially in FIFO order. Serial, /// The queue executes blocks concurrently. Concurrent, + /// Attribute for dispatch queues. + Value(dispatch_queue_attr_t), +} + +impl Clone for QueueAttribute { + fn clone(&self) -> Self { + match *self { + QueueAttribute::Serial => QueueAttribute::Serial, + QueueAttribute::Concurrent => QueueAttribute::Concurrent, + QueueAttribute::Value(attr) => { + unsafe { dispatch_retain(attr as *mut _) }; + + QueueAttribute::Value(attr) + } + } + } +} + +impl Drop for QueueAttribute { + fn drop(&mut self) { + if let &mut QueueAttribute::Value(attr) = self { + unsafe { dispatch_release(attr as *mut _) } + } + } } impl QueueAttribute { @@ -72,6 +130,7 @@ impl QueueAttribute { match *self { QueueAttribute::Serial => DISPATCH_QUEUE_SERIAL, QueueAttribute::Concurrent => DISPATCH_QUEUE_CONCURRENT, + QueueAttribute::Value(attr) => attr, } } @@ -82,6 +141,30 @@ impl QueueAttribute { // Back then, the attr for dispatch_queue_create must be NULL. ptr::null() } + + /// Returns an attribute value which may be provided to `Queue::create` or `Queue::with_target_queue`, + /// in order to make the created queue initially inactive. + #[cfg(any(target_os = "macos", target_os = "ios"))] + pub fn inactive(self) -> Self { + let attr = unsafe { dispatch_queue_attr_make_initially_inactive(self.as_raw()) }; + + QueueAttribute::Value(attr) + } + + /// Returns an attribute value which may be provided to `Queue::create` or `Queue::with_target_queue`, + /// in order to assign a QOS class and relative priority to the queue. + #[cfg(any(target_os = "macos", target_os = "ios"))] + pub fn with_qos_class(self, qos_class: QosClass, relative_priority: i32) -> Self { + let attr = unsafe { + dispatch_queue_attr_make_with_qos_class( + self.as_raw(), + qos_class as dispatch_qos_class_t, + relative_priority, + ) + }; + + QueueAttribute::Value(attr) + } } /// The priority of a global concurrent queue. @@ -106,9 +189,9 @@ pub enum QueuePriority { impl QueuePriority { fn as_raw(&self) -> c_long { match *self { - QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, - QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, - QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, + QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, + QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, + QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, QueuePriority::Background => DISPATCH_QUEUE_PRIORITY_BACKGROUND, } } @@ -122,58 +205,54 @@ pub struct Queue { ptr: dispatch_queue_t, } -fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay.as_secs().checked_mul(1_000_000_000).and_then(|i| { - i.checked_add(delay.subsec_nanos() as u64) - }).and_then(|i| { - if i < (i64::max_value() as u64) { Some(i as i64) } else { None } - }).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { - dispatch_time(DISPATCH_TIME_NOW, i) - }) -} - fn context_and_function(closure: F) -> (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_execute_closure(context: Box) where F: FnOnce() { +where + F: FnOnce(), +{ + extern "C" fn work_execute_closure(context: Box) + where + F: FnOnce(), + { (*context)(); } let closure = Box::new(closure); - let func: extern fn(Box) = work_execute_closure::; - unsafe { - (mem::transmute(closure), mem::transmute(func)) - } + let func: extern "C" fn(Box) = work_execute_closure::; + unsafe { (mem::transmute(closure), mem::transmute(func)) } } -fn context_and_sync_function(closure: &mut Option) -> - (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_read_closure(context: &mut Option) where F: FnOnce() { +fn context_and_sync_function(closure: &mut Option) -> (*mut c_void, dispatch_function_t) +where + F: FnOnce(), +{ + extern "C" fn work_read_closure(context: &mut Option) + where + F: FnOnce(), + { // This is always passed Some, so it's safe to unwrap let closure = context.take().unwrap(); closure(); } let context: *mut Option = closure; - let func: extern fn(&mut Option) = work_read_closure::; - unsafe { - (context as *mut c_void, mem::transmute(func)) - } + let func: extern "C" fn(&mut Option) = work_read_closure::; + unsafe { (context as *mut c_void, mem::transmute(func)) } } -fn context_and_apply_function(closure: &F) -> - (*mut c_void, extern fn(*mut c_void, usize)) - where F: Fn(usize) { - extern fn work_apply_closure(context: &F, iter: usize) - where F: Fn(usize) { +fn context_and_apply_function(closure: &F) -> (*mut c_void, extern "C" fn(*mut c_void, usize)) +where + F: Fn(usize), +{ + extern "C" fn work_apply_closure(context: &F, iter: usize) + where + F: Fn(usize), + { context(iter); } let context: *const F = closure; - let func: extern fn(&F, usize) = work_apply_closure::; - unsafe { - (context as *mut c_void, mem::transmute(func)) - } + let func: extern "C" fn(&F, usize) = work_apply_closure::; + unsafe { (context as *mut c_void, mem::transmute(func)) } } impl Queue { @@ -200,19 +279,21 @@ impl Queue { /// Creates a new dispatch `Queue`. pub fn create(label: &str, attr: QueueAttribute) -> Self { let label = CString::new(label).unwrap(); - let queue = unsafe { - dispatch_queue_create(label.as_ptr(), attr.as_raw()) - }; + let queue = unsafe { dispatch_queue_create(label.as_ptr(), attr.as_raw()) }; Queue { ptr: queue } } + /// Extracts the raw dispatch queue object. + pub fn as_raw(&self) -> dispatch_queue_t { + self.ptr + } + /// Creates a new dispatch `Queue` with the given target queue. /// /// A dispatch queue's priority is inherited from its target queue. /// Additionally, if both the queue and its target are serial queues, /// their blocks will not be invoked concurrently. - pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) - -> Self { + pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) -> Self { let queue = Queue::create(label, attr); unsafe { dispatch_set_target_queue(queue.ptr, target.ptr); @@ -232,9 +313,23 @@ impl Queue { str::from_utf8(label.to_bytes()).unwrap() } + /// Returns the QOS class and relative priority of the given queue. + #[cfg(any(target_os = "macos", target_os = "ios"))] + pub fn qos_class(&self) -> (QosClass, i32) { + let mut relative_priority = 0; + + let qos_class = + unsafe { dispatch_queue_get_qos_class(self.ptr, &mut relative_priority) }.into(); + + (qos_class, relative_priority) + } + /// Submits a closure for execution on self and waits until it completes. pub fn sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -254,7 +349,10 @@ impl Queue { /// Submits a closure for asynchronous execution on self and returns /// immediately. - pub fn async(&self, work: F) where F: 'static + Send + FnOnce() { + pub fn async(&self, work: F) + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_async_f(self.ptr, context, work); @@ -264,15 +362,20 @@ impl Queue { /// After the specified delay, submits a closure for asynchronous execution /// on self. pub fn after_ms(&self, ms: u32, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { self.after(Duration::from_millis(ms as u64), work); } /// After the specified delay, submits a closure for asynchronous execution /// on self. - pub fn after(&self, delay: Duration, work: F) - where F: 'static + Send + FnOnce() { - let when = time_after_delay(delay); + pub fn after(&self, delay: T, work: F) + where + F: 'static + Send + FnOnce(), + T: IntoTimeout, + { + let when = delay.into_raw(); let (context, work) = context_and_function(work); unsafe { dispatch_after_f(when, self.ptr, context, work); @@ -282,7 +385,9 @@ impl Queue { /// Submits a closure to be executed on self the given number of iterations /// and waits until it completes. pub fn apply(&self, iterations: usize, work: F) - where F: Sync + Fn(usize) { + where + F: Sync + Fn(usize), + { let (context, work) = context_and_apply_function(&work); unsafe { dispatch_apply_f(iterations, self.ptr, context, work); @@ -292,7 +397,10 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided slice and waits until it completes. pub fn foreach(&self, slice: &mut [T], work: F) - where F: Sync + Fn(&mut T), T: Send { + where + F: Sync + Fn(&mut T), + T: Send, + { let slice_ptr = slice.as_mut_ptr(); let work = move |i| unsafe { work(&mut *slice_ptr.offset(i as isize)); @@ -306,7 +414,11 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided vector and returns a `Vec` of the mapped elements. pub fn map(&self, vec: Vec, work: F) -> Vec - where F: Sync + Fn(T) -> U, T: Send, U: Send { + where + F: Sync + Fn(T) -> U, + T: Send, + U: Send, + { let mut src = vec; let len = src.len(); let src_ptr = src.as_ptr(); @@ -341,7 +453,10 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `sync` method. pub fn barrier_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -372,7 +487,9 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `async` method. pub fn barrier_async(&self, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_barrier_async_f(self.ptr, context, work); @@ -390,8 +507,8 @@ impl Queue { } } -unsafe impl Sync for Queue { } -unsafe impl Send for Queue { } +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} impl Clone for Queue { fn clone(&self) -> Self { @@ -420,11 +537,13 @@ impl SuspendGuard { unsafe { dispatch_suspend(queue.ptr); } - SuspendGuard { queue: queue.clone() } + SuspendGuard { + queue: queue.clone(), + } } /// Drops self, allowing the suspended `Queue` to resume. - pub fn resume(self) { } + pub fn resume(self) {} } impl Clone for SuspendGuard { @@ -454,7 +573,9 @@ impl Group { /// Creates a new dispatch `Group`. pub fn create() -> Group { unsafe { - Group { ptr: dispatch_group_create() } + Group { + ptr: dispatch_group_create(), + } } } @@ -468,7 +589,9 @@ impl Group { /// Submits a closure asynchronously to the given `Queue` and associates it /// with self. pub fn async(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_async_f(self.ptr, queue.ptr, context, work); @@ -479,7 +602,9 @@ impl Group { /// associated with self have completed. /// If self is empty, the closure is submitted immediately. pub fn notify(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_notify_f(self.ptr, queue.ptr, context, work); @@ -488,9 +613,7 @@ impl Group { /// Waits synchronously for all tasks associated with self to complete. pub fn wait(&self) { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch group wait errored"); } @@ -504,25 +627,21 @@ impl Group { /// Waits for all tasks associated with self to complete within the /// specified duration. /// Returns true if the tasks completed or false if the timeout elapsed. - pub fn wait_timeout(&self, timeout: Duration) -> bool { - let when = time_after_delay(timeout); - let result = unsafe { - dispatch_group_wait(self.ptr, when) - }; + pub fn wait_timeout(&self, timeout: T) -> bool { + let when = timeout.into_raw(); + let result = unsafe { dispatch_group_wait(self.ptr, when) }; result == 0 } /// Returns whether self is currently empty. pub fn is_empty(&self) -> bool { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) }; result == 0 } } -unsafe impl Sync for Group { } -unsafe impl Send for Group { } +unsafe impl Sync for Group {} +unsafe impl Send for Group {} impl Clone for Group { fn clone(&self) -> Self { @@ -551,11 +670,13 @@ impl GroupGuard { unsafe { dispatch_group_enter(group.ptr); } - GroupGuard { group: group.clone() } + GroupGuard { + group: group.clone(), + } } /// Drops self, leaving the `Group`. - pub fn leave(self) { } + pub fn leave(self) {} } impl Clone for GroupGuard { @@ -581,15 +702,22 @@ struct Once { impl Once { // TODO: make this a const fn when the feature is stable pub fn new() -> Once { - Once { predicate: UnsafeCell::new(0) } + Once { + predicate: UnsafeCell::new(0), + } } #[inline(always)] - pub fn call_once(&'static self, work: F) where F: FnOnce() { + pub fn call_once(&'static self, work: F) + where + F: FnOnce(), + { #[cold] #[inline(never)] fn once(predicate: *mut dispatch_once_t, work: F) - where F: FnOnce() { + where + F: FnOnce(), + { let mut work = Some(work); let (context, work) = context_and_sync_function(&mut work); unsafe { @@ -606,13 +734,13 @@ impl Once { } } -unsafe impl Sync for Once { } +unsafe impl Sync for Once {} #[cfg(test)] mod tests { + use super::*; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; - use super::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); @@ -633,6 +761,22 @@ mod tests { assert_eq!(q.sync(|| num), 1); } + #[cfg(any(target_os = "macos", target_os = "ios"))] + #[test] + fn test_serial_queue_with_qos_class() { + let q = Queue::create( + "", + QueueAttribute::Serial.with_qos_class(QosClass::UserInteractive, 0), + ); + let mut num = 0; + + q.sync(|| num = 1); + assert_eq!(num, 1); + + assert_eq!(q.qos_class(), (QosClass::UserInteractive, 0)); + assert_eq!(q.sync(|| num), 1); + } + #[test] fn test_sync_owned() { let q = Queue::create("", QueueAttribute::Serial); diff --git a/src/qos.rs b/src/qos.rs new file mode 100644 index 0000000..b25b065 --- /dev/null +++ b/src/qos.rs @@ -0,0 +1,48 @@ +use std::mem; + +use ffi::*; + +/// An abstract thread quality of service (QOS) classification. +#[repr(u32)] +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] +pub enum QosClass { + /// A QOS class which indicates work performed by this thread is interactive with the user. + UserInteractive = QOS_CLASS_USER_INTERACTIVE, + /// A QOS class which indicates work performed by this thread was initiated by the user + /// and that the user is likely waiting for the results. + UserInitiated = QOS_CLASS_USER_INITIATED, + /// A default QOS class used by the system in cases where more specific QOS class information is not available. + Default = QOS_CLASS_DEFAULT, + /// A QOS class which indicates work performed by this thread may or may not be initiated by the user + /// and that the user is unlikely to be immediately waiting for the results. + Utility = QOS_CLASS_UTILITY, + /// A QOS class which indicates work performed by this thread was not initiated by the user + /// and that the user may be unaware of the results. + Background = QOS_CLASS_BACKGROUND, + /// A QOS class value which indicates the absence or removal of QOS class information. + Unspecified = QOS_CLASS_UNSPECIFIED, +} + +impl Default for QosClass { + fn default() -> Self { + QosClass::Default + } +} + +impl From for QosClass { + fn from(v: u32) -> Self { + unsafe { mem::transmute(v) } + } +} + +impl QosClass { + /// Returns the requested QOS class of the current thread. + pub fn current() -> Self { + unsafe { qos_class_self() }.into() + } + + /// Returns the initial requested QOS class of the main thread. + pub fn main() -> Self { + unsafe { qos_class_main() }.into() + } +} diff --git a/src/sem.rs b/src/sem.rs new file mode 100644 index 0000000..6f5c0f8 --- /dev/null +++ b/src/sem.rs @@ -0,0 +1,99 @@ +use std::os::raw::c_long; + +use ffi::*; +use {IntoTimeout, WaitTimeout}; + +/// A counting semaphore. +/// +/// Calls to `Semaphore::signal` must be balanced with calls to `Semaphore::wait`. +/// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception. +#[derive(Debug)] +pub struct Semaphore { + ptr: dispatch_semaphore_t, +} + +impl Semaphore { + /// Creates new counting semaphore with an initial value. + /// + /// Passing zero for the value is useful for + /// when two threads need to reconcile the completion of a particular event. + /// Passing a value greater than zero is useful for managing a finite pool of resources, + /// where the pool size is equal to the value. + pub fn new(n: u64) -> Self { + let ptr = unsafe { dispatch_semaphore_create(n as c_long) }; + + Semaphore { ptr } + } + + /// Wait (decrement) for a semaphore. + /// + /// Decrement the counting semaphore. + pub fn wait(&self) -> Result<(), WaitTimeout> { + self.wait_timeout(DISPATCH_TIME_FOREVER) + } + + /// Wait (decrement) for a semaphoreor until the specified timeout has elapsed. + /// + /// Decrement the counting semaphore. + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.into_raw(); + + let n = unsafe { dispatch_semaphore_wait(self.ptr, when) }; + + if n == 0 { + Ok(()) + } else { + Err(WaitTimeout) + } + } + + /// Signal (increment) a semaphore. + /// + /// Increment the counting semaphore. + /// If the previous value was less than zero, this function wakes a waiting thread before returning. + /// + /// This function returns `true` if a thread is woken. Otherwise, `false` is returned. + pub fn signal(&self) -> bool { + unsafe { dispatch_semaphore_signal(self.ptr) != 0 } + } +} + +unsafe impl Sync for Semaphore {} +unsafe impl Send for Semaphore {} + +impl Clone for Semaphore { + fn clone(&self) -> Self { + unsafe { + dispatch_retain(self.ptr); + } + Semaphore { ptr: self.ptr } + } +} + +impl Drop for Semaphore { + fn drop(&mut self) { + unsafe { + dispatch_release(self.ptr); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_semaphore() { + let sem = Semaphore::new(1); + + assert!(sem.wait().is_ok()); + assert_eq!(sem.wait_timeout(0).unwrap_err(), WaitTimeout); + + assert!(!sem.signal()); + assert!(sem.wait_timeout(DISPATCH_TIME_FOREVER).is_ok()); + + // Calls to dispatch_semaphore_signal must be balanced with calls to wait(). + // Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception. + assert!(!sem.signal()); + } +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..5938305 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,109 @@ +use std::error::Error; +use std::fmt; +use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH}; + +use libc::{c_long, time_t, timespec}; + +use ffi::*; + +/// A `dispatch_time_t` corresponding to the current time. +pub const NOW: dispatch_time_t = DISPATCH_TIME_NOW; +/// A `dispatch_time_t` corresponding to the maximum time. +pub const FOREVER: dispatch_time_t = DISPATCH_TIME_FOREVER; + +/// A type indicating whether a timed wait on a dispatch object returned due to a time out or not. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct WaitTimeout; + +impl fmt::Display for WaitTimeout { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "operation time out") + } +} + +impl Error for WaitTimeout { + fn description(&self) -> &str { + "operation time out" + } +} + +/// When to timeout. +pub trait IntoTimeout { + /// Consumes the `IntoTimeout`, returning the raw `dispatch_time_t`. + fn into_raw(self) -> dispatch_time_t; +} + +impl IntoTimeout for Option { + fn into_raw(self) -> dispatch_time_t { + if let Some(timeout) = self { + timeout.into_raw() + } else { + DISPATCH_TIME_NOW + } + } +} + +impl IntoTimeout for i32 { + fn into_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).into_raw() + } +} + +impl IntoTimeout for u32 { + fn into_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).into_raw() + } +} + +impl IntoTimeout for Duration { + fn into_raw(self) -> dispatch_time_t { + after(self) + } +} + +impl IntoTimeout for dispatch_time_t { + fn into_raw(self) -> dispatch_time_t { + self + } +} + +impl IntoTimeout for Instant { + fn into_raw(self) -> dispatch_time_t { + self.duration_since(Instant::now()).into_raw() + } +} + +impl IntoTimeout for SystemTime { + fn into_raw(self) -> dispatch_time_t { + self.duration_since(SystemTime::now()).unwrap().into_raw() + } +} + +/// Returns a `dispatch_time_t` corresponding to the given duration. +pub fn after(delay: Duration) -> dispatch_time_t { + delay + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|i| i.checked_add(delay.subsec_nanos() as u64)) + .and_then(|i| { + if i < (i64::max_value() as u64) { + Some(i as i64) + } else { + None + } + }) + .map_or(DISPATCH_TIME_FOREVER, |i| unsafe { + dispatch_time(DISPATCH_TIME_NOW, i) + }) +} + +/// Returns a `dispatch_time_t` corresponding to the given time. +pub fn at(tm: SystemTime) -> Result { + let dur = tm.duration_since(UNIX_EPOCH)?; + let ts = timespec { + tv_sec: dur.as_secs() as time_t, + tv_nsec: dur.subsec_nanos() as c_long, + }; + + Ok(unsafe { dispatch_walltime(&ts, 0) }) +} diff --git a/tests-ios/prelude.rs b/tests-ios/prelude.rs index 0b2f738..825683d 100644 --- a/tests-ios/prelude.rs +++ b/tests-ios/prelude.rs @@ -1,10 +1,20 @@ +#[macro_use] +extern crate log; +extern crate pretty_env_logger; +extern crate tempfile; + extern crate dispatch; -use std::sync::{Arc, Mutex}; +use std::cell::RefCell; +use std::mem; +use std::os::raw::{c_long, c_void}; +use std::sync::{Arc, Barrier, Mutex}; use std::time::{Duration, Instant}; -use dispatch::*; +use tempfile::tempfile; + use dispatch::ffi::*; +use dispatch::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); @@ -13,3 +23,14 @@ fn async_increment(queue: &Queue, num: &Arc>) { *num += 1; }); } + +fn async_increment_block(queue: &Queue, num: &Arc>) { + let num = num.clone(); + queue.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + }, + )); +}