Skip to content

Commit 68af4ce

Browse files
authored
Merge pull request #93 from mutablelogic/v5
Various updates
2 parents 1510458 + e0bba95 commit 68af4ce

File tree

8 files changed

+121
-36
lines changed

8 files changed

+121
-36
lines changed

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.23.5
55
require (
66
github.com/alecthomas/kong v1.10.0
77
github.com/djthorpe/go-marshaler v1.0.0
8-
github.com/djthorpe/go-pg v1.0.5
8+
github.com/djthorpe/go-pg v1.0.6
99
github.com/golang-jwt/jwt/v5 v5.2.2
1010
github.com/mutablelogic/go-client v1.0.12
1111
github.com/stretchr/testify v1.10.0
@@ -32,8 +32,6 @@ require (
3232
github.com/go-ole/go-ole v1.2.6 // indirect
3333
github.com/gogo/protobuf v1.3.2 // indirect
3434
github.com/google/uuid v1.6.0 // indirect
35-
github.com/hashicorp/errwrap v1.1.0 // indirect
36-
github.com/hashicorp/go-multierror v1.1.1 // indirect
3735
github.com/jackc/pgpassfile v1.0.0 // indirect
3836
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
3937
github.com/jackc/pgx/v5 v5.7.3 // indirect

go.sum

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr
3232
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
3333
github.com/djthorpe/go-errors v1.0.3 h1:GZeMPkC1mx2vteXLI/gvxZS0Ee9zxzwD1mcYyKU5jD0=
3434
github.com/djthorpe/go-errors v1.0.3/go.mod h1:HtfrZnMd6HsX75Mtbv9Qcnn0BqOrrFArvCaj3RMnZhY=
35-
github.com/djthorpe/go-marshaler v0.0.15 h1:ZXq5YHCsbREbbYJtc0ie9hz7HJ7vIeeDlMbe7cGh0C0=
36-
github.com/djthorpe/go-marshaler v0.0.15/go.mod h1:xCXhTzj52UL3YStRsqUSfrKses7ofmfTXYQfVedn8Lw=
37-
github.com/djthorpe/go-pg v1.0.5 h1:UYCV5fSXOJEFTafem1wB57RK2J0V7Nr9nCquC5sO+ZE=
38-
github.com/djthorpe/go-pg v1.0.5/go.mod h1:XHl/w8+66Hs746nOYd+gdjqPImNuLVZ5UsXLI47rb4c=
35+
github.com/djthorpe/go-marshaler v1.0.0 h1:dLIjr2vXXUi8VF93LIQPHfBjW3USbtYjqKt+b2WTf2M=
36+
github.com/djthorpe/go-marshaler v1.0.0/go.mod h1:xCXhTzj52UL3YStRsqUSfrKses7ofmfTXYQfVedn8Lw=
37+
github.com/djthorpe/go-pg v1.0.6 h1:v/ZcMhtgULa301LPYyUEo7fJJwuseOaAdpCyKid0qbU=
38+
github.com/djthorpe/go-pg v1.0.6/go.mod h1:XHl/w8+66Hs746nOYd+gdjqPImNuLVZ5UsXLI47rb4c=
3939
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
4040
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
4141
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
@@ -64,9 +64,6 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
6464
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
6565
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
6666
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
67-
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
68-
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
69-
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
7067
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
7168
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
7269
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=

pkg/httprouter/httprouter.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,20 @@ func (r *router) Run(ctx context.Context) error {
6363
// HTTP ROUTER
6464

6565
// Register a function to handle a URL path
66-
func (r *router) HandleFunc(ctx context.Context, prefix string, fn http.HandlerFunc) {
66+
func (r *router) HandleFunc(parent context.Context, prefix string, fn http.HandlerFunc) {
6767
// Wrap the function with middleware
6868
for _, middleware := range r.middleware {
6969
fn = middleware.HandleFunc(fn)
7070
}
7171

7272
// Apply middleware, set context
73-
ref.Log(ctx).Debug(ctx, "Register route: ", types.JoinPath(r.prefix, prefix))
73+
ref.Log(parent).Debug(parent, "Register route: ", types.JoinPath(r.prefix, prefix))
7474
r.ServeMux.HandleFunc(types.JoinPath(r.prefix, prefix), func(w http.ResponseWriter, r *http.Request) {
75-
r = r.WithContext(ref.WithLog(r.Context(), ref.Log(ctx)))
76-
// TODO: Add Log into the r context, but don't replace the original
77-
fn(w, r)
75+
// Add provider to context
76+
ctx := ref.WithProvider(r.Context(), ref.Provider(parent))
77+
78+
// Serve the request
79+
fn(w, r.WithContext(ctx))
7880
})
7981
}
8082

@@ -84,7 +86,7 @@ func (r *router) Origin() string {
8486
}
8587

8688
// Register serving of static files from a filesystem
87-
func (r *router) HandleFS(ctx context.Context, prefix string, fs fs.FS) {
89+
func (r *router) HandleFS(parent context.Context, prefix string, fs fs.FS) {
8890
// Create the file server
8991
fn := http.StripPrefix(types.JoinPath(r.prefix, prefix), http.FileServer(http.FS(fs))).ServeHTTP
9092

@@ -94,12 +96,12 @@ func (r *router) HandleFS(ctx context.Context, prefix string, fs fs.FS) {
9496
}
9597

9698
// Apply middleware
97-
ref.Log(ctx).Debug(ctx, "Register route: ", types.JoinPath(r.prefix, prefix))
99+
ref.Log(parent).Debug(parent, "Register static: ", types.JoinPath(r.prefix, prefix))
98100
r.ServeMux.HandleFunc(types.JoinPath(r.prefix, prefix), func(w http.ResponseWriter, req *http.Request) {
99101
// Set CORS headers
100102
httpresponse.Cors(w, req, r.origin, http.MethodGet)
101103

102104
// Call the file server
103-
fn(w, req.WithContext(ref.WithLog(ctx, ref.Log(ctx))))
105+
fn(w, req.WithContext(ref.WithProvider(parent, ref.Provider(parent))))
104106
})
105107
}

pkg/pgqueue/config/task.go

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66
"reflect"
7+
"regexp"
8+
"strconv"
79
"strings"
810
"sync"
911
"time"
@@ -42,6 +44,8 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
4244
self.callbacks = make(map[string]server.PGCallback, 100)
4345
self.decoder = marshaler.NewDecoder("json",
4446
convertPtr,
47+
convertPGTime,
48+
convertPGDuration,
4549
convertFloatToIntUint,
4650
marshaler.ConvertTime,
4751
marshaler.ConvertDuration,
@@ -61,7 +65,7 @@ func (task *task) Run(parent context.Context) error {
6165

6266
// Create a cancelable context
6367
ctx, cancel := context.WithCancel(context.Background())
64-
ctx = ref.WithPath(ref.WithLog(ctx, ref.Log(parent)), ref.Path(parent)...)
68+
ctx = ref.WithPath(ref.WithProvider(ctx, ref.Provider(parent)), ref.Path(parent)...)
6569

6670
// Ticker loop
6771
tickerch := make(chan *schema.Ticker)
@@ -147,7 +151,9 @@ FOR_LOOP:
147151
}
148152
n += len(tasks)
149153
}
150-
ref.Log(ctx).With("ticker", evt).Debug(parent, "removed ", n, " tasks from queue")
154+
if n > 0 {
155+
ref.Log(ctx).With("ticker", evt).Debug(parent, "removed ", n, " tasks from queue")
156+
}
151157
}
152158
}
153159
}
@@ -167,6 +173,11 @@ func (t *task) Conn() pg.PoolConn {
167173
return t.manager.Conn()
168174
}
169175

176+
// Namespace returns the namespace of the queue.
177+
func (t *task) Namespace() string {
178+
return t.manager.Namespace()
179+
}
180+
170181
// RegisterTicker registers a periodic task (ticker) with a callback function.
171182
// It returns the metadata of the registered ticker.
172183
func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn server.PGCallback) (*schema.Ticker, error) {
@@ -255,7 +266,8 @@ func (t *task) tryTask(ctx context.Context, taskpool *pgqueue.TaskPool, task *sc
255266
taskpool.RunTask(ctx, task, t.getTaskCallback(task), func(err error) {
256267
var status string
257268
delta := time.Since(now).Truncate(time.Millisecond)
258-
if _, err_ := t.manager.ReleaseTask(context.TODO(), task.Id, err == nil, err, &status); err_ != nil {
269+
child := ref.WithPath(ref.WithProvider(context.TODO(), ref.Provider(ctx)), ref.Path(ctx)...)
270+
if _, err_ := t.manager.ReleaseTask(child, task.Id, err == nil, err, &status); err_ != nil {
259271
err = errors.Join(err, err_)
260272
}
261273
switch {
@@ -293,16 +305,75 @@ func joinName(parts ...string) string {
293305
return strings.Join(parts, namespaceSeparator)
294306
}
295307

296-
func splitName(name string, n int) []string {
297-
return strings.SplitN(name, namespaceSeparator, n)
298-
}
299-
300308
// //////////////////////////////////////////////////////////////////////////////
301309
// PRIVATE METHODS
310+
302311
var (
303-
nilValue = reflect.ValueOf(nil)
312+
nilValue = reflect.ValueOf(nil)
313+
timeType = reflect.TypeOf(time.Time{})
314+
durationType = reflect.TypeOf(time.Duration(0))
315+
rePostgresDuration = regexp.MustCompile(`^(\d+):(\d+):(\d+)$`)
304316
)
305317

318+
// convertPGTime returns time from postgres format
319+
func convertPGTime(src reflect.Value, dest reflect.Type) (reflect.Value, error) {
320+
// Pass value through
321+
if src.Type() == dest {
322+
return src, nil
323+
}
324+
325+
if dest == timeType || dest.Kind() == reflect.Ptr && dest.Elem() == timeType {
326+
var v reflect.Value
327+
328+
// Convert time 2025-05-03T17:29:32.329803 => time.Time
329+
if t, err := time.Parse("2006-01-02T15:04:05.999999999", src.String()); err == nil {
330+
v = reflect.ValueOf(t)
331+
} else if t, err := time.Parse("2006-01-02T15:04:05.999999999Z", src.String()); err == nil {
332+
v = reflect.ValueOf(t)
333+
}
334+
335+
// Return value
336+
if v.IsValid() {
337+
if dest.Kind() == reflect.Ptr {
338+
value := reflect.New(dest.Elem())
339+
value.Elem().Set(v)
340+
return value, nil
341+
} else {
342+
return v, nil
343+
}
344+
}
345+
}
346+
347+
// Skip
348+
return nilValue, nil
349+
}
350+
351+
// convertPGDuration returns duration from postgres format
352+
func convertPGDuration(src reflect.Value, dest reflect.Type) (reflect.Value, error) {
353+
// Pass value through
354+
if src.Type() == dest {
355+
return src, nil
356+
}
357+
358+
if dest == durationType {
359+
// Convert 00:00:00 => time.Duration
360+
if parts := rePostgresDuration.FindStringSubmatch(src.String()); len(parts) == 4 {
361+
if hours, err := strconv.ParseUint(parts[1], 10, 64); err != nil {
362+
return nilValue, err
363+
} else if minutes, err := strconv.ParseUint(parts[2], 10, 64); err != nil {
364+
return nilValue, err
365+
} else if seconds, err := strconv.ParseUint(parts[3], 10, 64); err != nil {
366+
return nilValue, err
367+
} else {
368+
return reflect.ValueOf(time.Duration(hours)*time.Hour + time.Duration(minutes)*time.Minute + time.Duration(seconds)*time.Second), nil
369+
}
370+
}
371+
}
372+
373+
// Skip
374+
return nilValue, nil
375+
}
376+
306377
// convertPtr returns value if pointer
307378
func convertPtr(src reflect.Value, dest reflect.Type) (reflect.Value, error) {
308379
// Pass value through

pkg/pgqueue/manager.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
pg "github.com/djthorpe/go-pg"
1414
httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
1515
schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
16+
ref "github.com/mutablelogic/go-server/pkg/ref"
1617
types "github.com/mutablelogic/go-server/pkg/types"
1718
)
1819

@@ -462,14 +463,15 @@ func (manager *Manager) RunTaskLoop(ctx context.Context, ch chan<- *schema.Task)
462463

463464
// RunNotificationLoop runs a loop to process database notifications, until the context is cancelled
464465
// or an error occurs.
465-
func (manager *Manager) RunNotificationLoop(ctx context.Context, ch chan<- *pg.Notification) error {
466+
func (manager *Manager) RunNotificationLoop(parent context.Context, ch chan<- *pg.Notification) error {
466467
// Subscribe to topics
467468
for _, topic := range manager.topics {
468-
if err := manager.listener.Listen(ctx, topic); err != nil {
469+
if err := manager.listener.Listen(parent, topic); err != nil {
469470
return err
470471
}
471472
}
472473
defer func() {
474+
ctx := ref.WithProvider(context.Background(), ref.Provider(parent))
473475
for _, topic := range manager.topics {
474476
manager.listener.Unlisten(ctx, topic)
475477
}
@@ -478,10 +480,10 @@ func (manager *Manager) RunNotificationLoop(ctx context.Context, ch chan<- *pg.N
478480
// Loop until context is cancelled
479481
for {
480482
select {
481-
case <-ctx.Done():
483+
case <-parent.Done():
482484
return nil
483485
default:
484-
if notification, err := manager.listener.WaitForNotification(ctx); err != nil {
486+
if notification, err := manager.listener.WaitForNotification(parent); err != nil {
485487
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
486488
return err
487489
}

pkg/ref/context.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ func Label(ctx context.Context) string {
5454
}
5555

5656
func Log(ctx context.Context) server.Logger {
57-
if value := ctx.Value(ctxLogger); value == nil {
57+
if provider := ctx.Value(ctxProvider); provider == nil {
5858
return nil
5959
} else {
60-
return value.(server.Logger)
60+
return provider.(server.Logger)
6161
}
6262
}
6363

@@ -93,10 +93,6 @@ func Task(ctx context.Context) *pgqueue.Task {
9393
}
9494
}
9595

96-
func WithLog(ctx context.Context, logger server.Logger) context.Context {
97-
return context.WithValue(ctx, ctxLogger, logger)
98-
}
99-
10096
func WithAuth(ctx context.Context, auth server.Auth) context.Context {
10197
return context.WithValue(ctx, ctxAuth, auth)
10298
}

pkg/types/hash.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package types
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
)
7+
8+
///////////////////////////////////////////////////////////////////////////////
9+
// PUBLIC METHODS
10+
11+
func Hash(data []byte) string {
12+
// Return the SHA256 hash of the data
13+
h := sha256.New()
14+
h.Write(data)
15+
return hex.EncodeToString(h.Sum(nil))
16+
}

plugin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ type PGQueue interface {
119119
// Conn returns the underlying connection pool object.
120120
Conn() pg.PoolConn
121121

122+
// Namespace returns the namespace of the queue.
123+
Namespace() string
124+
122125
// RegisterTicker registers a periodic task (ticker) with a callback function.
123126
// It returns the metadata of the registered ticker.
124127
RegisterTicker(context.Context, pgschema.TickerMeta, PGCallback) (*pgschema.Ticker, error)

0 commit comments

Comments
 (0)