1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//! Utility for tracking network requests that will be retried in the future.

use core::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

/// A tracker for network requests that have failed, and are awaiting to be
/// retried in the future.
pub struct SleepTracker<T> {
    /// This is a priority queue that tracks the time when the next sleeper
    /// should awaken (based on the [`Sleeper::wakeup`] property).
    heap: BinaryHeap<Sleeper<T>>,
}

/// An individual network request that is waiting to be retried in the future.
struct Sleeper<T> {
    /// The time when this requests should be retried.
    wakeup: Instant,
    /// Information about the network request.
    data: T,
}

impl<T> PartialEq for Sleeper<T> {
    fn eq(&self, other: &Sleeper<T>) -> bool {
        self.wakeup == other.wakeup
    }
}

impl<T> PartialOrd for Sleeper<T> {
    fn partial_cmp(&self, other: &Sleeper<T>) -> Option<Ordering> {
        // This reverses the comparison so that the BinaryHeap tracks the
        // entry with the *lowest* wakeup time.
        Some(other.wakeup.cmp(&self.wakeup))
    }
}

impl<T> Eq for Sleeper<T> {}

impl<T> Ord for Sleeper<T> {
    fn cmp(&self, other: &Sleeper<T>) -> Ordering {
        self.wakeup.cmp(&other.wakeup)
    }
}

impl<T> SleepTracker<T> {
    pub fn new() -> SleepTracker<T> {
        SleepTracker {
            heap: BinaryHeap::new(),
        }
    }

    /// Adds a new download that should be retried in the future.
    pub fn push(&mut self, sleep: u64, data: T) {
        self.heap.push(Sleeper {
            wakeup: Instant::now()
                .checked_add(Duration::from_millis(sleep))
                .expect("instant should not wrap"),
            data,
        });
    }

    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns any downloads that are ready to go now.
    pub fn to_retry(&mut self) -> Vec<T> {
        let now = Instant::now();
        let mut result = Vec::new();
        while let Some(next) = self.heap.peek() {
            if next.wakeup < now {
                result.push(self.heap.pop().unwrap().data);
            } else {
                break;
            }
        }
        result
    }

    /// Returns the time when the next download is ready to go.
    ///
    /// Returns None if there are no sleepers remaining.
    pub fn time_to_next(&self) -> Option<Duration> {
        self.heap
            .peek()
            .map(|s| s.wakeup.saturating_duration_since(Instant::now()))
    }
}

#[test]
fn returns_in_order() {
    let mut s = SleepTracker::new();
    s.push(3, 3);
    s.push(1, 1);
    s.push(6, 6);
    s.push(5, 5);
    s.push(2, 2);
    s.push(10000, 10000);
    assert_eq!(s.len(), 6);
    std::thread::sleep(Duration::from_millis(100));
    assert_eq!(s.to_retry(), &[1, 2, 3, 5, 6]);
}