Skip to content

Commit 2d1cfe3

Browse files
committed
Migrage task worker types to new package
1 parent eecc06a commit 2d1cfe3

File tree

6 files changed

+55
-48
lines changed

6 files changed

+55
-48
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ GoSchedule is an in-process scheduler. It's modularized, efficient, high availab
88

99
A web based console [goschedule-console](https://github.com/jasonjoo2010/goschedule-console) is provided as an easy to use operating panel to review runtimes/statistics, manage tasks/strategies/schedulers and manage data of storage.
1010

11-
The first version of GoSchedule is implemented based on `tbschedule` which comes from Taobao®. Tbschedule became opensouce in 2011~2013 and stopped updating then. For an alternated please refer to [tbschedule](https://github.com/jasonjoo2010/tbschedule).
11+
The first version of GoSchedule is implemented based on `tbschedule` which comes from Taobao®. Tbschedule became opensource in 2011~2013 and stopped updating then. For an alternated please refer to [tbschedule](https://github.com/jasonjoo2010/tbschedule).
1212

1313
The overview of design:
1414

@@ -111,7 +111,7 @@ FuncWorker works perfectly in scenarios implementing simple and repeated logic w
111111

112112
TaskWorker is a more complicated and powerful framework for select()->execute() like jobs. Partitioning can be easily configured.
113113

114-
For more detail on design or explaination please refer to [Workers](WORKERS.md).
114+
For more detail on design or explanation please refer to [Workers](WORKERS.md).
115115

116116
#### TaskItem of TaskWorker
117117

@@ -129,7 +129,7 @@ For more details please refer to [MODELS](MODELS.md).
129129

130130
### Load balancing
131131

132-
Your workers are distributed between nodes that can be scheduled on. The `balancing` has a meaning in two dimentions: In same strategy and over strategies.
132+
Your workers are distributed between nodes that can be scheduled on. The `balancing` has a meaning in two dimensions: In same strategy and over strategies.
133133

134134
In the same strategy, requested count of worker are well distributed based on nodes. But if you have more single-worker strategy there may be still unbalanced. So a shuffling is introduced when rescheduling to optimize balancing over strategies.
135135

WORKERS.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ We can use a figure to get an overview of them:
1010

1111
Simple worker acts like a thread with start-stop lifecycle. You can use it in start-stop scenarios like consumers of queue like RocketMQ/Kafka, goroutine loop, etc. .
1212

13-
Besides start/stop hooks it also supports parameter, cron expressions of begin/end. They are clustered togethor and defined as `strategy`.
13+
Besides start/stop hooks it also supports parameter, cron expressions of begin/end. They are clustered together and defined as `strategy`.
1414

1515
## Func Worker
1616

17-
Compared to `Simple` worker `Func` worker doesn't care about the lifecyle and it focuses on business in single loop. The single loop logic can be scheduled in fixed rate, or fixed time driven by cron expression of begin, or invoked repeatedly in specified time segments driven by cron expressions. It acts more like a legacy `scheduled task`.
17+
Compared to `Simple` worker `Func` worker doesn't care about the lifecycle and it focuses on business in single loop. The single loop logic can be scheduled in fixed rate, or fixed time driven by cron expression of begin, or invoked repeatedly in specified time segments driven by cron expressions. It acts more like a legacy `scheduled task`.
1818

1919
## Task Worker
2020

21-
`Task` worker is more complicated. A task worker can act quite differently in different scenarios. It supports partitioning, parellelism, batch processing, distributing and evironment definition. For simple worker which runs in single instance globally an arbitary partition is given and enough. But for heavier jobs in which partitions are necessary you can carefully define the partitions and they can be distributed among all worker instances well:
21+
`Task` worker is more complicated. A task worker can act quite differently in different scenarios. It supports partitioning, parallelism, batch processing, distributing and environment definition. For simple worker which runs in single instance globally an arbitrary partition is given and enough. But for heavier jobs in which partitions are necessary you can carefully define the partitions and they can be distributed among all worker instances well:
2222

2323
![Partitioning in task](doc/partition.png)
2424

core/worker/task_worker/executor_batch.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/jasonjoo2010/goschedule/types"
1112
"github.com/sirupsen/logrus"
1213
)
1314

1415
type BatchExecutor struct {
1516
worker *TaskWorker
16-
task TaskBatch
17+
task types.TaskBatch
1718
pool sync.Pool
1819
}
1920

core/worker/task_worker/executor_single.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ package task_worker
77
import (
88
"time"
99

10+
"github.com/jasonjoo2010/goschedule/types"
1011
"github.com/sirupsen/logrus"
1112
)
1213

1314
type SingleExecutor struct {
1415
worker *TaskWorker
15-
task TaskSingle
16+
task types.TaskSingle
1617
}
1718

1819
func (m *SingleExecutor) execute(item interface{}) {

core/worker/task_worker/task_worker.go

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,11 @@ var (
2525
taskRegistryMap sync.Map
2626
)
2727

28-
// TaskBase defines the task used in scheduling.
29-
type TaskBase interface {
30-
// Select returns tasks to be dealed later.
31-
// It will be guaranteed in serial model.
32-
// parameter, items, eachFetchNum are from definition of task
33-
// ownSign is from name of strategy binded in the form of 'name$ownsign'
34-
// It's a kind of relation to strategy but generally task doesn't care about strategy in user's view.
35-
Select(parameter, ownSign string, items []definition.TaskItem, eachFetchNum int) []interface{}
36-
}
37-
38-
// TaskSingle represents one task one time(routine) model
39-
type TaskSingle interface {
40-
TaskBase
41-
// return true if succ false otherwise, but things will still go on
42-
Execute(task interface{}, ownSign string) bool
43-
}
44-
45-
// TaskBatch represents multiple tasks one time(routine) model
46-
type TaskBatch interface {
47-
TaskBase
48-
// return true if succ false otherwise, but things will still go on
49-
Execute(tasks []interface{}, ownSign string) bool
50-
}
51-
52-
type TaskComparable interface {
53-
Less(a, b interface{}) bool
54-
}
55-
5628
// TaskWorker implements a task-driven worker.
5729
// Strategy.Bind should be the identifier of task(on console panel).
5830
type TaskWorker struct {
31+
types.Worker
32+
5933
mu sync.Mutex
6034
selectLock sync.Mutex
6135
parameter string
@@ -72,7 +46,7 @@ type TaskWorker struct {
7246
queuedData []interface{}
7347
model TaskModel
7448
executor TaskExecutor
75-
task TaskBase
49+
task types.TaskBase
7650
executors int32
7751
schedStart cron.Schedule
7852
schedEnd cron.Schedule
@@ -89,15 +63,15 @@ type TaskWorker struct {
8963
Statistics definition.Statistics
9064
}
9165

92-
func getTaskFromType(t reflect.Type) TaskBase {
93-
if v, ok := reflect.New(t).Interface().(TaskBase); ok {
66+
func getTaskFromType(t reflect.Type) types.TaskBase {
67+
if v, ok := reflect.New(t).Interface().(types.TaskBase); ok {
9468
return v
9569
}
9670
logrus.Warn("Entry registered is not a convertable type: ", t)
9771
return nil
9872
}
9973

100-
func getTask(name string) TaskBase {
74+
func getTask(name string) types.TaskBase {
10175
var (
10276
ok bool
10377
v interface{}
@@ -110,7 +84,7 @@ func getTask(name string) TaskBase {
11084
if ok {
11185
return getTaskFromType(t)
11286
}
113-
val, ok := v.(TaskBase)
87+
val, ok := v.(types.TaskBase)
11488
if ok {
11589
return val
11690
}
@@ -119,15 +93,15 @@ func getTask(name string) TaskBase {
11993
}
12094

12195
// RegisterTaskType registers a task type with key inferred by its type
122-
func RegisterTaskType(task TaskBase) {
96+
func RegisterTaskType(task types.TaskBase) {
12397
if task == nil {
12498
panic("Could not register a task using nil as value")
12599
}
126100
RegisterTaskTypeName(utils.TypeName(utils.Dereference(task)), task)
127101
}
128102

129103
// RegisterTaskTypeName registers a task type with key
130-
func RegisterTaskTypeName(name string, task TaskBase) {
104+
func RegisterTaskTypeName(name string, task types.TaskBase) {
131105
if name == "" {
132106
panic("Could not register a task using empty name")
133107
}
@@ -140,20 +114,20 @@ func RegisterTaskTypeName(name string, task TaskBase) {
140114
}
141115

142116
// RegisterTaskInst registers a task in single instance model with key inferred by its type
143-
func RegisterTaskInst(task TaskBase) {
117+
func RegisterTaskInst(task types.TaskBase) {
144118
RegisterTaskInstName(utils.TypeName(task), task)
145119
}
146120

147121
// RegisterTaskInstName registers a task in single instance model with given key
148-
func RegisterTaskInstName(name string, task TaskBase) {
122+
func RegisterTaskInstName(name string, task types.TaskBase) {
149123
taskRegistryMap.Store(name, task)
150124
logrus.Info("Register a task instance: ", name)
151125
}
152126

153127
// NewTask creates a new task and initials necessary fields
154128
// Please don't initial TaskWorker manually
155129
func NewTask(strategy definition.Strategy, task definition.Task, store store.Store, schedulerId string) (types.Worker, error) {
156-
var inst TaskBase
130+
var inst types.TaskBase
157131
sequence, err := store.Sequence()
158132
if err != nil {
159133
logrus.Error("Generate sequence from storage failed: ", err.Error())
@@ -202,7 +176,7 @@ func NewTask(strategy definition.Strategy, task definition.Task, store store.Sto
202176
w.model = NewNormalModel(w)
203177
}
204178
if w.taskDefine.BatchCount > 1 {
205-
t, ok := inst.(TaskBatch)
179+
t, ok := inst.(types.TaskBatch)
206180
if !ok {
207181
return nil, errors.New("Specific bind is not a TaskBatch: " + task.Bind)
208182
}
@@ -216,7 +190,7 @@ func NewTask(strategy definition.Strategy, task definition.Task, store store.Sto
216190
},
217191
}
218192
} else {
219-
t, ok := inst.(TaskSingle)
193+
t, ok := inst.(types.TaskSingle)
220194
if !ok {
221195
return nil, errors.New("Specific bind is not a TaskSingle: " + task.Bind)
222196
}

types/task.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package types
2+
3+
import "github.com/jasonjoo2010/goschedule/definition"
4+
5+
// TaskBase defines the task used in scheduling.
6+
type TaskBase interface {
7+
// Select returns tasks to be dealed later.
8+
// It will be guaranteed in serial model.
9+
// parameter, items, eachFetchNum are from definition of task
10+
// ownSign is from name of strategy bond in the form of 'name$ownsign'
11+
// It's a kind of relation to strategy but generally task doesn't care about strategy in user's view.
12+
Select(parameter, ownSign string, items []definition.TaskItem, eachFetchNum int) []interface{}
13+
}
14+
15+
// TaskSingle represents one task one time(routine) model
16+
type TaskSingle interface {
17+
TaskBase
18+
// return true if succ false otherwise, but things will still go on
19+
Execute(task interface{}, ownSign string) bool
20+
}
21+
22+
// TaskBatch represents multiple tasks one time(routine) model
23+
type TaskBatch interface {
24+
TaskBase
25+
// return true if succ false otherwise, but things will still go on
26+
Execute(tasks []interface{}, ownSign string) bool
27+
}
28+
29+
type TaskComparable interface {
30+
Less(a, b interface{}) bool
31+
}

0 commit comments

Comments
 (0)