tokio_cron_scheduler/job/
cron_job.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#[cfg(not(feature = "has_bytes"))]
use crate::job::job_data::{JobStoredData, JobType};
#[cfg(feature = "has_bytes")]
use crate::job::job_data_prost::{JobStoredData, JobType};
use crate::job::{Job, JobToRunAsync};
use crate::{JobScheduler, JobSchedulerError, JobToRun};
use chrono::{DateTime, Utc};
use cron::Schedule;
use tokio::sync::oneshot::Receiver;
use tracing::error;
use uuid::Uuid;

pub struct CronJob {
    pub data: JobStoredData,
    pub run: Box<JobToRun>,
    pub run_async: Box<JobToRunAsync>,
    pub async_job: bool,
}

impl Job for CronJob {
    fn is_cron_job(&self) -> bool {
        true
    }

    fn schedule(&self) -> Option<Schedule> {
        self.data.schedule()
    }

    fn repeated_every(&self) -> Option<u64> {
        None
    }

    fn last_tick(&self) -> Option<DateTime<Utc>> {
        self.data.last_tick_utc()
    }

    fn set_last_tick(&mut self, tick: Option<DateTime<Utc>>) {
        self.data.set_last_tick(tick);
    }

    fn next_tick(&self) -> Option<DateTime<Utc>> {
        self.data.next_tick_utc()
    }

    fn set_next_tick(&mut self, tick: Option<DateTime<Utc>>) {
        self.data.set_next_tick(tick);
    }

    fn set_count(&mut self, count: u32) {
        self.data.count = count;
    }

    fn count(&self) -> u32 {
        self.data.count
    }

    fn increment_count(&mut self) {
        self.data.count = if self.data.count + 1 < u32::MAX {
            self.data.count + 1
        } else {
            0
        }; // Overflow check
    }

    fn job_id(&self) -> Uuid {
        self.data.id.as_ref().cloned().map(|e| e.into()).unwrap()
    }

    fn job_type(&self) -> JobType {
        JobType::Cron
    }

    fn ran(&self) -> bool {
        self.data.ran
    }

    fn set_ran(&mut self, ran: bool) {
        self.data.ran = ran;
    }

    fn stop(&self) -> bool {
        self.data.stopped
    }

    fn set_stopped(&mut self) {
        self.data.stopped = true;
    }

    fn set_started(&mut self) {
        self.data.stopped = false;
    }

    fn job_data_from_job(&mut self) -> Result<Option<JobStoredData>, JobSchedulerError> {
        Ok(Some(self.data.clone()))
    }

    fn set_job_data(&mut self, job_data: JobStoredData) -> Result<(), JobSchedulerError> {
        self.data = job_data;
        Ok(())
    }

    fn run(&mut self, jobs: JobScheduler) -> Receiver<bool> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        let job_id = self.job_id();

        if !self.async_job {
            (self.run)(job_id, jobs);
            if let Err(e) = tx.send(true) {
                error!("Error notifying done {:?}", e);
            }
        } else {
            let future = (self.run_async)(job_id, jobs);
            tokio::task::spawn(async move {
                future.await;
                if let Err(e) = tx.send(true) {
                    error!("Error notifying done {:?}", e);
                }
            });
        }
        rx
    }
}