Goit

Simple and lightweight Git web server
git clone http://git.omkov.net/Goit
Log | Tree | Refs | README | Download

Goit/src/cron/cron.go (205 lines, 3.7 KiB) -rw-r--r-- file download

570144e Jakob Wakeling 2023-12-15 23:28:06
0
// Copyright (C) 2023, Jakob Wakeling
570144e Jakob Wakeling 2023-12-15 23:28:06
1
// All rights reserved.
570144e Jakob Wakeling 2023-12-15 23:28:06
2
570144e Jakob Wakeling 2023-12-15 23:28:06
3
package cron
570144e Jakob Wakeling 2023-12-15 23:28:06
4
570144e Jakob Wakeling 2023-12-15 23:28:06
5
import (
570144e Jakob Wakeling 2023-12-15 23:28:06
6
	"log"
570144e Jakob Wakeling 2023-12-15 23:28:06
7
	"slices"
570144e Jakob Wakeling 2023-12-15 23:28:06
8
	"sync"
1547a71 Jakob Wakeling 2023-12-16 15:24:55
9
	"sync/atomic"
570144e Jakob Wakeling 2023-12-15 23:28:06
10
	"time"
570144e Jakob Wakeling 2023-12-15 23:28:06
11
570144e Jakob Wakeling 2023-12-15 23:28:06
12
	"github.com/Jamozed/Goit/src/util"
570144e Jakob Wakeling 2023-12-15 23:28:06
13
)
570144e Jakob Wakeling 2023-12-15 23:28:06
14
570144e Jakob Wakeling 2023-12-15 23:28:06
15
type Cron struct {
570144e Jakob Wakeling 2023-12-15 23:28:06
16
	jobs    []Job
570144e Jakob Wakeling 2023-12-15 23:28:06
17
	stop    chan struct{}
570144e Jakob Wakeling 2023-12-15 23:28:06
18
	update  chan struct{}
1547a71 Jakob Wakeling 2023-12-16 15:24:55
19
	running atomic.Bool
570144e Jakob Wakeling 2023-12-15 23:28:06
20
	mutex   sync.Mutex
570144e Jakob Wakeling 2023-12-15 23:28:06
21
	lastId  uint64
570144e Jakob Wakeling 2023-12-15 23:28:06
22
	waiter  sync.WaitGroup
570144e Jakob Wakeling 2023-12-15 23:28:06
23
}
570144e Jakob Wakeling 2023-12-15 23:28:06
24
570144e Jakob Wakeling 2023-12-15 23:28:06
25
type Job struct {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
26
	Id         uint64
594ec41 Jakob Wakeling 2023-12-29 15:18:46
27
	Rid        int64
594ec41 Jakob Wakeling 2023-12-29 15:18:46
28
	Schedule   Schedule
594ec41 Jakob Wakeling 2023-12-29 15:18:46
29
	Next, Last time.Time
594ec41 Jakob Wakeling 2023-12-29 15:18:46
30
	fn         func()
570144e Jakob Wakeling 2023-12-15 23:28:06
31
}
570144e Jakob Wakeling 2023-12-15 23:28:06
32
570144e Jakob Wakeling 2023-12-15 23:28:06
33
// maxDuration is the largest value a time.Duration can hold. The scheduler
// uses it as the timer interval when there is nothing to wait for.
const maxDuration = time.Duration(1<<63 - 1)
570144e Jakob Wakeling 2023-12-15 23:28:06
34
570144e Jakob Wakeling 2023-12-15 23:28:06
35
func New() *Cron {
570144e Jakob Wakeling 2023-12-15 23:28:06
36
	return &Cron{
570144e Jakob Wakeling 2023-12-15 23:28:06
37
		jobs:   []Job{},
570144e Jakob Wakeling 2023-12-15 23:28:06
38
		stop:   make(chan struct{}),
570144e Jakob Wakeling 2023-12-15 23:28:06
39
		update: make(chan struct{}),
570144e Jakob Wakeling 2023-12-15 23:28:06
40
	}
570144e Jakob Wakeling 2023-12-15 23:28:06
41
}
570144e Jakob Wakeling 2023-12-15 23:28:06
42
570144e Jakob Wakeling 2023-12-15 23:28:06
43
func (c *Cron) Start() {
570144e Jakob Wakeling 2023-12-15 23:28:06
44
	c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
45
	util.Debugln("[cron.Start] Cron mutex lock")
570144e Jakob Wakeling 2023-12-15 23:28:06
46
	defer c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
47
	defer util.Debugln("[cron.Start] Cron mutex unlock")
570144e Jakob Wakeling 2023-12-15 23:28:06
48
1547a71 Jakob Wakeling 2023-12-16 15:24:55
49
	if !c.running.CompareAndSwap(false, true) {
570144e Jakob Wakeling 2023-12-15 23:28:06
50
		return
570144e Jakob Wakeling 2023-12-15 23:28:06
51
	}
570144e Jakob Wakeling 2023-12-15 23:28:06
52
32e8b4d Jakob Wakeling 2024-07-07 17:50:40
53
	for i, job := range c.jobs {
32e8b4d Jakob Wakeling 2024-07-07 17:50:40
54
		c.jobs[i].Next = job.Schedule.Next(time.Now().UTC())
570144e Jakob Wakeling 2023-12-15 23:28:06
55
	}
570144e Jakob Wakeling 2023-12-15 23:28:06
56
570144e Jakob Wakeling 2023-12-15 23:28:06
57
	go func() {
570144e Jakob Wakeling 2023-12-15 23:28:06
58
		for {
570144e Jakob Wakeling 2023-12-15 23:28:06
59
			c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
60
			util.Debugln("[cron.run] Cron mutex lock")
570144e Jakob Wakeling 2023-12-15 23:28:06
61
570144e Jakob Wakeling 2023-12-15 23:28:06
62
			var timer *time.Timer
570144e Jakob Wakeling 2023-12-15 23:28:06
63
570144e Jakob Wakeling 2023-12-15 23:28:06
64
			if len(c.jobs) == 0 {
570144e Jakob Wakeling 2023-12-15 23:28:06
65
				timer = time.NewTimer(maxDuration)
570144e Jakob Wakeling 2023-12-15 23:28:06
66
			} else {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
67
				timer = time.NewTimer(c.jobs[0].Next.Sub(time.Now().UTC()))
570144e Jakob Wakeling 2023-12-15 23:28:06
68
			}
570144e Jakob Wakeling 2023-12-15 23:28:06
69
570144e Jakob Wakeling 2023-12-15 23:28:06
70
			c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
71
			util.Debugln("[cron.run] Cron mutex unlock")
570144e Jakob Wakeling 2023-12-15 23:28:06
72
570144e Jakob Wakeling 2023-12-15 23:28:06
73
			select {
570144e Jakob Wakeling 2023-12-15 23:28:06
74
			case now := <-timer.C:
570144e Jakob Wakeling 2023-12-15 23:28:06
75
				now = now.UTC()
570144e Jakob Wakeling 2023-12-15 23:28:06
76
				log.Println("[cron] timer expired")
570144e Jakob Wakeling 2023-12-15 23:28:06
77
570144e Jakob Wakeling 2023-12-15 23:28:06
78
				c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
79
				util.Debugln("[cron.now] Cron mutex lock")
570144e Jakob Wakeling 2023-12-15 23:28:06
80
570144e Jakob Wakeling 2023-12-15 23:28:06
81
				tmp := c.jobs[:0]
570144e Jakob Wakeling 2023-12-15 23:28:06
82
				for _, job := range c.jobs {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
83
					if job.Next.After(now) || job.Next.IsZero() {
b43fa87 Jakob Wakeling 2023-12-18 23:56:42
84
						tmp = append(tmp, job)
b43fa87 Jakob Wakeling 2023-12-18 23:56:42
85
						continue
570144e Jakob Wakeling 2023-12-15 23:28:06
86
					}
570144e Jakob Wakeling 2023-12-15 23:28:06
87
594ec41 Jakob Wakeling 2023-12-29 15:18:46
88
					log.Println("[cron] running job", job.Id, job.Rid)
570144e Jakob Wakeling 2023-12-15 23:28:06
89
1547a71 Jakob Wakeling 2023-12-16 15:24:55
90
					j := job
570144e Jakob Wakeling 2023-12-15 23:28:06
91
					c.waiter.Add(1)
570144e Jakob Wakeling 2023-12-15 23:28:06
92
					go func() {
570144e Jakob Wakeling 2023-12-15 23:28:06
93
						defer c.waiter.Done()
1547a71 Jakob Wakeling 2023-12-16 15:24:55
94
						j.fn()
570144e Jakob Wakeling 2023-12-15 23:28:06
95
					}()
570144e Jakob Wakeling 2023-12-15 23:28:06
96
594ec41 Jakob Wakeling 2023-12-29 15:18:46
97
					if !job.Schedule.IsImmediate() {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
98
						job.Next = job.Schedule.Next(now)
594ec41 Jakob Wakeling 2023-12-29 15:18:46
99
						job.Last = now
570144e Jakob Wakeling 2023-12-15 23:28:06
100
						tmp = append(tmp, job)
570144e Jakob Wakeling 2023-12-15 23:28:06
101
					}
570144e Jakob Wakeling 2023-12-15 23:28:06
102
				}
570144e Jakob Wakeling 2023-12-15 23:28:06
103
570144e Jakob Wakeling 2023-12-15 23:28:06
104
				c.jobs = tmp
570144e Jakob Wakeling 2023-12-15 23:28:06
105
570144e Jakob Wakeling 2023-12-15 23:28:06
106
				util.Debugln("[cron.now] Cron mutex unlock")
1547a71 Jakob Wakeling 2023-12-16 15:24:55
107
				c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
108
570144e Jakob Wakeling 2023-12-15 23:28:06
109
				c._update()
570144e Jakob Wakeling 2023-12-15 23:28:06
110
570144e Jakob Wakeling 2023-12-15 23:28:06
111
			case <-c.stop:
570144e Jakob Wakeling 2023-12-15 23:28:06
112
				timer.Stop()
570144e Jakob Wakeling 2023-12-15 23:28:06
113
570144e Jakob Wakeling 2023-12-15 23:28:06
114
				c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
115
				util.Debugln("[cron.stop] Cron mutex lock")
1547a71 Jakob Wakeling 2023-12-16 15:24:55
116
570144e Jakob Wakeling 2023-12-15 23:28:06
117
				c.waiter.Wait()
1547a71 Jakob Wakeling 2023-12-16 15:24:55
118
				c.running.Store(false)
1547a71 Jakob Wakeling 2023-12-16 15:24:55
119
570144e Jakob Wakeling 2023-12-15 23:28:06
120
				util.Debugln("[cron.stop] Cron mutex unlock")
1547a71 Jakob Wakeling 2023-12-16 15:24:55
121
				c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
122
570144e Jakob Wakeling 2023-12-15 23:28:06
123
				return
570144e Jakob Wakeling 2023-12-15 23:28:06
124
570144e Jakob Wakeling 2023-12-15 23:28:06
125
			case <-c.update:
570144e Jakob Wakeling 2023-12-15 23:28:06
126
				c._update()
570144e Jakob Wakeling 2023-12-15 23:28:06
127
			}
570144e Jakob Wakeling 2023-12-15 23:28:06
128
		}
570144e Jakob Wakeling 2023-12-15 23:28:06
129
	}()
570144e Jakob Wakeling 2023-12-15 23:28:06
130
}
570144e Jakob Wakeling 2023-12-15 23:28:06
131
570144e Jakob Wakeling 2023-12-15 23:28:06
132
func (c *Cron) Stop() {
1547a71 Jakob Wakeling 2023-12-16 15:24:55
133
	if !c.running.Load() {
570144e Jakob Wakeling 2023-12-15 23:28:06
134
		return
570144e Jakob Wakeling 2023-12-15 23:28:06
135
	}
570144e Jakob Wakeling 2023-12-15 23:28:06
136
570144e Jakob Wakeling 2023-12-15 23:28:06
137
	close(c.stop)
570144e Jakob Wakeling 2023-12-15 23:28:06
138
}
570144e Jakob Wakeling 2023-12-15 23:28:06
139
570144e Jakob Wakeling 2023-12-15 23:28:06
140
func (c *Cron) Update() {
1547a71 Jakob Wakeling 2023-12-16 15:24:55
141
	if !c.running.Load() {
570144e Jakob Wakeling 2023-12-15 23:28:06
142
		return
570144e Jakob Wakeling 2023-12-15 23:28:06
143
	}
570144e Jakob Wakeling 2023-12-15 23:28:06
144
570144e Jakob Wakeling 2023-12-15 23:28:06
145
	c.update <- struct{}{}
570144e Jakob Wakeling 2023-12-15 23:28:06
146
}
570144e Jakob Wakeling 2023-12-15 23:28:06
147
570144e Jakob Wakeling 2023-12-15 23:28:06
148
func (c *Cron) _update() {
570144e Jakob Wakeling 2023-12-15 23:28:06
149
	c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
150
	util.Debugln("[cron.Update] Cron mutex lock")
570144e Jakob Wakeling 2023-12-15 23:28:06
151
	defer c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
152
	defer util.Debugln("[cron.Update] Cron mutex unlock")
570144e Jakob Wakeling 2023-12-15 23:28:06
153
570144e Jakob Wakeling 2023-12-15 23:28:06
154
	now := time.Now().UTC()
570144e Jakob Wakeling 2023-12-15 23:28:06
155
	slices.SortFunc(c.jobs, func(a, b Job) int {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
156
		return a.Schedule.Next(now).Compare(b.Schedule.Next(now))
570144e Jakob Wakeling 2023-12-15 23:28:06
157
	})
594ec41 Jakob Wakeling 2023-12-29 15:18:46
158
}
594ec41 Jakob Wakeling 2023-12-29 15:18:46
159
594ec41 Jakob Wakeling 2023-12-29 15:18:46
160
func (c *Cron) Jobs() []Job {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
161
	c.mutex.Lock()
594ec41 Jakob Wakeling 2023-12-29 15:18:46
162
	util.Debugln("[cron.Jobs] Cron mutex lock")
594ec41 Jakob Wakeling 2023-12-29 15:18:46
163
	defer c.mutex.Unlock()
594ec41 Jakob Wakeling 2023-12-29 15:18:46
164
	defer util.Debugln("[cron.Jobs] Cron mutex unlock")
594ec41 Jakob Wakeling 2023-12-29 15:18:46
165
594ec41 Jakob Wakeling 2023-12-29 15:18:46
166
	jobs := make([]Job, len(c.jobs))
594ec41 Jakob Wakeling 2023-12-29 15:18:46
167
	copy(jobs, c.jobs)
594ec41 Jakob Wakeling 2023-12-29 15:18:46
168
	return jobs
570144e Jakob Wakeling 2023-12-15 23:28:06
169
}
570144e Jakob Wakeling 2023-12-15 23:28:06
170
1547a71 Jakob Wakeling 2023-12-16 15:24:55
171
func (c *Cron) Add(rid int64, schedule Schedule, fn func()) uint64 {
570144e Jakob Wakeling 2023-12-15 23:28:06
172
	c.mutex.Lock()
570144e Jakob Wakeling 2023-12-15 23:28:06
173
	util.Debugln("[cron.Add] Cron mutex lock")
570144e Jakob Wakeling 2023-12-15 23:28:06
174
	defer c.mutex.Unlock()
570144e Jakob Wakeling 2023-12-15 23:28:06
175
	defer util.Debugln("[cron.Add] Cron mutex unlock")
570144e Jakob Wakeling 2023-12-15 23:28:06
176
570144e Jakob Wakeling 2023-12-15 23:28:06
177
	c.lastId += 1
570144e Jakob Wakeling 2023-12-15 23:28:06
178
594ec41 Jakob Wakeling 2023-12-29 15:18:46
179
	job := Job{Id: c.lastId, Rid: rid, Schedule: schedule, fn: fn}
594ec41 Jakob Wakeling 2023-12-29 15:18:46
180
	job.Next = job.Schedule.Next(time.Now().UTC())
570144e Jakob Wakeling 2023-12-15 23:28:06
181
	c.jobs = append(c.jobs, job)
570144e Jakob Wakeling 2023-12-15 23:28:06
182
594ec41 Jakob Wakeling 2023-12-29 15:18:46
183
	log.Println("[cron] added job", job.Id, "for", job.Rid)
594ec41 Jakob Wakeling 2023-12-29 15:18:46
184
	return job.Id
570144e Jakob Wakeling 2023-12-15 23:28:06
185
}
1547a71 Jakob Wakeling 2023-12-16 15:24:55
186
1547a71 Jakob Wakeling 2023-12-16 15:24:55
187
func (c *Cron) RemoveFor(rid int64) {
1547a71 Jakob Wakeling 2023-12-16 15:24:55
188
	c.mutex.Lock()
1547a71 Jakob Wakeling 2023-12-16 15:24:55
189
	util.Debugln("[cron.RemoveFor] Cron mutex lock")
1547a71 Jakob Wakeling 2023-12-16 15:24:55
190
	defer c.mutex.Unlock()
1547a71 Jakob Wakeling 2023-12-16 15:24:55
191
	defer util.Debugln("[cron.RemoveFor] Cron mutex unlock")
1547a71 Jakob Wakeling 2023-12-16 15:24:55
192
1547a71 Jakob Wakeling 2023-12-16 15:24:55
193
	tmp := c.jobs[:0]
1547a71 Jakob Wakeling 2023-12-16 15:24:55
194
	for _, job := range c.jobs {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
195
		if job.Rid != rid {
1547a71 Jakob Wakeling 2023-12-16 15:24:55
196
			tmp = append(tmp, job)
1547a71 Jakob Wakeling 2023-12-16 15:24:55
197
		} else {
594ec41 Jakob Wakeling 2023-12-29 15:18:46
198
			log.Println("[cron] removing job", job.Id, "for", job.Rid)
1547a71 Jakob Wakeling 2023-12-16 15:24:55
199
		}
1547a71 Jakob Wakeling 2023-12-16 15:24:55
200
	}
1547a71 Jakob Wakeling 2023-12-16 15:24:55
201
1547a71 Jakob Wakeling 2023-12-16 15:24:55
202
	c.jobs = tmp
1547a71 Jakob Wakeling 2023-12-16 15:24:55
203
}
204