Introduction

more-changetoken is a crate for creating change tokens in Rust. Change tokens are used to signal changes to consumers via a registered callback or polling.

Crate Features

This crate provides the following features:

  • default - Abstractions and default implementations for change tokens
  • fs - File system change tokens

Contributing

more-changetoken is free and open source. You can find the source code on GitHub and issues and feature requests can be posted on the GitHub issue tracker. more-changetoken relies on the community to fix bugs and add features: if you'd like to contribute, please read the CONTRIBUTING guide and consider opening a pull request.

License

This project is licensed under the MIT license.

Getting Started

The simplest way to get started is to install the crate using the default features.

cargo add more-changetoken

Example

A change token provides a way to signal a consumer that a change has occurred. This can commonly occur when using Interior Mutability or when changes happen asynchronously. The most common usage scenario is sharing a change token between a producer and one or more consumers.

use tokens::*;
use std::sync::{Arc, RwLock};

#[derive(Default)]
pub struct Counter {
    token: SharedChangeToken,
    value: RwLock<usize>,
}

impl Counter {
    pub fn increment(&self) {
        *self.value.write().unwrap() += 1;
        self.token.notify();
    }

    pub fn watch(&self) -> impl ChangeToken {
        self.token.clone()
    }
}

impl ToString for Counter {
    fn to_string(&self) -> String {
        format!("Value: {}", *self.value.read().unwrap())
    }
}

fn main() {
    let counter = Arc::new(Counter::default());
    let registration = counter.watch().register(
        Box::new(|state| {
            // bind the Arc first so the borrow from downcast_ref stays valid
            let state = state.unwrap();
            let printable = state.downcast_ref::<Counter>().unwrap();
            println!("{}", printable.to_string());
        }),
        Some(counter.clone()));

    counter.increment(); // prints 'Value: 1'
    counter.increment(); // prints 'Value: 2'
    counter.increment(); // prints 'Value: 3'
}

Since the registered callback might be invoked on another thread, the specified function must implement Send and Sync to ensure it is safe to invoke. Although the backing implementation, SharedChangeToken, can be shared, the caller is intentionally unaware of that capability because only an implementation of ChangeToken is returned, which does not implement Clone. This behavior ensures that a consumer cannot unintentionally propagate copies of the provided change token to others.

register returns a Registration that represents the registration of a callback. When the Registration is dropped, the callback is also dropped. A change token will not leak callbacks over time, which means it is important to hold onto the registration for as long as it is needed. Using a discard (e.g. _) will immediately drop the registration and callback reference.
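
The drop behavior described above can be illustrated without the crate. The following is a minimal sketch, assuming hypothetical Token and Handle types (they are not the crate's types and the crate may implement this differently): the token stores only weak references to callbacks, while the handle returned from register owns the single strong reference.

```rust
use std::sync::{Arc, Mutex, Weak};

// Hypothetical stand-ins for illustration only; these are not the crate's types.
type Callback = dyn Fn() + Send + Sync;

#[derive(Default)]
struct Token {
    // Only weak references are stored, so the token never keeps a callback alive.
    callbacks: Mutex<Vec<Weak<Callback>>>,
}

// Owns the only strong reference; dropping the handle drops the callback.
struct Handle {
    _callback: Arc<Callback>,
}

impl Token {
    fn register(&self, callback: impl Fn() + Send + Sync + 'static) -> Handle {
        let strong: Arc<Callback> = Arc::new(callback);
        self.callbacks.lock().unwrap().push(Arc::downgrade(&strong));
        Handle { _callback: strong }
    }

    // Invokes every callback that is still alive and returns how many ran.
    fn notify(&self) -> usize {
        let mut callbacks = self.callbacks.lock().unwrap();
        // Forget callbacks whose handle has already been dropped.
        callbacks.retain(|weak| weak.strong_count() > 0);
        let live: Vec<Arc<Callback>> =
            callbacks.iter().filter_map(|weak| weak.upgrade()).collect();
        drop(callbacks); // release the lock before invoking anything

        for callback in &live {
            callback();
        }
        live.len()
    }
}

// Returns the number of callbacks invoked: while the handle is held,
// after it is dropped, and after a registration is discarded with `_`.
fn demo_registration_drop() -> (usize, usize, usize) {
    let token = Token::default();

    let handle = token.register(|| println!("changed"));
    let while_held = token.notify();

    drop(handle);
    let after_drop = token.notify();

    // `let _ = ...` does not bind the value, so the handle is dropped right away.
    let _ = token.register(|| println!("never printed"));
    let after_discard = token.notify();

    (while_held, after_drop, after_discard)
}
```

Calling demo_registration_drop() from main shows that only the registration that is still held receives a notification.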

Default Change Token

A ChangeToken has the following capabilities.

pub type Callback = Box<dyn Fn(Option<Arc<dyn Any>>) + Send + Sync>;

pub trait ChangeToken: Send + Sync {
    fn changed(&self) -> bool;
    fn must_poll(&self) -> bool;
    fn register(
        &self,
        callback: Callback,
        state: Option<Arc<dyn Any>>) -> Registration;
}

All of the out-of-the-box change tokens use callbacks to signal a change, but must_poll can return true to indicate that a consumer should poll changed. changed is expected to return true when a change has been observed. The result may vary between invocations depending on the implementation.
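
A polling consumer might look like the sketch below. It assumes a local Pollable trait that mirrors only the changed and must_poll methods, plus a hypothetical FlagToken; neither is part of the crate, and the retry count and interval are arbitrary.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Local mirror of the two query methods shown above; not imported from the crate.
trait Pollable {
    fn changed(&self) -> bool;
    fn must_poll(&self) -> bool;
}

// Hypothetical token with no callback support: it can only be polled.
struct FlagToken(AtomicBool);

impl Pollable for FlagToken {
    fn changed(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }

    fn must_poll(&self) -> bool {
        true
    }
}

// Checks `changed` up to `attempts` times, sleeping between checks.
// Returns true as soon as a change is seen and false otherwise.
fn wait_for_change(token: &dyn Pollable, attempts: u32, interval: Duration) -> bool {
    if !token.must_poll() {
        // A callback-driven token is better observed through `register`.
        return false;
    }
    for _ in 0..attempts {
        if token.changed() {
            return true;
        }
        thread::sleep(interval);
    }
    false
}

// Polls once before and once after the flag is set.
fn demo_polling() -> (bool, bool) {
    let interval = Duration::from_millis(1);
    let token = FlagToken(AtomicBool::new(false));

    let seen_before = wait_for_change(&token, 3, interval);
    token.0.store(true, Ordering::SeqCst);
    let seen_after = wait_for_change(&token, 3, interval);

    (seen_before, seen_after)
}
```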

When register is called, a Registration is returned. A Registration is an opaque struct that is used to terminate the registration. When the Registration struct is dropped, the callback will be removed from the change token's callback list. The caller owns the Registration, which ensures that a memory leak never occurs from the ChangeToken holding onto a callback longer than it should.

The DefaultChangeToken is the default implementation on which all other ChangeToken implementations are based. This simple ChangeToken manages a list of callbacks and invokes them whenever DefaultChangeToken::notify is called by the producer. This ChangeToken supports triggering callbacks multiple times.

Since the token may be signaled multiple times, changed only reports true while it is actively invoking callbacks. When used in a synchronous context, this means the return value will always be false. When used in an asynchronous context, the return value may be true and potentially useful to a caller. For most usage scenarios, the act of invoking a callback signals a change and the value of changed is uninteresting.

Single Change Token

The SingleChangeToken behaves the same as the DefaultChangeToken with a single exception. It changes exactly once. Once SingleChangeToken::notify has been invoked, changed will always return true. Any registered callbacks will be invoked, at most, once. If a callback is registered after SingleChangeToken::notify has been called, it will never be invoked.

The design of a ChangeToken does not indicate whether it supports multiple notifications. As a result, consumers are likely to request new change tokens from producers often. SingleChangeToken tends to be the most commonly used change token. It guarantees at-most-once execution and prevents change tokens from living longer than they need to.

use tokens::*;
use std::sync::{Arc, RwLock};

#[derive(Default)]
pub struct Counter {
    token: RwLock<SharedChangeToken<SingleChangeToken>>,
    value: RwLock<usize>,
}

impl Counter {
    pub fn increment(&self) {
        *self.value.write().unwrap() += 1;
        // swap in a fresh token first, then fire the old one
        let token = std::mem::replace(
            &mut *self.token.write().unwrap(),
            Default::default());
        token.notify();
    }

    pub fn watch(&self) -> impl ChangeToken {
        // RwLock is not Clone; clone the shared token behind the lock
        self.token.read().unwrap().clone()
    }
}

impl ToString for Counter {
    fn to_string(&self) -> String {
        format!("Value: {}", *self.value.read().unwrap())
    }
}

fn main() {
    let counter = Arc::new(Counter::default());
    let mut registration = counter.watch().register(
        Box::new(|state| {
            let state = state.unwrap();
            let printable = state.downcast_ref::<Counter>().unwrap();
            println!("{}", printable.to_string());
        }),
        Some(counter.clone()));

    counter.increment(); // prints 'Value: 1'
    counter.increment(); // doesn't print because token already fired

    registration = counter.watch().register(
        Box::new(|state| {
            let state = state.unwrap();
            let printable = state.downcast_ref::<Counter>().unwrap();
            println!("{}", printable.to_string());
        }),
        Some(counter.clone()));

    counter.increment(); // prints 'Value: 3'
}
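
The at-most-once guarantee described in this section can be sketched with an atomic flag. This is an illustration under assumptions, not the crate's SingleChangeToken: OneShot is a hypothetical type, and it takes the callback at notify time to keep the example short.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Hypothetical one-shot token for illustration; not the crate's SingleChangeToken.
#[derive(Default)]
struct OneShot {
    fired: AtomicBool,
}

impl OneShot {
    // Runs `callback` only on the first call; later calls are no-ops.
    fn notify(&self, callback: impl FnOnce()) {
        // `swap` returns the previous value, so exactly one caller sees `false`.
        if !self.fired.swap(true, Ordering::SeqCst) {
            callback();
        }
    }

    // Stays true forever once the token has fired.
    fn changed(&self) -> bool {
        self.fired.load(Ordering::SeqCst)
    }
}

// Returns (changed before notify, number of callback runs, changed after notify).
fn demo_one_shot() -> (bool, usize, bool) {
    let token = OneShot::default();
    let runs = AtomicUsize::new(0);

    let before = token.changed();
    token.notify(|| {
        runs.fetch_add(1, Ordering::SeqCst);
    });
    token.notify(|| {
        runs.fetch_add(1, Ordering::SeqCst);
    });

    (before, runs.load(Ordering::SeqCst), token.changed())
}
```

The second notify call does nothing, which is why a producer has to hand out a fresh token after each change.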

Never Change Token

There may be edge cases where a ChangeToken is required, but there will never be any changes. This is the usage scenario for NeverChangeToken. This ChangeToken will never register any callbacks, changed will always return false, and must_poll will always return true. NeverChangeToken effectively implements the Null Object pattern for the ChangeToken trait.

use tokens::*;
use std::sync::{Arc, RwLock};

#[derive(Default)]
pub struct Counter {
    value: RwLock<usize>,
}

impl Counter {
    pub fn increment(&self) {
        *self.value.write().unwrap() += 1;
    }

    pub fn watch(&self) -> impl ChangeToken {
        // TODO: placeholder until change support is implemented
        NeverChangeToken::new()
    }
}

impl ToString for Counter {
    fn to_string(&self) -> String {
        format!("Value: {}", *self.value.read().unwrap())
    }
}

fn main() {
    let counter = Arc::new(Counter::default());
    let registration = counter.watch().register(
        Box::new(|state| {
            let state = state.unwrap();
            let printable = state.downcast_ref::<Counter>().unwrap();
            println!("{}", printable.to_string());
        }),
        Some(counter.clone()));

    counter.increment(); // prints nothing; callback not invoked
}
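
The Null Object idea can also be shown in isolation. The sketch below assumes a simplified local Watch trait and a hypothetical Never type (neither comes from the crate); it demonstrates that a callback handed to such a token is dropped instead of being stored.

```rust
use std::sync::Arc;

// Local, simplified mirror of a change-token trait; not the crate's definition.
trait Watch {
    fn changed(&self) -> bool;
    fn register(&self, callback: Box<dyn Fn() + Send + Sync>);
}

// Hypothetical null object: accepts every call and does nothing.
struct Never;

impl Watch for Never {
    fn changed(&self) -> bool {
        false
    }

    fn register(&self, _callback: Box<dyn Fn() + Send + Sync>) {
        // The callback is dropped here instead of being stored.
    }
}

// Consumers are written against the trait and need no special case.
fn observe(token: &dyn Watch, callback: Box<dyn Fn() + Send + Sync>) -> bool {
    token.register(callback);
    token.changed()
}

// Returns (changed, strong references left to the marker captured by the callback).
fn demo_never() -> (bool, usize) {
    let marker = Arc::new(());
    let captured = Arc::clone(&marker);

    let changed = observe(
        &Never,
        Box::new(move || {
            let _count = Arc::strong_count(&captured);
        }),
    );

    // Only `marker` itself remains, so the callback was not retained.
    (changed, Arc::strong_count(&marker))
}
```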

Shared Change Token

SharedChangeToken is one of, if not the most, commonly used change token. For all intents and purposes, SharedChangeToken functions the same as Rc or Arc without divulging that information. It also directly implements the ChangeToken trait, which means it can be used anywhere a ChangeToken is returned or accepted.

SharedChangeToken is generic over the token type T that it wraps, and T defaults to DefaultChangeToken. This means SharedChangeToken is equivalent to SharedChangeToken<DefaultChangeToken> unless there is a more specific type on the left-hand side of an assignment. Although SingleChangeToken is more common in typical usage, it would be counterintuitive if the default type T were anything other than DefaultChangeToken. SharedChangeToken can adapt over any other ChangeToken implementation.
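
How a defaulted type parameter interacts with the left-hand side of an assignment can be sketched with stand-in types. Shared, Basic, OneShot, and Named below are hypothetical names used only for this illustration; they are not the crate's definitions.

```rust
use std::sync::Arc;

// Gives each stand-in token a label so the chosen type is observable.
trait Named {
    const NAME: &'static str;
}

#[derive(Default)]
struct Basic; // stands in for DefaultChangeToken

#[derive(Default)]
struct OneShot; // stands in for SingleChangeToken

impl Named for Basic {
    const NAME: &'static str = "basic";
}

impl Named for OneShot {
    const NAME: &'static str = "one-shot";
}

// Hypothetical shared wrapper whose type parameter defaults to `Basic`.
struct Shared<T = Basic> {
    inner: Arc<T>,
}

impl<T: Default> Default for Shared<T> {
    fn default() -> Self {
        Self { inner: Arc::new(T::default()) }
    }
}

// Cloning copies the pointer, not the token, so all clones share one instance.
impl<T> Clone for Shared<T> {
    fn clone(&self) -> Self {
        Self { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Named> Shared<T> {
    fn kind(&self) -> &'static str {
        T::NAME
    }

    fn same_token(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

fn demo_default_parameter() -> (&'static str, &'static str, bool) {
    // With no type argument in the annotation, `T` falls back to `Basic`.
    let plain: Shared = Shared::default();
    // A more specific type on the left-hand side selects another token type.
    let single: Shared<OneShot> = Shared::default();
    let copy = plain.clone();

    (plain.kind(), single.kind(), plain.same_token(&copy))
}
```

Note that the annotation matters: the default only applies where the type is written out, so a bare `let plain = Shared::default();` would leave T for the compiler to infer.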

Composite Change Token

Some applications may need to compose or aggregate multiple change tokens. This is the use case for a CompositeChangeToken. The CompositeChangeToken accepts a sequence of other ChangeToken instances and mediates their change notifications. A consumer of a CompositeChangeToken will be called back whenever the owner explicitly signals a change via CompositeChangeToken::notify or when one of the mediated children signals a change.

use tokens::*;
use std::sync::{Arc, RwLock};

#[derive(Default)]
pub struct Counter {
    id: usize,
    token: RwLock<SharedChangeToken<SingleChangeToken>>,
    value: RwLock<usize>,
}

impl Counter {
    pub fn new(id: usize) -> Self {
        Self {
            id,
            ..Default::default()
        }
    }

    pub fn increment(&self) {
        *self.value.write().unwrap() += 1;
        // swap in a fresh token first, then fire the old one
        let token = std::mem::replace(
            &mut *self.token.write().unwrap(),
            Default::default());
        token.notify();
    }

    pub fn watch(&self) -> impl ChangeToken {
        // RwLock is not Clone; clone the shared token behind the lock
        self.token.read().unwrap().clone()
    }
}

impl ToString for Counter {
    fn to_string(&self) -> String {
        format!("[{}] Value: {}", self.id, *self.value.read().unwrap())
    }
}

fn main() {
    let counters = Arc::new(vec![Counter::new(1), Counter::new(2), Counter::new(3)]);
    let token = CompositeChangeToken::new(counters.iter().map(|c| Box::new(c.watch())));
    let registration = token.register(
        Box::new(|state| {
            // bind the Arc first so the borrow from downcast_ref stays valid
            let state = state.unwrap();
            let printables = state.downcast_ref::<Vec<Counter>>().unwrap();
            for printable in printables {
                println!("{}", printable.to_string());
            }
        }),
        Some(counters.clone()));

    // prints '[1] Value: 0'
    // prints '[2] Value: 0'
    // prints '[3] Value: 1'
    counters[2].increment();
}
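
The mediation itself can be sketched with the standard library alone. In the sketch below, Source and Composite are hypothetical types rather than the crate's implementation, and registration handles are left out for brevity, so callbacks are never removed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

type Callback = Arc<dyn Fn() + Send + Sync>;

// Hypothetical multi-fire token; registration handles are omitted for brevity.
#[derive(Default)]
struct Source {
    callbacks: Mutex<Vec<Callback>>,
}

impl Source {
    fn register(&self, callback: Callback) {
        self.callbacks.lock().unwrap().push(callback);
    }

    fn notify(&self) {
        // Copy the list so the lock is not held while callbacks run.
        let callbacks = self.callbacks.lock().unwrap().clone();
        for callback in callbacks {
            callback();
        }
    }
}

// Hypothetical aggregate: a change in any child, or an explicit `notify`,
// reaches the consumers registered on the composite.
struct Composite {
    own: Arc<Source>,
}

impl Composite {
    fn new<'a>(children: impl IntoIterator<Item = &'a Source>) -> Self {
        let own = Arc::new(Source::default());
        for child in children {
            // Each child forwards its notification to the composite's own list.
            let forward = Arc::clone(&own);
            child.register(Arc::new(move || forward.notify()));
        }
        Self { own }
    }

    fn register(&self, callback: Callback) {
        self.own.register(callback);
    }

    fn notify(&self) {
        self.own.notify();
    }
}

// Returns how many times the single consumer callback was invoked.
fn demo_composite() -> usize {
    let (first, second) = (Source::default(), Source::default());
    let composite = Composite::new([&first, &second]);

    let hits = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&hits);
    composite.register(Arc::new(move || {
        counter.fetch_add(1, Ordering::SeqCst);
    }));

    first.notify(); // child change
    second.notify(); // child change
    composite.notify(); // explicit change from the owner

    hits.load(Ordering::SeqCst)
}
```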

File Change Token

This type is only available if the fs feature is activated.

The FileChangeToken is a special type of ChangeToken, which watches for changes to a file and notifies its consumers when a change is observed. The FileChangeToken only considers a single change. Once a change has been observed, it will not monitor further changes. The implementation is functionally equivalent to SingleChangeToken, but for a file change.

Important: FileChangeToken callbacks are always invoked on another thread; otherwise, the caller would be blocked waiting for a change.

use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use tokens::{ChangeToken, FileChangeToken}; // the trait must be in scope to call register

fn main() {
    let path = PathBuf::from("./my-app/files/some.txt");
    let state = Arc::new((Mutex::new(false), Condvar::new()));
    let token = FileChangeToken::new(&path);
    let registration = token.register(
        Box::new(|state| {
            let data = state.unwrap();
            let (fired, event) = &*data.downcast_ref::<(Mutex<bool>, Condvar)>().unwrap();
            *fired.lock().unwrap() = true;
            event.notify_one();
        }),
        Some(state.clone()));
    let mut file = File::create(&path).unwrap();

    // make a change to the file
    file.write_all("updated".as_bytes()).unwrap();

    let (mutex, event) = &*state;
    let mut fired = mutex.lock().unwrap();

    // the callback happens on another thread so wait
    // here until the callback notifies us
    while !*fired {
        fired = event.wait(fired).unwrap();
    }

    println!("'{}' changed.", path.display());
}

Functions

The following are utility functions that are useful when working with change tokens.

On Change

The tokens::on_change function mediates a producer method that returns a ChangeToken and a consumer function that responds to a change. Unlike the CompositeChangeToken, the tokens::on_change function will facilitate calling back to the specified consumer, immediately drop the last ChangeToken, and request a new ChangeToken from the producer.

The return value of the function is an opaque struct that implements the Subscription trait representing the perpetual subscription. tokens::on_change will continue to signal the consumer with changes and refresh the producer ChangeToken until the opaque subscription object has been dropped.

use std::path::PathBuf;
use tokens::FileChangeToken;

let path = PathBuf::from("./my-app/some.txt");
// the producer closure takes ownership of its own copy of the path
let watched = path.clone();
let subscription = tokens::on_change(
    move || FileChangeToken::new(watched.clone()),
    |state| {
        println!("{}", state.unwrap().display());
    },
    Some(path));
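
The refresh cycle that tokens::on_change performs can be sketched without the crate. The version below is an illustration under assumptions: OneShot, Producer, and this on_change are hypothetical, local definitions, and cancellation through a subscription object is omitted, so the cycle runs for as long as the producer keeps changing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

type Callback = Box<dyn FnOnce() + Send>;

// Hypothetical one-shot token: each registered callback runs at most once.
#[derive(Default)]
struct OneShot {
    callbacks: Mutex<Vec<Callback>>,
}

impl OneShot {
    fn register(&self, callback: Callback) {
        self.callbacks.lock().unwrap().push(callback);
    }

    fn notify(&self) {
        // Take the callbacks out so the lock is released before they run.
        let callbacks = std::mem::take(&mut *self.callbacks.lock().unwrap());
        for callback in callbacks {
            callback();
        }
    }
}

// Hypothetical producer that hands out a fresh token after every change.
#[derive(Default)]
struct Producer {
    current: Mutex<Arc<OneShot>>,
}

impl Producer {
    fn watch(&self) -> Arc<OneShot> {
        Arc::clone(&self.current.lock().unwrap())
    }

    fn change(&self) {
        // Swap in a new token first, then fire the old one.
        let fired = std::mem::take(&mut *self.current.lock().unwrap());
        fired.notify();
    }
}

// Local sketch of the re-subscription loop; cancellation is omitted.
fn on_change<P, C>(producer: Arc<P>, consumer: Arc<C>)
where
    P: Fn() -> Arc<OneShot> + Send + Sync + 'static,
    C: Fn() + Send + Sync + 'static,
{
    let token = producer();
    token.register(Box::new(move || {
        consumer();
        // The fired token is finished; ask the producer for a fresh one.
        on_change(producer, consumer);
    }));
}

// Returns how many times the consumer ran for three changes.
fn demo_on_change() -> usize {
    let producer = Arc::new(Producer::default());
    let hits = Arc::new(AtomicUsize::new(0));

    let source = Arc::clone(&producer);
    let counter = Arc::clone(&hits);
    on_change(
        Arc::new(move || source.watch()),
        Arc::new(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        }),
    );

    producer.change();
    producer.change();
    producer.change();

    hits.load(Ordering::SeqCst)
}
```

The consumer sees every change even though each token fires only once, because a new token is requested as soon as the previous one has fired.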