Skip to content

Commit

Permalink
prep release
Browse files Browse the repository at this point in the history
  • Loading branch information
OutOfBedlam committed Oct 8, 2024
1 parent d72cb99 commit 540dd89
Show file tree
Hide file tree
Showing 2 changed files with 293 additions and 0 deletions.
5 changes: 5 additions & 0 deletions mods/codec/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"github.com/machbase/neo-server/mods/codec/internal/html"
"github.com/machbase/neo-server/mods/codec/internal/json"
"github.com/machbase/neo-server/mods/codec/internal/markdown"
"github.com/machbase/neo-server/mods/codec/internal/ndjson"
"github.com/machbase/neo-server/mods/codec/opts"
)

const DISCARD = "discard"
const BOX = "box"
const CSV = "csv"
const JSON = "json"
const NDJSON = "ndjson"
const MARKDOWN = "markdown"
const HTML = "html"
const ECHART = "echart"
Expand Down Expand Up @@ -45,6 +47,7 @@ var (
_ RowsEncoder = &markdown.Exporter{}
_ RowsEncoder = &html.Exporter{}
_ RowsEncoder = &geomap.GeoMap{}
_ RowsEncoder = &ndjson.Exporter{}
)

type RowsDecoder interface {
Expand Down Expand Up @@ -83,6 +86,8 @@ func NewEncoder(encoderType string, opts ...opts.Option) RowsEncoder {
ret = &DiscardSink{}
case GEOMAP:
ret = geomap.New()
case NDJSON:
ret = ndjson.NewEncoder()
default: // "json"
ret = json.NewEncoder()
}
Expand Down
288 changes: 288 additions & 0 deletions mods/codec/internal/ndjson/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
package ndjson

import (
"database/sql"
"encoding/json"
"fmt"
"math"
"net"
"time"

"github.com/machbase/neo-server/api"
"github.com/machbase/neo-server/mods/stream/spec"
"github.com/machbase/neo-server/mods/util"
)

type Exporter struct {
tick time.Time
nrow int

output spec.OutputStream
Rownum bool
Heading bool
precision int
timeformatter *util.TimeFormatter

colNames []string
colTypes []string

transpose bool
rowsFlatten bool
rowsArray bool
series [][]any
}

func NewEncoder() *Exporter {
return &Exporter{
tick: time.Now(),
timeformatter: util.NewTimeFormatter(),
}
}

func (ex *Exporter) ContentType() string {
return "application/x-ndjson"
}

func (ex *Exporter) SetOutputStream(o spec.OutputStream) {
ex.output = o
}

func (ex *Exporter) SetTimeformat(format string) {
if format == "" {
return
}
ex.timeformatter.Set(util.Timeformat(format))
}

func (ex *Exporter) SetTimeLocation(tz *time.Location) {
if tz == nil {
return
}
ex.timeformatter.Set(util.TimeLocation(tz))
}

func (ex *Exporter) SetPrecision(precision int) {
ex.precision = precision
}

func (ex *Exporter) SetRownum(show bool) {
ex.Rownum = show
}

func (ex *Exporter) SetHeader(show bool) {
ex.Heading = show
}

func (ex *Exporter) SetHeading(show bool) {
ex.Heading = show
}

func (ex *Exporter) SetColumns(labels ...string) {
ex.colNames = labels
}

func (ex *Exporter) SetColumnTypes(types ...string) {
ex.colTypes = types
}

func (ex *Exporter) SetTranspose(flag bool) {
ex.transpose = flag
}

func (ex *Exporter) SetRowsFlatten(flag bool) {
ex.rowsFlatten = flag
}

func (ex *Exporter) SetRowsArray(flag bool) {
ex.rowsArray = flag
}

func (ex *Exporter) Open() error {
var names []string
var types []string
if ex.Rownum && !ex.transpose { // rownum does not effective in transpose mode
names = append([]string{"ROWNUM"}, ex.colNames...)
types = append([]string{api.ColumnBufferTypeInt64}, ex.colTypes...)
} else {
names = ex.colNames
types = ex.colTypes
}

columnsJson, _ := json.Marshal(names)
typesJson, _ := json.Marshal(types)

if ex.transpose && !ex.rowsArray {
header := fmt.Sprintf(`{"data":{"columns":%s,"types":%s,"cols":[`,
string(columnsJson), string(typesJson))
ex.output.Write([]byte(header))
} else {
header := fmt.Sprintf(`{"data":{"columns":%s,"types":%s,"rows":[`,
string(columnsJson), string(typesJson))
ex.output.Write([]byte(header))
}

return nil
}

func (ex *Exporter) Close() {
if ex.transpose && !ex.rowsArray {
for n, ser := range ex.series {
recJson, err := json.Marshal(ser)
if err != nil {
// TODO how to report error?
break
}
if n > 0 {
ex.output.Write([]byte(","))
}
ex.output.Write(recJson)
}
}
footer := fmt.Sprintf(`]},"success":true,"reason":"success","elapse":"%s"}`, time.Since(ex.tick).String())
ex.output.Write([]byte(footer))
ex.output.Close()
}

func (ex *Exporter) Flush(heading bool) {
ex.output.Flush()
}

func (ex *Exporter) encodeFloat64(v float64) any {
if math.IsNaN(v) {
return "NaN"
} else if math.IsInf(v, -1) {
return "-Inf"
} else if math.IsInf(v, 1) {
return "+Inf"
}
return v
}

func (ex *Exporter) AddRow(source []any) error {
ex.nrow++

values := make([]any, len(source))
for i, field := range source {
switch v := field.(type) {
case *time.Time:
values[i] = ex.timeformatter.FormatEpoch(*v)
case time.Time:
values[i] = ex.timeformatter.FormatEpoch(v)
case *float64:
values[i] = ex.encodeFloat64(*v)
case float64:
values[i] = ex.encodeFloat64(v)
case *float32:
values[i] = ex.encodeFloat64(float64(*v))
case float32:
values[i] = ex.encodeFloat64(float64(v))
case *net.IP:
values[i] = v.String()
case net.IP:
values[i] = v.String()
case *sql.NullBool:
if v.Valid {
values[i] = v.Bool
}
case *sql.NullByte:
if v.Valid {
values[i] = v.Byte
}
case *sql.NullFloat64:
if v.Valid {
values[i] = v.Float64
}
case *sql.NullInt16:
if v.Valid {
values[i] = v.Int16
}
case *sql.NullInt32:
if v.Valid {
values[i] = v.Int32
}
case *sql.NullInt64:
if v.Valid {
values[i] = v.Int64
}
case *sql.NullString:
if v.Valid {
values[i] = v.String
}
case *sql.NullTime:
if v.Valid {
values[i] = ex.timeformatter.Format(v.Time)
}
default:
values[i] = field
}
}

if ex.rowsArray {
var vs = map[string]any{}
if ex.Rownum {
vs["ROWNUM"] = ex.nrow
}
for i, v := range values {
if i >= len(ex.colNames) {
break
}
vs[ex.colNames[i]] = v
}
recJson, err := json.Marshal(vs)
if err != nil {
return err
}
if ex.nrow > 1 {
ex.output.Write([]byte(","))
}
ex.output.Write(recJson)
} else if ex.transpose {
if ex.series == nil {
ex.series = make([][]any, len(values)-1)
}
if len(ex.series) < len(values) {
for i := 0; i < len(values)-len(ex.series); i++ {
ex.series = append(ex.series, []any{})
}
}
for n, v := range values {
ex.series[n] = append(ex.series[n], v)
}
} else if ex.rowsFlatten {
var recJson []byte
var err error
vs := values
if ex.Rownum {
vs = append([]any{ex.nrow}, values...)
}
for i, v := range vs {
recJson, err = json.Marshal(v)
if err != nil {
return err
}
if ex.nrow > 1 || i > 0 {
ex.output.Write([]byte(","))
}
ex.output.Write(recJson)
}
} else {
var recJson []byte
var err error
if ex.Rownum {
vs := append([]any{ex.nrow}, values...)
recJson, err = json.Marshal(vs)
} else {
recJson, err = json.Marshal(values)
}
if err != nil {
return err
}

if ex.nrow > 1 {
ex.output.Write([]byte(","))
}
ex.output.Write(recJson)
}

return nil
}

0 comments on commit 540dd89

Please sign in to comment.