Introduction
more-changetoken
is a crate for creating change tokens in Rust. Change tokens are used to signal changes to consumers via a registered callback or polling.
Crate Features
This crate provides the following features:
- default - Abstractions and default implementations for change tokens
- fs - File system change tokens
Contributing
more-changetoken
is free and open source. You can find the source code on GitHub
and issues and feature requests can be posted on the GitHub issue tracker.
more-changetoken
relies on the community to fix bugs and add features: if you'd like to contribute, please read the
CONTRIBUTING guide and consider opening
a pull request.
License
This project is licensed under the MIT license.
Getting Started
The simplest way to get started is to install the crate using the default features.
cargo add more-changetoken
Example
A change token provides a way to signal a consumer that a change has occurred. This can commonly occur when using Interior Mutability or when changes happen asynchronously. The most common usage scenario is sharing a change token between a producer and one or more consumers.
use tokens::*;
use std::sync::{Arc, RwLock};
#[derive(Default)]
pub struct Counter {
token: SharedChangeToken,
value: RwLock<usize>,
}
impl Counter {
pub fn increment(&self) {
*self.value.write().unwrap() += 1;
self.token.notify();
}
pub fn watch(&self) -> impl ChangeToken {
self.token.clone()
}
}
impl ToString for Counter {
fn to_string(&self) -> String {
format!("Value: {}", *self.value.read().unwrap())
}
}
fn main() {
let counter = Arc::new(Counter::default());
let registration = counter.watch().register(
Box::new(|state| {
let printable = state.unwrap().downcast_ref::<Counter>().unwrap();
println!("{}", printable.to_string());
}),
Some(counter.clone()));
counter.increment(); // prints 'Value 1'
counter.increment(); // prints 'Value 2'
counter.increment(); // prints 'Value 3'
}
Since the registered callback might occur on another thread, the specified function must implement Send
and Sync
to ensure it is safe to invoke. Although the backing implementation - SharedChangeToken
- can be shared, the caller is intentionally unaware of that capability because only an implementation of ChangeToken
is returned, which does not implement Clone
. This behavior ensures that a consumer cannot unintentionally propagate copies of the provided to change token to others.
register
returns a Registration
that represents the registration of a callback. When the Registration
is dropped, the callback is also dropped. A change token will not leak callbacks over time, which means it is important to hold onto the registration for as long as it is needed. Using a discard (e.g. _
) will immediately drop the registration and callback reference.
Default Change Token
A ChangeToken
has the following capabilities.
pub type Callback = Box<dyn Fn(Option<Arc<dyn Any>>) + Send + Sync>;
pub trait ChangeToken: Send + Sync {
fn changed(&self) -> bool;
fn must_poll(&self) -> bool;
fn register(
&self,
callback: Callback,
state: Option<Arc<dyn Any>>) -> Registration;
}
All of the out-of-the-box change tokens use callbacks to signal a change, but must_poll
can return true
to indicate that a consumer should poll changed
. changed
is expected to return true
when a change has been observed. The result may vary between invocations depending on the implementations.
When register
is called, a Registration
is returned. A Registration
is an opaque struct that is used to terminate the registration. When the Registration
struct is dropped, the callback will be removed from the change token's callback list. The caller owns the Registration
, which ensures that a memory leak never occurs from the ChangeToken
holding onto a callback longer than it should.
The DefaultChangeToken
is the default implementation from which all other ChangeToken
implementations are based on. This simple ChangeToken
manages a list of callbacks and invokes them whenever DefaultChangeToken::notify
is called by the producer. This ChangeToken
supports triggering callbacks multiple times.
Since the token may be signaled multiple times, changed
only reports true
while it is actively invoking callbacks. When used in a synchronous context, this means the return value will always be false
. When used in an asynchronous context, the return value may be true
and potentially useful to a caller. For most usage scenarios, the act of invoking a callback signals a change and the value of changed
is uninteresting.
Single Change Token
The SingleChangeToken
behaves the same as the DefaultChangeToken
with a single exception. It changes exactly once. Once SingleChangeToken::notify
has been invoked, changed
will always return true
. Any registered callbacks will be invoked, at most, once. If a callback is registered after SingleChangeToken::notify
has been called, it will never be invoked.
The design of a ChangeToken
does not indicate whether it supports multiple notifications. As a result, consumers are likely to create new change tokens from producers often. SingleChangeToken
tends to be the most commonly used change token. It guarantees at-most once execution and prevents change tokens from living longer than they need to.
use tokens::*;
use std::sync::{Arc, RwLock};
#[derive(Default)]
pub struct Counter {
token: RwLock<SharedChangeToken<SingleChangeToken>>,
value: RwLock<usize>,
}
impl Counter {
pub fn increment(&self) {
*self.value.write().unwrap() += 1;
let token = std::mem.replace(
&mut *self.token.write().unwrap(),
Default::default());
token.notify();
}
pub fn watch(&self) -> impl ChangeToken {
self.token.clone()
}
}
impl ToString for Counter {
fn to_string(&self) -> String {
format!("Value: {}", *self.value.read().unwrap())
}
}
fn main() {
let counter = Arc::new(Counter::default());
let mut registration = counter.watch().register(
Box::new(|state| {
let printable = state.unwrap().downcast_ref::<Counter>().unwrap();
println!("{}", printable.to_string());
}),
Some(counter.clone()));
counter.increment(); // prints 'Value 1'
counter.increment(); // doesn't print because token already fired
registration = counter.watch().register(
Box::new(|state| {
let printable = state.unwrap().downcast_ref::<Counter>().unwrap();
println!("{}", printable.to_string());
}),
Some(counter.clone()));
counter.increment(); // prints 'Value 3'
}
Never Change Token
There may be edge cases where a ChangeToken
is required, but there will never been any changes. This is the usage scenario for NeverChangeToken
. This ChangeToken
will never register any callbacks, changed
will always return false
, and must_poll
will always return true
. NeverChangeToken
effectively implements the Null Object pattern for the ChangeToken
trait.
use tokens::*;
use std::sync::{Arc, RwLock};
#[derive(Default)]
pub struct Counter {
value: RwLock<usize>,
}
impl Counter {
pub fn increment(&self) {
*self.value.write().unwrap() += 1;
}
pub fn watch(&self) -> impl ChangeToken {
// TODO: placeholder until change support is implemented
NeverChangeToken::new()
}
}
impl ToString for Counter {
fn to_string(&self) -> String {
format!("Value: {}", *self.value.read().unwrap())
}
}
fn main() {
let counter = Arc::new(Counter::default());
let registration = counter.watch().register(
Box::new(|state| {
let printable = state.unwrap().downcast_ref::<Counter>().unwrap();
println!("{}", printable.to_string());
}),
Some(counter.clone()));
counter.increment(); // prints nothing; callback not invoked
}
Shared Change Token
SharedChangeToken
is one of, if not the most, commonly used change token. For all intents and purposes, SharedChangeToken
functions the same as Rc
or Arc
without divulging that information. It also directly implements the ChangeToken
trait, which means it can be used anywhere a ChangeToken
is returned or accepted.
SharedChangeToken
also defines T: DefaultChangeToken
. This means SharedChangeToken
is equivalent to SharedChangeToken<DefaultChangeToken>
unless there is a more specific type on the left-hand side of an assignment. Although SingleChangeToken
is more common in typical usage, if the default type T
wasn't DefaultChangeToken
, it would be counter intuitive. SharedChangeToken
can adapt over any other ChangeToken
implementation.
Composite Change Token
Some applications may need to compose or aggregate multiple change tokens. This is the use case for a CompositeChangeToken
. The CompositeChangeToken
accepts a sequence of other ChangeToken
instances and mediates their change notifications. A consumer of a CompositeChangeToken
will be called back via CompositeChangeToken::notify
whenever the owner explicitly signals a change or when one of the mediated children signals a change.
use tokens::*;
use std::sync::{Arc, RwLock};
pub struct Counter {
id: usize,
token: RwLock<SharedChangeToken<SingleChangeToken>>,
value: RwLock<usize>,
}
impl Counter {
pub fn new(id: usize) -> Self {
Self {
id,
..Default::default()
}
}
pub fn increment(&self) {
*self.value.write().unwrap() += 1;
let token = std::mem.replace(
&mut *self.token.write().unwrap(),
Default::default());
token.notify();
}
pub fn watch(&self) -> impl ChangeToken {
self.token.clone()
}
}
impl ToString for Counter {
fn to_string(&self) -> String {
format!("[{}] Value: {}", self.id, *self.value.read().unwrap())
}
}
fn main() {
let counters = Arc::new(vec![Counter::new(1), Counter::new(2), Counter::new(3)]);
let token = CompositeChangeToken::new(counters.iter().map(|c| Box::new(c.watch())));
let mut registration = token.register(
Box::new(|state| {
let printables = state.unwrap().downcast_ref::<Vec<Counter>>().unwrap();
for printable in printables {
println!("{}", printable.to_string());
}
}),
Some(counters.clone()));
// prints '[1] Value 0'
// prints '[2] Value 0'
// prints '[3] Value 1'
counters[2].increment();
}
File Change Token
This type is only available if the fs feature is activated
The FileChangeToken
is a special type of ChangeToken
, which watches for changes to a file and notifies its consumers when a change is observed. The FileChangeToken
only considers a single change. Once a change has been observed, it will not monitor further changes. The implementation is functionally equivalent to SingleChangeToken
, but for a file change.
Important:
FileChangeToken
callbacks are always invoked on another thread; otherwise, the caller would be blocked waiting for a change.
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use tokens::FileChangeToken;
fn main() {
let path = PathBuf::from("./my-app/files/some.txt");
let state = Arc::new((Mutex::new(false), Condvar::new()));
let token = FileChangeToken::new(&path);
let registration = token.register(
Box::new(|state| {
let data = state.unwrap();
let (fired, event) = &*data.downcast_ref::<(Mutex<bool>, Condvar)>().unwrap();
*fired.lock().unwrap() = true;
event.notify_one();
}),
Some(state.clone()));
let mut file = File::create(&path).unwrap();
// make a change to the file
file.write_all("updated".as_bytes()).unwrap();
let (mutex, event) = &*state;
let mut fired = mutex.lock().unwrap();
// the callback happens on another thread so wait
// here until the callback notifies us
while !*fired {
fired = event.wait(fired).unwrap();
}
println!("'{}' changed.", path.display());
}
Functions
The following are utility functions that are useful when working with change tokens.
On Change
The tokens::on_change
function mediates a producer method that returns a ChangeToken
and a consumer function that responds to a change. Unlike the CompositeChangeToken
, the tokens::on_change
function will facilitate calling back to the specified consumer, immediately drop the last ChangeToken
, and request a new ChangeToken
from the producer.
The return value of the function is an opaque struct that implements the Subscription
trait representing the perpetual subscription. tokens::on_change
will continue to signal the consumer with changes and refresh the producer ChangeToken
until the opaque subscription object has been dropped.
use std::path::PathBuf;
use tokens::FileChangeToken;
let path = PathBuf::from("./my-app/some.txt");
let subscription = tokens::on_change(
move || FileChangeToken::new(path.clone()),
|state| {
println!("{}", state.unwrap().display());
},
Some(path.clone()));