// cached/stores/expiring_sized.rs

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Bound::{Excluded, Included};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Shared, reference-counted wrapper around a cache key.
///
/// The same key is stored both in the lookup map and in the expiry-ordered
/// set. Sharing it through an `Arc` means `K` itself never has to implement
/// `Clone`. Equality, ordering and hashing all forward to the wrapped value.
/// The `Borrow` impls let the map be queried with `&K`, or with `&str` /
/// `&[T]` when the key is a `String` / `Vec<T>`.
// todo: can we switch to an Rc?
#[derive(Eq)]
struct CacheArc<T>(Arc<T>);

impl<T> CacheArc<T> {
    /// Move `key` into a fresh shared allocation.
    fn new(key: T) -> Self {
        Self(Arc::new(key))
    }
}

impl<T> Clone for CacheArc<T> {
    /// Cheap clone: bumps the reference count and never clones `T`.
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<T: PartialEq> PartialEq for CacheArc<T> {
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.0, &other.0)
    }
}

impl<T: PartialOrd> PartialOrd for CacheArc<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        PartialOrd::partial_cmp(&self.0, &other.0)
    }
}

impl<T: Ord> Ord for CacheArc<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.0, &other.0)
    }
}

impl<T: Hash> Hash for CacheArc<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.0, state);
    }
}

impl<T> Borrow<T> for CacheArc<T> {
    fn borrow(&self) -> &T {
        &*self.0
    }
}

impl Borrow<str> for CacheArc<String> {
    fn borrow(&self) -> &str {
        &self.0[..]
    }
}

impl<T> Borrow<[T]> for CacheArc<Vec<T>> {
    fn borrow(&self) -> &[T] {
        &self.0[..]
    }
}

/// Errors returned by `ExpiringSizedCache` insert operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// Calculating expiration `Instant`s resulted in a
    /// value outside of `Instant`s internal bounds
    TimeBounds,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::TimeBounds => f.write_str(
                "calculated expiration instant is outside the bounds representable by `Instant`",
            ),
        }
    }
}

// Lets callers propagate the error with `?` into `Box<dyn std::error::Error>`
// and other generic error types.
impl std::error::Error for Error {}

74/// A timestamped key to allow identifying key ranges
75#[derive(Hash, Eq, PartialEq, Ord, PartialOrd)]
76struct Stamped<K> {
77    // note: the field order matters here since the derived ord traits
78    //       generate lexicographic ordering based on the top-to-bottom
79    //       declaration order
80    expiry: Instant,
81
82    // wrapped in an option so it's easy to generate
83    // a range bound containing None
84    key: Option<CacheArc<K>>,
85}
86
87impl<K> Clone for Stamped<K> {
88    fn clone(&self) -> Self {
89        Self {
90            expiry: self.expiry,
91            key: self.key.clone(),
92        }
93    }
94}
95
96impl<K> Stamped<K> {
97    fn bound(expiry: Instant) -> Stamped<K> {
98        Stamped { expiry, key: None }
99    }
100}
101
102/// A timestamped value to allow re-building a timestamped key
103struct Entry<K, V> {
104    expiry: Instant,
105    key: CacheArc<K>,
106    value: V,
107}
108
109impl<K, V> Entry<K, V> {
110    fn as_stamped(&self) -> Stamped<K> {
111        Stamped {
112            expiry: self.expiry,
113            key: Some(self.key.clone()),
114        }
115    }
116
117    fn is_expired(&self) -> bool {
118        self.expiry < Instant::now()
119    }
120}
121
// Shared body of the `get` / `get_borrowed` lookups.
//
// Expands to an `Option<&V>`: the value stored under `$key` in `$_self.map`,
// or `None` if the key is absent or its entry has expired. It is a macro
// rather than a function so it works with whichever borrowed key type
// (`&K`, `&str`, `&[T]`) the map's `get` accepts.
macro_rules! impl_get {
    ($_self:expr, $key:expr) => {{
        $_self
            .map
            .get($key)
            .filter(|entry| !entry.is_expired())
            .map(|entry| &entry.value)
    }};
}

134/// A cache enforcing time expiration and an optional maximum size.
135/// When a maximum size is specified, the values are dropped in the
136/// order of expiration date, e.g. the next value to expire is dropped.
137/// This cache is intended for high read scenarios to allow for concurrent
138/// reads while still enforcing expiration and an optional maximum cache size.
139///
140/// To accomplish this, there are a few trade-offs:
141///  - Maximum cache size logic cannot support "LRU", instead dropping the next value to expire
142///  - Cache keys must implement `Ord`
143///  - The cache's size, reported by `.len` is only guaranteed to be accurate immediately
144///    after a call to either `.evict` or `.retain_latest`
145///  - Eviction must be explicitly requested, either on its own or while inserting
146pub struct ExpiringSizedCache<K, V> {
147    // a minimum instant to compare ranges against since
148    // all keys must logically expire after the creation
149    // of the cache
150    min_instant: Instant,
151
152    // k/v where entry contains corresponds to an ordered value in `keys`
153    map: HashMap<CacheArc<K>, Entry<K, V>>,
154
155    // ordered in ascending expiration `Instant`s
156    // to support retaining/evicting without full traversal
157    keys: BTreeSet<Stamped<K>>,
158
159    pub ttl_millis: u64,
160    pub size_limit: Option<usize>,
161}
162impl<K: Hash + Eq + Ord, V> ExpiringSizedCache<K, V> {
163    pub fn new(ttl_millis: u64) -> Self {
164        Self {
165            min_instant: Instant::now(),
166            map: HashMap::new(),
167            keys: BTreeSet::new(),
168            ttl_millis,
169            size_limit: None,
170        }
171    }
172
173    pub fn with_capacity(ttl_millis: u64, size: usize) -> Self {
174        let mut new = Self::new(ttl_millis);
175        new.map.reserve(size);
176        new
177    }
178
179    /// Set a size limit. When reached, the next entries to expire are evicted.
180    /// Returns the previous value if one was set.
181    pub fn size_limit(&mut self, size: usize) -> Option<usize> {
182        let prev = self.size_limit;
183        self.size_limit = Some(size);
184        prev
185    }
186
187    /// Increase backing stores with enough capacity to store `more`
188    pub fn reserve(&mut self, more: usize) {
189        self.map.reserve(more);
190    }
191
192    /// Set ttl millis, return previous value
193    pub fn ttl_millis(&mut self, ttl_millis: u64) -> u64 {
194        let prev = self.ttl_millis;
195        self.ttl_millis = ttl_millis;
196        prev
197    }
198
199    /// Evict values that have expired.
200    /// Returns number of dropped items.
201    pub fn evict(&mut self) -> usize {
202        let cutoff = Instant::now();
203        let min = Stamped::bound(self.min_instant);
204        let max = Stamped::bound(cutoff);
205        let min = Included(&min);
206        let max = Excluded(&max);
207
208        let remove = self.keys.range((min, max)).count();
209        let mut count = 0;
210        while count < remove {
211            match self.keys.pop_first() {
212                None => break,
213                Some(stamped) => {
214                    self.map.remove(
215                        &stamped
216                            .key
217                            .expect("evicting: only artificial bounds are none"),
218                    );
219                    count += 1;
220                }
221            }
222        }
223        count
224    }
225
226    /// Retain only the latest `count` values, dropping the next values to expire.
227    /// If `evict`, then also evict values that have expired.
228    /// Returns number of dropped items.
229    pub fn retain_latest(&mut self, count: usize, evict: bool) -> usize {
230        let retain_drop_count = self.len().saturating_sub(count);
231
232        let remove = if evict {
233            let cutoff = Instant::now();
234            let min = Stamped::bound(self.min_instant);
235            let max = Stamped::bound(cutoff);
236            let min = Included(&min);
237            let max = Excluded(&max);
238            let to_evict_count = self.keys.range((min, max)).count();
239            retain_drop_count.max(to_evict_count)
240        } else {
241            retain_drop_count
242        };
243
244        let mut count = 0;
245        while count < remove {
246            match self.keys.pop_first() {
247                None => break,
248                Some(stamped) => {
249                    self.map.remove(
250                        &stamped
251                            .key
252                            .expect("retaining: only artificial bounds are none"),
253                    );
254                    count += 1;
255                }
256            }
257        }
258        count
259    }
260
261    /// Remove an entry, returning an unexpired value if it was present.
262    pub fn remove(&mut self, key: &K) -> Option<V> {
263        match self.map.remove(key) {
264            None => None,
265            Some(removed) => {
266                self.keys.remove(&removed.as_stamped());
267                if removed.is_expired() {
268                    None
269                } else {
270                    Some(removed.value)
271                }
272            }
273        }
274    }
275
276    /// Insert k/v pair without running eviction logic. See `.insert_ttl_evict`
277    pub fn insert(&mut self, key: K, value: V) -> Result<Option<V>, Error> {
278        self.insert_ttl_evict(key, value, None, false)
279    }
280
281    /// Insert k/v pair with explicit ttl. See `.insert_ttl_evict`
282    pub fn insert_ttl(&mut self, key: K, value: V, ttl_millis: u64) -> Result<Option<V>, Error> {
283        self.insert_ttl_evict(key, value, Some(ttl_millis), false)
284    }
285
286    /// Insert k/v pair and run eviction logic. See `.insert_ttl_evict`
287    pub fn insert_evict(&mut self, key: K, value: V, evict: bool) -> Result<Option<V>, Error> {
288        self.insert_ttl_evict(key, value, None, evict)
289    }
290
291    /// Optionally run eviction logic before inserting a k/v pair with an optional explicit TTL.
292    /// If a `size_limit` was specified, the next entry to expire will be evicted to make space.
293    /// Returns any existing unexpired value.
294    pub fn insert_ttl_evict(
295        &mut self,
296        key: K,
297        value: V,
298        ttl_millis: Option<u64>,
299        evict: bool,
300    ) -> Result<Option<V>, Error> {
301        // optionally evict and retain to size
302        if let Some(size_limit) = self.size_limit {
303            if self.len() > size_limit - 1 {
304                self.retain_latest(size_limit - 1, evict);
305            }
306        } else if evict {
307            self.evict();
308        }
309
310        let key = CacheArc::new(key);
311        let expiry = Instant::now()
312            .checked_add(Duration::from_millis(ttl_millis.unwrap_or(self.ttl_millis)))
313            .ok_or(Error::TimeBounds)?;
314
315        let new_stamped = Stamped {
316            expiry,
317            key: Some(key.clone()),
318        };
319        self.keys.insert(new_stamped.clone());
320        let old = self.map.insert(key.clone(), Entry { expiry, key, value });
321        if let Some(old) = &old {
322            let old_stamped = old.as_stamped();
323            // new-stamped didn't already replace an existing entry, delete it now
324            if old_stamped != new_stamped {
325                self.keys.remove(&old_stamped);
326            }
327        }
328        Ok(old.and_then(|entry| {
329            if entry.is_expired() {
330                None
331            } else {
332                Some(entry.value)
333            }
334        }))
335    }
336
337    /// Clear all cache entries. Does not release underlying containers
338    pub fn clear(&mut self) {
339        self.map.clear();
340        self.keys.clear();
341    }
342
343    /// Return cache size. Note, this does not evict so may return
344    /// a size that includes expired entries. Run `evict` or `retain_latest`
345    /// first to ensure an accurate length.
346    pub fn len(&self) -> usize {
347        self.map.len()
348    }
349
350    pub fn is_empty(&self) -> bool {
351        self.len() == 0
352    }
353
354    /// Retrieve unexpired entry
355    pub fn get(&self, key: &K) -> Option<&V> {
356        // todo: generically support keys being borrowed types like the underlying map
357        impl_get!(self, key)
358    }
359}
360
361impl<V> ExpiringSizedCache<String, V> {
362    /// Retrieve unexpired entry, accepting `&str` to check against `String` keys
363    /// ```rust
364    /// # use cached::stores::ExpiringSizedCache;
365    /// let mut cache = ExpiringSizedCache::<String, &str>::new(2_000);
366    /// cache.insert(String::from("a"), "a").unwrap();
367    /// assert_eq!(cache.get_borrowed("a").unwrap(), &"a");
368    /// ```
369    pub fn get_borrowed(&self, key: &str) -> Option<&V> {
370        impl_get!(self, key)
371    }
372}
373
374impl<T: Hash + Eq + PartialEq, V> ExpiringSizedCache<Vec<T>, V> {
375    /// Retrieve unexpired entry, accepting `&[T]` to check against `Vec<T>` keys
376    /// ```rust
377    /// # use cached::stores::ExpiringSizedCache;
378    /// let mut cache = ExpiringSizedCache::<Vec<usize>, &str>::new(2_000);
379    /// cache.insert(vec![0], "a").unwrap();
380    /// assert_eq!(cache.get_borrowed(&[0]).unwrap(), &"a");
381    /// ```
382    pub fn get_borrowed(&self, key: &[T]) -> Option<&V> {
383        impl_get!(self, key)
384    }
385}
386
#[cfg(test)]
mod test {
    use crate::stores::ExpiringSizedCache;
    use std::time::Duration;

    // `get_borrowed` accepts `&str` for `String` keys and `&[T]` for `Vec<T>` keys.
    #[test]
    fn borrow_keys() {
        let mut cache = ExpiringSizedCache::with_capacity(100, 100);
        cache.insert(String::from("a"), "a").unwrap();
        assert_eq!(cache.get_borrowed("a").unwrap(), &"a");

        let mut cache = ExpiringSizedCache::with_capacity(100, 100);
        cache.insert(vec![0], "a").unwrap();
        assert_eq!(cache.get_borrowed(&[0]).unwrap(), &"a");
    }

    // Exercises expiry, `evict`, `retain_latest`, `remove` and per-entry TTLs on a
    // cache whose default TTL is 100ms; the 200ms sleeps outlast that default TTL.
    #[test]
    fn kitchen_sink() {
        let mut cache = ExpiringSizedCache::with_capacity(100, 100);
        assert_eq!(0, cache.evict());
        assert_eq!(0, cache.retain_latest(100, true));
        assert!(cache.get(&"a".into()).is_none());

        cache.insert("a".to_string(), "A".to_string()).unwrap();
        assert_eq!(cache.get(&"a".into()), Some("A".to_string()).as_ref());
        assert_eq!(cache.len(), 1);
        std::thread::sleep(Duration::from_millis(200));
        assert_eq!(1, cache.evict());
        assert!(cache.get(&"a".into()).is_none());
        assert_eq!(cache.len(), 0);

        cache.insert("a".to_string(), "A".to_string()).unwrap();
        assert_eq!(cache.get(&"a".into()), Some("A".to_string()).as_ref());
        assert_eq!(cache.len(), 1);
        std::thread::sleep(Duration::from_millis(200));
        assert_eq!(0, cache.retain_latest(1, false));
        // expired
        assert_eq!(cache.get(&"a".into()), None);
        // in size until eviction
        assert_eq!(cache.len(), 1);
        assert_eq!(1, cache.retain_latest(1, true));
        assert!(cache.get(&"a".into()).is_none());
        assert_eq!(cache.len(), 0);

        cache.insert("a".to_string(), "a".to_string()).unwrap();
        cache.insert("b".to_string(), "b".to_string()).unwrap();
        cache.insert("c".to_string(), "c".to_string()).unwrap();
        cache.insert("d".to_string(), "d".to_string()).unwrap();
        cache.insert("e".to_string(), "e".to_string()).unwrap();
        // a, b and c were inserted first, so they expire first and are the ones dropped
        assert_eq!(3, cache.retain_latest(2, false));
        assert_eq!(2, cache.len());
        assert_eq!(cache.get(&"a".into()), None);
        assert_eq!(cache.get(&"b".into()), None);
        assert_eq!(cache.get(&"c".into()), None);
        assert_eq!(cache.get(&"d".into()), Some("d".to_string()).as_ref());
        assert_eq!(cache.get(&"e".into()), Some("e".to_string()).as_ref());

        // re-inserting an existing key replaces it rather than adding a second entry
        cache.insert("a".to_string(), "a".to_string()).unwrap();
        cache.insert("a".to_string(), "a".to_string()).unwrap();
        cache.insert("b".to_string(), "b".to_string()).unwrap();
        cache.insert("b".to_string(), "b".to_string()).unwrap();
        assert_eq!(4, cache.len());

        // d and e now expire soonest, so they are dropped
        assert_eq!(2, cache.retain_latest(2, false));
        assert_eq!(cache.get(&"d".into()), None);
        assert_eq!(cache.get(&"e".into()), None);
        assert_eq!(cache.get(&"a".into()), Some("a".to_string()).as_ref());
        assert_eq!(cache.get(&"b".into()), Some("b".to_string()).as_ref());
        assert_eq!(2, cache.len());

        std::thread::sleep(Duration::from_millis(200));
        assert_eq!(cache.remove(&"a".into()), None);
        // removing an expired key returns `None` but still drops its entry,
        // so only the (also expired) "b" is left
        assert_eq!(1, cache.len());

        cache.insert("a".to_string(), "a".to_string()).unwrap();
        assert_eq!(cache.remove(&"a".into()), Some("a".to_string()));
        // we haven't done anything to evict "b" so there's still one entry
        assert_eq!(1, cache.len());

        assert_eq!(1, cache.evict());
        assert_eq!(0, cache.len());

        // default ttl is 100ms
        cache
            .insert_ttl("a".to_string(), "a".to_string(), 300)
            .unwrap();
        std::thread::sleep(Duration::from_millis(200));
        assert_eq!(cache.get(&"a".into()), Some("a".to_string()).as_ref());
        assert_eq!(1, cache.len());

        std::thread::sleep(Duration::from_millis(200));
        cache
            .insert_ttl_evict("b".to_string(), "b".to_string(), Some(300), true)
            .unwrap();
        // a should now be evicted
        assert_eq!(1, cache.len());
        assert_eq!(cache.get_borrowed("a"), None);
    }

    // With a limit of 2, inserting a third key drops the entry that expires soonest ("a").
    #[test]
    fn size_limit() {
        let mut cache = ExpiringSizedCache::with_capacity(100, 100);
        cache.size_limit(2);
        assert_eq!(0, cache.evict());
        assert_eq!(0, cache.retain_latest(100, true));
        assert!(cache.get(&"a".into()).is_none());

        cache.insert("a".to_string(), "A".to_string()).unwrap();
        assert_eq!(cache.get(&"a".into()), Some("A".to_string()).as_ref());
        assert_eq!(cache.len(), 1);
        cache.insert("b".to_string(), "B".to_string()).unwrap();
        assert_eq!(cache.get(&"b".into()), Some("B".to_string()).as_ref());
        assert_eq!(cache.len(), 2);
        cache.insert("c".to_string(), "C".to_string()).unwrap();
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.get(&"b".into()), Some("B".to_string()).as_ref());
        assert_eq!(cache.get(&"c".into()), Some("C".to_string()).as_ref());
        assert_eq!(cache.get(&"a".into()), None);
    }
}