Goit

Simple and lightweight Git web server
git clone http://git.omkov.net/Goit
Log | Tree | Refs | README | Download

Goit/src/cron/cron.go (205 lines, 3.7 KiB) -rw-r--r-- blame download


// Copyright (C) 2023, Jakob Wakeling
// All rights reserved.

package cron

import (
	"log"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Jamozed/Goit/src/util"
)

type Cron struct {
	jobs    []Job
	stop    chan struct{}
	update  chan struct{}
	running atomic.Bool
	mutex   sync.Mutex
	lastId  uint64
	waiter  sync.WaitGroup
}

type Job struct {
	Id         uint64
	Rid        int64
	Schedule   Schedule
	Next, Last time.Time
	fn         func()
}

const maxDuration time.Duration = 1<<63 - 1

func New() *Cron {
	return &Cron{
		jobs:   []Job{},
		stop:   make(chan struct{}),
		update: make(chan struct{}),
	}
}

func (c *Cron) Start() {
	c.mutex.Lock()
	util.Debugln("[cron.Start] Cron mutex lock")
	defer c.mutex.Unlock()
	defer util.Debugln("[cron.Start] Cron mutex unlock")

	if !c.running.CompareAndSwap(false, true) {
		return
	}

	for i, job := range c.jobs {
		c.jobs[i].Next = job.Schedule.Next(time.Now().UTC())
	}

	go func() {
		for {
			c.mutex.Lock()
			util.Debugln("[cron.run] Cron mutex lock")

			var timer *time.Timer

			if len(c.jobs) == 0 {
				timer = time.NewTimer(maxDuration)
			} else {
				timer = time.NewTimer(c.jobs[0].Next.Sub(time.Now().UTC()))
			}

			c.mutex.Unlock()
			util.Debugln("[cron.run] Cron mutex unlock")

			select {
			case now := <-timer.C:
				now = now.UTC()
				log.Println("[cron] timer expired")

				c.mutex.Lock()
				util.Debugln("[cron.now] Cron mutex lock")

				tmp := c.jobs[:0]
				for _, job := range c.jobs {
					if job.Next.After(now) || job.Next.IsZero() {
						tmp = append(tmp, job)
						continue
					}

					log.Println("[cron] running job", job.Id, job.Rid)

					j := job
					c.waiter.Add(1)
					go func() {
						defer c.waiter.Done()
						j.fn()
					}()

					if !job.Schedule.IsImmediate() {
						job.Next = job.Schedule.Next(now)
						job.Last = now
						tmp = append(tmp, job)
					}
				}

				c.jobs = tmp

				util.Debugln("[cron.now] Cron mutex unlock")
				c.mutex.Unlock()

				c._update()

			case <-c.stop:
				timer.Stop()

				c.mutex.Lock()
				util.Debugln("[cron.stop] Cron mutex lock")

				c.waiter.Wait()
				c.running.Store(false)

				util.Debugln("[cron.stop] Cron mutex unlock")
				c.mutex.Unlock()

				return

			case <-c.update:
				c._update()
			}
		}
	}()
}

func (c *Cron) Stop() {
	if !c.running.Load() {
		return
	}

	close(c.stop)
}

func (c *Cron) Update() {
	if !c.running.Load() {
		return
	}

	c.update <- struct{}{}
}

func (c *Cron) _update() {
	c.mutex.Lock()
	util.Debugln("[cron.Update] Cron mutex lock")
	defer c.mutex.Unlock()
	defer util.Debugln("[cron.Update] Cron mutex unlock")

	now := time.Now().UTC()
	slices.SortFunc(c.jobs, func(a, b Job) int {
		return a.Schedule.Next(now).Compare(b.Schedule.Next(now))
	})
}

func (c *Cron) Jobs() []Job {
	c.mutex.Lock()
	util.Debugln("[cron.Jobs] Cron mutex lock")
	defer c.mutex.Unlock()
	defer util.Debugln("[cron.Jobs] Cron mutex unlock")

	jobs := make([]Job, len(c.jobs))
	copy(jobs, c.jobs)
	return jobs
}

func (c *Cron) Add(rid int64, schedule Schedule, fn func()) uint64 {
	c.mutex.Lock()
	util.Debugln("[cron.Add] Cron mutex lock")
	defer c.mutex.Unlock()
	defer util.Debugln("[cron.Add] Cron mutex unlock")

	c.lastId += 1

	job := Job{Id: c.lastId, Rid: rid, Schedule: schedule, fn: fn}
	job.Next = job.Schedule.Next(time.Now().UTC())
	c.jobs = append(c.jobs, job)

	log.Println("[cron] added job", job.Id, "for", job.Rid)
	return job.Id
}

func (c *Cron) RemoveFor(rid int64) {
	c.mutex.Lock()
	util.Debugln("[cron.RemoveFor] Cron mutex lock")
	defer c.mutex.Unlock()
	defer util.Debugln("[cron.RemoveFor] Cron mutex unlock")

	tmp := c.jobs[:0]
	for _, job := range c.jobs {
		if job.Rid != rid {
			tmp = append(tmp, job)
		} else {
			log.Println("[cron] removing job", job.Id, "for", job.Rid)
		}
	}

	c.jobs = tmp
}