Skip to content

Commit

Permalink
If buffer size if too large, send immediately
Browse files Browse the repository at this point in the history
This patch enhance some details to handle the console buffer
considering the performance and error handling.

partial: xcat2#14
  • Loading branch information
chenglch committed Jan 15, 2018
1 parent 6d4d4a6 commit d43c256
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion console/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *ConsoleClient) input(args ...interface{}) {
}
exit, pos := c.checkEscape(b, n, node)
if exit == true {
b = []byte(ExitSequence)
b = EXIT_SEQUENCE[0:]
n = len(b)
c.retry = false
printConsoleDisconnectPrompt()
Expand Down
7 changes: 4 additions & 3 deletions console/console.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package console

import (
"bytes"
"fmt"
"github.com/chenglch/goconserver/common"
pl "github.com/chenglch/goconserver/console/pipeline"
Expand All @@ -12,15 +13,15 @@ import (
)

const (
ExitSequence = "\x05c." // ctrl-e, c
CLIENT_CMD_EXIT = '.'
CLIENT_CMD_HELP = '?'
CLIENT_CMD_REPLAY = 'r'
CLIENT_CMD_WHO = 'w'
)

var (
CLIENT_CMDS = []byte{CLIENT_CMD_HELP, CLIENT_CMD_REPLAY, CLIENT_CMD_WHO}
CLIENT_CMDS = []byte{CLIENT_CMD_HELP, CLIENT_CMD_REPLAY, CLIENT_CMD_WHO}
EXIT_SEQUENCE = [...]byte{'\x05', 'c', '.'} // ctrl-e, c
)

type Console struct {
Expand Down Expand Up @@ -97,7 +98,7 @@ func (self *Console) writeTarget(conn net.Conn) {
plog.WarningNode(self.node.StorageNode.Name, fmt.Sprintf("Failed to receive message from client. Error:%s.", err.Error()))
return
}
if string(b) == ExitSequence {
if bytes.Equal(b, EXIT_SEQUENCE[0:]) {
plog.InfoNode(self.node.StorageNode.Name, "Received exit signal from client")
return
}
Expand Down
19 changes: 17 additions & 2 deletions console/pipeline/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ func (self *LineLogger) MakeRecord(node string, b []byte, last *RemainBuffer) er
self.bufChan <- append(buf, []byte{'\n'}...)
}
}
if len(b[p:]) >= 4096 {
lineBuf := NewLineBuf(CONSOLE_TYPE, string(b[p:]), node, serverConfig.Console.LogTimestamp)
buf, err = lineBuf.Marshal()
if err != nil {
plog.ErrorNode(node, err)
}
self.bufChan <- append(buf, []byte{'\n'}...)
p = len(b)
}
copyLast(last, b[p:])
return nil
}
Expand Down Expand Up @@ -139,19 +148,25 @@ func (self *LineLogger) Prompt(node string, message string) error {
}

func (self *LineLogger) run() {
var buf bytes.Buffer
buf := new(bytes.Buffer)
plog.Debug("Starting line logger")
ticker := time.NewTicker(common.PIPELINE_SEND_INTERVAL)
// timeout may block data
for {
select {
case b := <-self.bufChan:
buf.Write(b)
if buf.Len() > 8192 {
// too large, send immediately
self.emit(buf.Bytes())
// slice in channel is transferred by reference
buf = new(bytes.Buffer)
}
case <-ticker.C:
if buf.Len() > 0 {
// send buf to the channel
self.emit(buf.Bytes())
buf.Reset()
buf = new(bytes.Buffer)
}
}
}
Expand Down

0 comments on commit d43c256

Please sign in to comment.