Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add schedule publish #168

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ Note that `delivery.Push()` has the same affect as `delivery.Reject()` if the
queue has no push queue set up. So in our example above, if the delivery fails
in the consumer on `pushQ2`, then the `Push()` call will reject the delivery.

### Schedule Publish

If you want to publish a task after a given time(with a maximum delay of no more than 2 hours),you can call `queue.SchedulePublish`

```go
things, err := connection.OpenQueue("things")
things.SchedulePublish("abc", 180) //publish a task after 180s
```


### Stop Consuming

If you want to stop consuming from the queue, you can call `StopConsuming()`:
Expand Down
10 changes: 6 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ type redisConnection struct {
unackedTemplate string
readyTemplate string
rejectedTemplate string

redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped
scheduleTemplate string
redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped

lock sync.Mutex
stopped bool
Expand Down Expand Up @@ -110,6 +110,7 @@ func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool,
unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags),
readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags),
rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags),
scheduleTemplate: getTemplate(queueScheduleBaseTemplate, useRedisHashTags),
redisClient: redisClient,
errChan: errChan,
heartbeatStop: make(chan chan struct{}, 1), // mark heartbeat as active, can be stopped
Expand Down Expand Up @@ -328,6 +329,7 @@ func (connection *redisConnection) openQueue(name string) Queue {
connection.unackedTemplate,
connection.readyTemplate,
connection.rejectedTemplate,
connection.scheduleTemplate,
connection.redisClient,
connection.errChan,
)
Expand Down
13 changes: 13 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@ func (e *DeliveryError) Error() string {
func (e *DeliveryError) Unwrap() error {
return e.RedisErr
}

type EnqueuingError struct {
RedisErr error
Count int // number of consecutive errors
}

func (e *EnqueuingError) Error() string {
return fmt.Sprintf("rmq.EnqueuingError (%d): %s", e.Count, e.RedisErr.Error())
}

func (e *EnqueuingError) Unwrap() error {
return e.RedisErr
}
76 changes: 75 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"sync"
"time"

"github.com/redis/go-redis/v9"
)

const (
Expand All @@ -17,6 +19,7 @@ const (
type Queue interface {
Publish(payload ...string) error
PublishBytes(payload ...[]byte) error
SchedulePublish(payload string, publishDelay uint64) error
SetPushQueue(pushQueue Queue)
Remove(payload string, count int64, removeFromRejected bool) error
RemoveBytes(payload []byte, count int64, removeFromRejected bool) error
Expand Down Expand Up @@ -51,6 +54,7 @@ type redisQueue struct {
unackedKey string // key to list of currently consuming deliveries
readyKey string // key to list of ready deliveries
rejectedKey string // key to list of rejected deliveries
scheduleKey string // key to list of schedule deliveries
pushKey string // key to list of pushed deliveries
redisClient RedisClient
errChan chan<- error
Expand All @@ -67,7 +71,7 @@ type redisQueue struct {

func newQueue(
name, connectionName, queuesKey string,
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string,
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate, scheduleTemplate string,
redisClient RedisClient,
errChan chan<- error,
) *redisQueue {
Expand All @@ -80,6 +84,7 @@ func newQueue(

readyKey := strings.Replace(readyTemplate, phQueue, name, 1)
rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1)
scheduleKey := strings.Replace(scheduleTemplate, phQueue, name, 1)

consumingStopped := make(chan struct{})
ackCtx, ackCancel := context.WithCancel(context.Background())
Expand All @@ -92,6 +97,7 @@ func newQueue(
unackedKey: unackedKey,
readyKey: readyKey,
rejectedKey: rejectedKey,
scheduleKey: scheduleKey,
redisClient: redisClient,
errChan: errChan,
consumingStopped: consumingStopped,
Expand Down Expand Up @@ -172,6 +178,7 @@ func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.D
queue.deliveryChan = make(chan Delivery, prefetchLimit)
// log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration)
go queue.consume()
go queue.enqueueSchedule()
return nil
}

Expand Down Expand Up @@ -454,6 +461,15 @@ func (queue *redisQueue) PurgeRejected() (int64, error) {
return queue.deleteRedisList(queue.rejectedKey)
}

// SchedulePublish publishes a task after a given time (in seconds), with a maximum delay of no more than 2 hours
func (queue *redisQueue) SchedulePublish(payload string, delay uint64) error {
if delay > 7200 {
delay = 7200
}
_, err := queue.redisClient.ZAdd(queue.scheduleKey, redis.Z{Score: float64(time.Now().Unix()) + float64(delay), Member: payload})
return err
}

// return number of deleted list items
// https://www.redisgreen.net/blog/deleting-large-lists
func (queue *redisQueue) deleteRedisList(key string) (int64, error) {
Expand Down Expand Up @@ -603,3 +619,61 @@ func jitteredDuration(duration time.Duration) time.Duration {
factor := 0.9 + rand.Float64()*0.2 // a jitter factor between 0.9 and 1.1 (+-10%)
return time.Duration(float64(duration) * factor)
}

func (queue *redisQueue) enqueueSchedule() error {
errorCount := 0 //number of consecutive errors

for {
select {
case <-queue.consumingStopped:
return ErrorConsumingStopped
default:
}

now := time.Now().Unix()
result, err := queue.redisClient.ZRangeByScore(queue.scheduleKey, &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", now),
})

if err != nil {
errorCount++

select { // try to add error to channel, but don't block
case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}:
default:
}

continue
} else { //success
errorCount = 0
}

if len(result) > 0 {
err := queue.redisClient.TxPipelined(func(pipe redis.Pipeliner) error {
for _, val := range result {
_, err := pipe.LPush(context.TODO(), queue.readyKey, val).Result()
if err != nil {
return err
}
_, err = pipe.ZRem(context.TODO(), queue.scheduleKey, val).Result()
if err != nil {
return err
}
}
return nil
})
if err != nil {
errorCount++
select { // try to add error to channel, but don't block
case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}:
default:
}
continue
} else { //success
errorCount = 0
}
}
time.Sleep(jitteredDuration(1000))
}
}
11 changes: 10 additions & 1 deletion redis_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package rmq

import "time"
import (
"time"

"github.com/redis/go-redis/v9"
)

type RedisClient interface {
// simple keys
Expand All @@ -20,6 +24,11 @@ type RedisClient interface {
SAdd(key, value string) (total int64, err error)
SMembers(key string) (members []string, err error)
SRem(key, value string) (affected int64, err error)
ZAdd(key string, members ...redis.Z) (total int64, err error)
ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error)

// tx
TxPipelined(fn func(pipe redis.Pipeliner) error) error

// special
FlushDb() error
Expand Down
1 change: 1 addition & 0 deletions redis_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
queuesKey = "rmq::queues" // Set of all open queues
queueReadyBaseTemplate = "rmq::queue::[{queue}]::ready" // List of deliveries in that {queue} (right is first and oldest, left is last and youngest)
queueRejectedBaseTemplate = "rmq::queue::[{queue}]::rejected" // List of rejected deliveries from that {queue}
queueScheduleBaseTemplate = "rmq::queue::[{queue}]::schedule" // List of schedule deliveries

phConnection = "{connection}" // connection name
phQueue = "{queue}" // queue name
Expand Down
13 changes: 13 additions & 0 deletions redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error)
return wrapper.rawClient.SRem(context.TODO(), key, value).Result()
}

func (wrapper RedisWrapper) ZAdd(key string, members ...redis.Z) (affected int64, err error) {
return wrapper.rawClient.ZAdd(context.TODO(), key, members...).Result()
}

func (wrapper RedisWrapper) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) {
return wrapper.rawClient.ZRangeByScore(context.TODO(), key, opt).Result()
}

func (wrapper RedisWrapper) TxPipelined(fn func(pipe redis.Pipeliner) error) error {
_, err := wrapper.rawClient.TxPipelined(context.TODO(), fn)
return err
}

func (wrapper RedisWrapper) FlushDb() error {
// NOTE: using Err() here because Result() string is always "OK"
return wrapper.rawClient.FlushDB(context.TODO()).Err()
Expand Down
1 change: 1 addition & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (queue *TestQueue) RemoveBytes(payload []byte, count int64, removeFromRejec
}

func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) }
func (*TestQueue) SchedulePublish(string, uint64) error { panic(errorNotSupported) }
func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) }
func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }
func (*TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) }
Expand Down
12 changes: 12 additions & 0 deletions test_redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,15 @@ func (client *TestRedisClient) findList(key string) ([]string, error) {
// return an empty list if not found
return []string{}, nil
}

func (client *TestRedisClient) TxPipelined(fn func(pipe redis.Pipeliner) error) error {
panic(errorNotSupported)
}

func (client *TestRedisClient) ZAdd(key string, members ...redis.Z) (total int64, err error) {
panic(errorNotSupported)
}

func (client *TestRedisClient) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) {
panic(errorNotSupported)
}
Loading