Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/addSchedulePublish #169

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ Note that `delivery.Push()` has the same affect as `delivery.Reject()` if the
queue has no push queue set up. So in our example above, if the delivery fails
in the consumer on `pushQ2`, then the `Push()` call will reject the delivery.

### Schedule Publish

If you want to publish a task after a given time(with a maximum delay of no more than 2 hours),you can call `queue.SchedulePublish`

```go
things, err := connection.OpenQueue("things")
things.SchedulePublish("abc", 180) //publish a task after 180s
```


### Stop Consuming

If you want to stop consuming from the queue, you can call `StopConsuming()`:
Expand Down
10 changes: 6 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ type redisConnection struct {
unackedTemplate string
readyTemplate string
rejectedTemplate string

redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped
scheduleTemplate string
redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped

lock sync.Mutex
stopped bool
Expand Down Expand Up @@ -110,6 +110,7 @@ func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool,
unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags),
readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags),
rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags),
scheduleTemplate: getTemplate(queueScheduleBaseTemplate, useRedisHashTags),
redisClient: redisClient,
errChan: errChan,
heartbeatStop: make(chan chan struct{}, 1), // mark heartbeat as active, can be stopped
Expand Down Expand Up @@ -328,6 +329,7 @@ func (connection *redisConnection) openQueue(name string) Queue {
connection.unackedTemplate,
connection.readyTemplate,
connection.rejectedTemplate,
connection.scheduleTemplate,
connection.redisClient,
connection.errChan,
)
Expand Down
13 changes: 13 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@
func (e *DeliveryError) Unwrap() error {
return e.RedisErr
}

type EnqueuingError struct {
RedisErr error
Count int // number of consecutive errors
}

func (e *EnqueuingError) Error() string {
return fmt.Sprintf("rmq.EnqueuingError (%d): %s", e.Count, e.RedisErr.Error())
}

Check notice on line 62 in errors.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

1 statement(s) are not covered by tests.

func (e *EnqueuingError) Unwrap() error {
return e.RedisErr
}

Check notice on line 66 in errors.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

1 statement(s) are not covered by tests.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.6.0
github.com/kr/pretty v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
92 changes: 91 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)

const (
Expand All @@ -17,6 +20,7 @@
type Queue interface {
Publish(payload ...string) error
PublishBytes(payload ...[]byte) error
SchedulePublish(payload string, publishDelay uint64) error
SetPushQueue(pushQueue Queue)
Remove(payload string, count int64, removeFromRejected bool) error
RemoveBytes(payload []byte, count int64, removeFromRejected bool) error
Expand Down Expand Up @@ -51,6 +55,7 @@
unackedKey string // key to list of currently consuming deliveries
readyKey string // key to list of ready deliveries
rejectedKey string // key to list of rejected deliveries
scheduleKey string // key to set of schedule deliveries
pushKey string // key to list of pushed deliveries
redisClient RedisClient
errChan chan<- error
Expand All @@ -67,7 +72,7 @@

func newQueue(
name, connectionName, queuesKey string,
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string,
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate, scheduleTemplate string,
redisClient RedisClient,
errChan chan<- error,
) *redisQueue {
Expand All @@ -80,6 +85,7 @@

readyKey := strings.Replace(readyTemplate, phQueue, name, 1)
rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1)
scheduleKey := strings.Replace(scheduleTemplate, phQueue, name, 1)

consumingStopped := make(chan struct{})
ackCtx, ackCancel := context.WithCancel(context.Background())
Expand All @@ -92,6 +98,7 @@
unackedKey: unackedKey,
readyKey: readyKey,
rejectedKey: rejectedKey,
scheduleKey: scheduleKey,
redisClient: redisClient,
errChan: errChan,
consumingStopped: consumingStopped,
Expand Down Expand Up @@ -172,6 +179,7 @@
queue.deliveryChan = make(chan Delivery, prefetchLimit)
// log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration)
go queue.consume()
go queue.enqueueSchedule()
return nil
}

Expand Down Expand Up @@ -454,6 +462,20 @@
return queue.deleteRedisList(queue.rejectedKey)
}

// SchedulePublish publishes a task after a given time (in seconds), with a maximum delay of no more than 2 hours
func (queue *redisQueue) SchedulePublish(payload string, delay uint64) error {
if delay > 7200 {
delay = 7200
}

Check notice on line 469 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

2 statement(s) are not covered by tests.
// add uuid for payload as prefix , so that make every payload is unique to avoid overwriting when publishing same string value
uniquePayload := fmt.Sprintf("%s:%s", uuid.NewString(), payload)
_, err := queue.redisClient.ZAdd(queue.scheduleKey, redis.Z{
Score: float64(time.Now().Unix()) + float64(delay),
Member: uniquePayload,
})
return err

Check notice on line 476 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

3 statement(s) are not covered by tests.
}

// return number of deleted list items
// https://www.redisgreen.net/blog/deleting-large-lists
func (queue *redisQueue) deleteRedisList(key string) (int64, error) {
Expand Down Expand Up @@ -603,3 +625,71 @@
factor := 0.9 + rand.Float64()*0.2 // a jitter factor between 0.9 and 1.1 (+-10%)
return time.Duration(float64(duration) * factor)
}

// enqueueSchedule enqueues tasks that already reach the given time to ready list
func (queue *redisQueue) enqueueSchedule() error {
errorCount := 0 //number of consecutive errors

for {
select {
case <-queue.consumingStopped:
return ErrorConsumingStopped
default:
}

now := time.Now().Unix()
result, err := queue.redisClient.ZRangeByScore(queue.scheduleKey, &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", now),
})

if err != nil {
errorCount++

select { // try to add error to channel, but don't block
case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}:
default:

Check notice on line 651 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

2 statement(s) on lines 646:651 are not covered by tests.
}

continue

Check notice on line 654 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

1 statement(s) are not covered by tests.
} else { //success
errorCount = 0
}

if len(result) > 0 {
err := queue.redisClient.TxPipelined(func(pipe redis.Pipeliner) error {
for _, val := range result {

parts := strings.SplitN(val, ":", 2)

if len(parts) < 2 {
return fmt.Errorf("invalid payload format: %s", val)
}

Check notice on line 667 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

8 statement(s) on lines 659:667 are not covered by tests.

originalPayload := parts[1]

_, err := pipe.LPush(context.TODO(), queue.readyKey, originalPayload).Result()
if err != nil {
return err
}
_, err = pipe.ZRem(context.TODO(), queue.scheduleKey, val).Result()
if err != nil {
return err
}

Check notice on line 678 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

7 statement(s) are not covered by tests.
}
return nil

Check notice on line 680 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

1 statement(s) are not covered by tests.
})
if err != nil {
errorCount++
select { // try to add error to channel, but don't block
case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}:
default:

Check notice on line 686 in queue.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

3 statement(s) are not covered by tests.
}
continue
} else { //success
errorCount = 0
}
}
time.Sleep(time.Second)
}
}
11 changes: 10 additions & 1 deletion redis_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package rmq

import "time"
import (
"time"

"github.com/redis/go-redis/v9"
)

type RedisClient interface {
// simple keys
Expand All @@ -20,6 +24,11 @@ type RedisClient interface {
SAdd(key, value string) (total int64, err error)
SMembers(key string) (members []string, err error)
SRem(key, value string) (affected int64, err error)
ZAdd(key string, members ...redis.Z) (total int64, err error)
ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error)

// tx
TxPipelined(fn func(pipe redis.Pipeliner) error) error

// special
FlushDb() error
Expand Down
1 change: 1 addition & 0 deletions redis_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
queuesKey = "rmq::queues" // Set of all open queues
queueReadyBaseTemplate = "rmq::queue::[{queue}]::ready" // List of deliveries in that {queue} (right is first and oldest, left is last and youngest)
queueRejectedBaseTemplate = "rmq::queue::[{queue}]::rejected" // List of rejected deliveries from that {queue}
queueScheduleBaseTemplate = "rmq::queue::[{queue}]::schedule" // List of schedule deliveries

phConnection = "{connection}" // connection name
phQueue = "{queue}" // queue name
Expand Down
13 changes: 13 additions & 0 deletions redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error)
return wrapper.rawClient.SRem(context.TODO(), key, value).Result()
}

func (wrapper RedisWrapper) ZAdd(key string, members ...redis.Z) (affected int64, err error) {
return wrapper.rawClient.ZAdd(context.TODO(), key, members...).Result()
}

func (wrapper RedisWrapper) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) {
return wrapper.rawClient.ZRangeByScore(context.TODO(), key, opt).Result()
}

func (wrapper RedisWrapper) TxPipelined(fn func(pipe redis.Pipeliner) error) error {
_, err := wrapper.rawClient.TxPipelined(context.TODO(), fn)
return err
}

func (wrapper RedisWrapper) FlushDb() error {
// NOTE: using Err() here because Result() string is always "OK"
return wrapper.rawClient.FlushDB(context.TODO()).Err()
Expand Down
1 change: 1 addition & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (queue *TestQueue) RemoveBytes(payload []byte, count int64, removeFromRejec
}

func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) }
func (*TestQueue) SchedulePublish(string, uint64) error { panic(errorNotSupported) }
func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) }
func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }
func (*TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) }
Expand Down
12 changes: 12 additions & 0 deletions test_redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,15 @@ func (client *TestRedisClient) findList(key string) ([]string, error) {
// return an empty list if not found
return []string{}, nil
}

func (client *TestRedisClient) TxPipelined(fn func(pipe redis.Pipeliner) error) error {
panic(errorNotSupported)
}

func (client *TestRedisClient) ZAdd(key string, members ...redis.Z) (total int64, err error) {
panic(errorNotSupported)
}

func (client *TestRedisClient) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) {
panic(errorNotSupported)
}
Loading