Skip to content

Commit 3a48caf

Browse files
authored
Merge pull request #45 from shunfei/develop
v0.2.2
2 parents 0cfcfb1 + 68826f8 commit 3a48caf

File tree

15 files changed

+374
-42
lines changed

15 files changed

+374
-42
lines changed

job.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/shunfei/cronsun/conf"
2222
"github.com/shunfei/cronsun/log"
2323
"github.com/shunfei/cronsun/node/cron"
24+
"github.com/shunfei/cronsun/utils"
2425
)
2526

2627
const (
@@ -387,7 +388,15 @@ func (j *Job) alone() {
387388
}
388389

389390
func (j *Job) splitCmd() {
390-
j.cmd = strings.Split(j.Command, " ")
391+
ps := strings.SplitN(j.Command, " ", 2)
392+
if len(ps) == 1 {
393+
j.cmd = ps
394+
return
395+
}
396+
397+
j.cmd = make([]string, 0, 2)
398+
j.cmd = append(j.cmd, ps[0])
399+
j.cmd = append(j.cmd, utils.ParseCmdArguments(ps[1])...)
391400
}
392401

393402
func (j *Job) String() string {

node.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,14 @@ func GetNodesBy(query interface{}) (nodes []*Node, err error) {
9090
return
9191
}
9292

93-
func ISNodeFault(id string) (bool, error) {
93+
func RemoveNode(query interface{}) error {
94+
return mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
95+
return c.Remove(query)
96+
})
97+
98+
}
99+
100+
func ISNodeAlive(id string) (bool, error) {
94101
n := 0
95102
err := mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
96103
var e error

noticer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,16 @@ func monitorNodes(n Noticer) {
182182
switch {
183183
case ev.Type == client.EventTypeDelete:
184184
id = GetIDFromKey(string(ev.Kv.Key))
185-
ok, err = ISNodeFault(id)
185+
ok, err = ISNodeAlive(id)
186186
if err != nil {
187187
log.Warnf("query node[%s] err: %s", id, err.Error())
188188
continue
189189
}
190190

191191
if ok {
192192
n.Send(&Message{
193-
Subject: "node[" + id + "] fault at time[" + time.Now().Format(time.RFC3339) + "]",
193+
Subject: "Node[" + id + "] break away cluster, this happed at " + time.Now().Format(time.RFC3339),
194+
Body: "Node breaked away cluster, this might happed when node crash or network problems.",
194195
To: conf.Config.Mail.To,
195196
})
196197
}

utils/argument_parser.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package utils
2+
3+
import (
4+
"errors"
5+
)
6+
7+
type fmsState int
8+
9+
const (
10+
stateArgumentOutside fmsState = iota
11+
stateArgumentStart
12+
stateArgumentEnd
13+
)
14+
15+
var errEndOfLine = errors.New("End of line")
16+
17+
type cmdArgumentParser struct {
18+
s string
19+
i int
20+
length int
21+
state fmsState
22+
startToken byte
23+
shouldEscape bool
24+
currArgument []byte
25+
err error
26+
}
27+
28+
func newCmdArgumentParser(s string) *cmdArgumentParser {
29+
return &cmdArgumentParser{
30+
s: s,
31+
i: -1,
32+
length: len(s),
33+
currArgument: make([]byte, 0, 16),
34+
}
35+
}
36+
37+
func (cap *cmdArgumentParser) parse() (arguments []string) {
38+
for {
39+
cap.next()
40+
41+
if cap.err != nil {
42+
if cap.shouldEscape {
43+
cap.currArgument = append(cap.currArgument, '\\')
44+
}
45+
46+
if len(cap.currArgument) > 0 {
47+
arguments = append(arguments, string(cap.currArgument))
48+
}
49+
50+
return
51+
}
52+
53+
switch cap.state {
54+
case stateArgumentOutside:
55+
cap.detectStartToken()
56+
case stateArgumentStart:
57+
if !cap.detectEnd() {
58+
cap.detectContent()
59+
}
60+
case stateArgumentEnd:
61+
cap.state = stateArgumentOutside
62+
arguments = append(arguments, string(cap.currArgument))
63+
cap.currArgument = cap.currArgument[:0]
64+
}
65+
}
66+
}
67+
68+
func (cap *cmdArgumentParser) previous() {
69+
if cap.i >= 0 {
70+
cap.i--
71+
}
72+
}
73+
74+
func (cap *cmdArgumentParser) next() {
75+
if cap.length-cap.i == 1 {
76+
cap.err = errEndOfLine
77+
return
78+
}
79+
cap.i++
80+
}
81+
82+
func (cap *cmdArgumentParser) detectStartToken() {
83+
c := cap.s[cap.i]
84+
if c == ' ' {
85+
return
86+
}
87+
88+
switch c {
89+
case '\\':
90+
cap.startToken = 0
91+
cap.shouldEscape = true
92+
case '"', '\'':
93+
cap.startToken = c
94+
default:
95+
cap.startToken = 0
96+
cap.previous()
97+
}
98+
cap.state = stateArgumentStart
99+
}
100+
101+
func (cap *cmdArgumentParser) detectContent() {
102+
c := cap.s[cap.i]
103+
104+
if cap.shouldEscape {
105+
switch c {
106+
case ' ', '\\', cap.startToken:
107+
cap.currArgument = append(cap.currArgument, c)
108+
default:
109+
cap.currArgument = append(cap.currArgument, '\\', c)
110+
}
111+
cap.shouldEscape = false
112+
return
113+
}
114+
115+
if c == '\\' {
116+
cap.shouldEscape = true
117+
} else {
118+
cap.currArgument = append(cap.currArgument, c)
119+
}
120+
}
121+
122+
func (cap *cmdArgumentParser) detectEnd() (detected bool) {
123+
c := cap.s[cap.i]
124+
125+
if cap.startToken == 0 {
126+
if c == ' ' && !cap.shouldEscape {
127+
cap.state = stateArgumentEnd
128+
cap.previous()
129+
return true
130+
}
131+
return false
132+
}
133+
134+
if c == cap.startToken && !cap.shouldEscape {
135+
cap.state = stateArgumentEnd
136+
return true
137+
}
138+
139+
return false
140+
}
141+
142+
func ParseCmdArguments(s string) (arguments []string) {
143+
return newCmdArgumentParser(s).parse()
144+
}

utils/argument_parser_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package utils
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/smartystreets/goconvey/convey"
7+
)
8+
9+
func TestCmdArgumentParser(t *testing.T) {
10+
var args []string
11+
var str string
12+
13+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
14+
args = ParseCmdArguments(str)
15+
So(len(args), ShouldEqual, 0)
16+
})
17+
18+
str = " "
19+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
20+
args = ParseCmdArguments(str)
21+
So(len(args), ShouldEqual, 0)
22+
})
23+
24+
str = "aa bbb ccc "
25+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
26+
args = ParseCmdArguments(str)
27+
So(len(args), ShouldEqual, 3)
28+
So(args[0], ShouldEqual, "aa")
29+
So(args[1], ShouldEqual, "bbb")
30+
So(args[2], ShouldEqual, "ccc")
31+
})
32+
33+
str = "' \\\""
34+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
35+
args = ParseCmdArguments(str)
36+
So(len(args), ShouldEqual, 1)
37+
So(args[0], ShouldEqual, " \\\"")
38+
})
39+
40+
str = `a "b c"` // a "b c"
41+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
42+
args = ParseCmdArguments(str)
43+
So(len(args), ShouldEqual, 2)
44+
So(args[0], ShouldEqual, "a")
45+
So(args[1], ShouldEqual, "b c")
46+
})
47+
48+
str = `a '\''"`
49+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
50+
args = ParseCmdArguments(str)
51+
So(len(args), ShouldEqual, 2)
52+
So(args[0], ShouldEqual, "a")
53+
So(args[1], ShouldEqual, "'")
54+
})
55+
56+
str = ` \\a 'b c' c\ d\ `
57+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
58+
args = ParseCmdArguments(str)
59+
So(len(args), ShouldEqual, 3)
60+
So(args[0], ShouldEqual, "\\a")
61+
So(args[1], ShouldEqual, "b c")
62+
So(args[2], ShouldEqual, "c d ")
63+
})
64+
65+
str = `\`
66+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
67+
args = ParseCmdArguments(str)
68+
So(len(args), ShouldEqual, 1)
69+
So(args[0], ShouldEqual, "\\")
70+
})
71+
72+
str = ` \ ` // \SPACE
73+
Convey("Parse Cmd Arguments ["+str+"]", t, func() {
74+
args = ParseCmdArguments(str)
75+
So(len(args), ShouldEqual, 1)
76+
So(args[0], ShouldEqual, " ")
77+
})
78+
}

web/gen_bindata.sh

100644100755
File mode changed.

web/node.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
v3 "github.com/coreos/etcd/clientv3"
1010
"github.com/gorilla/mux"
11+
"gopkg.in/mgo.v2/bson"
1112

1213
"github.com/shunfei/cronsun"
1314
"github.com/shunfei/cronsun/conf"
@@ -163,3 +164,62 @@ func (n *Node) GetNodes(ctx *Context) {
163164

164165
outJSONWithCode(ctx.W, http.StatusOK, nodes)
165166
}
167+
168+
// DeleteNode force remove node (by ip) which state in offline or damaged.
169+
func (n *Node) DeleteNode(ctx *Context) {
170+
vars := mux.Vars(ctx.R)
171+
ip := strings.TrimSpace(vars["ip"])
172+
if len(ip) == 0 {
173+
outJSONWithCode(ctx.W, http.StatusBadRequest, "node ip is required.")
174+
return
175+
}
176+
177+
resp, err := cronsun.DefalutClient.Get(conf.Config.Node + ip)
178+
if err != nil {
179+
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
180+
return
181+
}
182+
183+
if len(resp.Kvs) > 0 {
184+
outJSONWithCode(ctx.W, http.StatusBadRequest, "can not remove a running node.")
185+
return
186+
}
187+
188+
err = cronsun.RemoveNode(bson.M{"_id": ip})
189+
if err != nil {
190+
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
191+
return
192+
}
193+
194+
// remove node from group
195+
var errmsg = "failed to remove node %s from groups, please remove it manually: %s"
196+
resp, err = cronsun.DefalutClient.Get(conf.Config.Group, v3.WithPrefix())
197+
if err != nil {
198+
outJSONWithCode(ctx.W, http.StatusInternalServerError, fmt.Sprintf(errmsg, ip, err.Error()))
199+
return
200+
}
201+
202+
for i := range resp.Kvs {
203+
g := cronsun.Group{}
204+
err = json.Unmarshal(resp.Kvs[i].Value, &g)
205+
if err != nil {
206+
outJSONWithCode(ctx.W, http.StatusInternalServerError, fmt.Sprintf(errmsg, ip, err.Error()))
207+
return
208+
}
209+
210+
var nids = make([]string, 0, len(g.NodeIDs))
211+
for _, nid := range g.NodeIDs {
212+
if nid != ip {
213+
nids = append(nids, nid)
214+
}
215+
}
216+
g.NodeIDs = nids
217+
218+
if _, err = g.Put(resp.Kvs[i].ModRevision); err != nil {
219+
outJSONWithCode(ctx.W, http.StatusInternalServerError, fmt.Sprintf(errmsg, ip, err.Error()))
220+
return
221+
}
222+
}
223+
224+
outJSONWithCode(ctx.W, http.StatusNoContent, nil)
225+
}

web/routers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ func initRouters() (s *http.Server, err error) {
8282

8383
h = NewAuthHandler(nodeHandler.GetNodes)
8484
subrouter.Handle("/nodes", h).Methods("GET")
85+
h = NewAuthHandler(nodeHandler.DeleteNode)
86+
subrouter.Handle("/node/{ip}", h).Methods("DELETE")
8587
// get node group list
8688
h = NewAuthHandler(nodeHandler.GetGroups)
8789
subrouter.Handle("/node/groups", h).Methods("GET")

web/static_assets.go

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

web/ui/src/components/Job.vue

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export default {
169169
},
170170
171171
formatLatest: function(latest){
172-
return this.$L('{begin ~ end}, on {node} took {times}', formatTime(latest.beginTime, latest.endTime), latest.node, formatDuration(latest.beginTime, latest.endTime))
172+
return this.$L('on {node} took {times}, {begin ~ end}', latest.node, formatDuration(latest.beginTime, latest.endTime), formatTime(latest.beginTime, latest.endTime));
173173
},
174174
175175
showExecuteJobModal: function(jobName, jobGroup, jobId){
@@ -182,4 +182,4 @@ export default {
182182
ExecuteJob
183183
}
184184
}
185-
</script>
185+
</script>

web/ui/src/components/Log.vue

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export default {
165165
},
166166
167167
formatTime: function(log){
168-
return this.$L('{begin ~ end}, took {times}', formatTime(log.beginTime, log.endTime), formatDuration(log.beginTime, log.endTime));
168+
return this.$L('took {times}, {begin ~ end}', formatDuration(log.beginTime, log.endTime), formatTime(log.beginTime, log.endTime));
169169
},
170170
171171
showExecuteJobModal: function(jobName, jobGroup, jobId){
@@ -177,4 +177,4 @@ export default {
177177
ExecuteJob
178178
}
179179
}
180-
</script>
180+
</script>

0 commit comments

Comments
 (0)