Skip to content

Commit

Permalink
Merge pull request #578 from nevalang/stream_rewrite
Browse files Browse the repository at this point in the history
Stream rewrite
  • Loading branch information
emil14 committed Apr 29, 2024
2 parents 2eb0c62 + 35ee82e commit 4c534e0
Show file tree
Hide file tree
Showing 32 changed files with 236 additions and 2,883 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"mode": "auto",
"program": "${workspaceFolder}/cmd/neva",
"cwd": "${workspaceFolder}",
"args": ["run", "examples/struct_selector"]
"args": ["run", "e2e/add_nums_verbose/main"]
},
{
"name": "LSP",
Expand Down
6 changes: 5 additions & 1 deletion e2e/const_refs_verbose/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ import (
"github.com/stretchr/testify/require"
)

// Nested map has only one key because keys are unordered
// so having order in test will make it flacky.
func Test(t *testing.T) {
cmd := exec.Command("neva", "run", "main")

out, err := cmd.CombinedOutput()
require.NoError(t, err)

require.Equal(
t,
"map[l:[1 2 3] m:map[one:1 three:3 two:2]]\n",
`{"l": [1, 2, 3], "m": {"key": 1}}
`,
string(out),
)

Expand Down
6 changes: 1 addition & 5 deletions e2e/const_refs_verbose/main/main.neva
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ const {
two int = 2
three int = 3
numsList list<int> = [one, two, three]
numsMap map<int> = {
one: one,
two: two,
three: three
}
numsMap map<int> = { key: one }
numsStruct NumsStruct = {
l: numsList,
m: numsMap
Expand Down
5 changes: 4 additions & 1 deletion examples/const_refs/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"
)

// Nested map has only one key because keys are unordered
// so having order in test will make it flacky.
func Test(t *testing.T) {
err := os.Chdir("..")
require.NoError(t, err)
Expand All @@ -22,7 +24,8 @@ func Test(t *testing.T) {
require.NoError(t, err)
require.Equal(
t,
"map[l:[1 2 3] m:map[one:1 three:3 two:2]]\n",
`{"l": [1, 2, 3], "m": {"key": 1}}
`,
string(out),
)

Expand Down
6 changes: 1 addition & 5 deletions examples/const_refs/main.neva
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ const {
two int = 2
three int = 3
numsList list<int> = [one, two, three]
numsMap map<int> = {
one: one,
two: two,
three: three
}
numsMap map<int> = { key: one }
numsStruct NumsStruct = {
l: numsList,
m: numsMap
Expand Down
18 changes: 12 additions & 6 deletions examples/fizzbuzz/main.neva
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
component Main(start) (stop) {
nodes { Mod, Range, Match<int>, Unwrap<int>, Println, Lock<int> }
nodes { Range, PrintLine, Match }
net {
:start -> [
(1 -> range:from),
(101 -> range:to)
]
range -> unwrap
range.data -> printLine -> match:data
100 -> match:case[0] -> :stop
}
}

component PrintLine(data int) (data int) {
nodes { Mod, Println, Lock<int> }
net {
:data -> [mod:data, lock:data]

unwrap:some -> [mod:data, lock:data]
15 -> mod:case[0] -> ('FizzBuzz' -> println)
3 -> mod:case[1] -> ('Fizz' -> println)
5 -> mod:case[2] -> ('Buzz' -> println)
mod:else -> println

println -> lock:sig
lock -> match:data
100 -> match:case[0] -> :stop
lock:data -> :data
}
}
}
6 changes: 5 additions & 1 deletion examples/iterate_over_list/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func Test(t *testing.T) {
require.NoError(t, err)
require.Equal(
t,
"50\n30\n20\n100\n<nil>\n",
`{"data": 50, "idx": 0, "last": false}
{"data": 30, "idx": 1, "last": false}
{"data": 20, "idx": 2, "last": false}
{"data": 100, "idx": 3, "last": true}
`,
string(out),
)

Expand Down
7 changes: 4 additions & 3 deletions examples/iterate_over_list/main.neva
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const lst list<int> = [50, 30, 20, 100]

component Main(start) (stop) {
nodes { Println<maybe<int>>, Iter<int>, Unwrap<int> }
nodes { Println<stream<int>>, Iter<int>, Match<bool> }
net {
:start -> ($lst -> iter -> println -> unwrap)
unwrap:none -> :stop
:start -> ($lst -> iter -> println)
println.last -> match:data
true -> match:case[0] -> :stop
}
}
21 changes: 12 additions & 9 deletions internal/runtime/funcs/int_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (

type intAdd struct{}

func (intAdd) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Context), error) {
func (intAdd) Create(
io runtime.FuncIO,
_ runtime.Msg,
) (func(ctx context.Context), error) {
seqIn, err := io.In.Port("seq")
if err != nil {
return nil, err
Expand All @@ -20,29 +23,29 @@ func (intAdd) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Context
}

return func(ctx context.Context) {
var (
acc int64
cur runtime.Msg
)
var acc int64 = 0

for {
var item map[string]runtime.Msg
select {
case <-ctx.Done():
return
case cur = <-seqIn:
case msg := <-seqIn:
item = msg.Map()
}

if cur == nil {
acc += item["data"].Int()

if item["last"].Bool() {
select {
case <-ctx.Done():
return
case resOut <- runtime.NewIntMsg(acc):
acc = 0
acc = 0 // reset
continue
}
}

acc += cur.Int()
}
}, nil
}
File renamed without changes.
17 changes: 8 additions & 9 deletions internal/runtime/funcs/int_mul.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,28 @@ func (intMul) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Context
}

return func(ctx context.Context) {
var (
acc int64 = 1
cur runtime.Msg
)
var acc int64 = 1

for {
var item map[string]runtime.Msg
select {
case <-ctx.Done():
return
case cur = <-seqIn:
case msg := <-seqIn:
item = msg.Map()
}

if cur == nil {
acc *= item["data"].Int()

if item["last"].Bool() {
select {
case <-ctx.Done():
return
case resOut <- runtime.NewIntMsg(acc):
acc = 1
acc = 1 // reset
continue
}
}

acc *= cur.Int()
}
}, nil
}
27 changes: 14 additions & 13 deletions internal/runtime/funcs/int_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,36 @@ func (intSub) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Context

return func(ctx context.Context) {
var (
acc int64 = 0
inProgress bool = false
cur runtime.Msg
acc int64 = 0
started bool = false
)

for {
var item map[string]runtime.Msg
select {
case <-ctx.Done():
return
case cur = <-seqIn:
case msg := <-seqIn:
item = msg.Map()
}

if cur == nil {
if !started {
acc = item["data"].Int()
started = true
} else {
acc -= item["data"].Int()
}

if item["last"].Bool() {
select {
case <-ctx.Done():
return
case resOut <- runtime.NewIntMsg(acc):
acc = 0
inProgress = false
started = false
continue
}
}

if !inProgress {
acc = cur.Int()
inProgress = true
} else {
acc -= cur.Int()
}
}
}, nil
}
35 changes: 20 additions & 15 deletions internal/runtime/funcs/list_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (

type list_iter struct{}

func (c list_iter) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Context), error) {
func (c list_iter) Create(
io runtime.FuncIO,
_ runtime.Msg,
) (func(ctx context.Context), error) {
dataIn, err := io.In.Port("data")
if err != nil {
return nil, err
Expand All @@ -21,25 +24,27 @@ func (c list_iter) Create(io runtime.FuncIO, _ runtime.Msg) (func(ctx context.Co

return func(ctx context.Context) {
for {
var list []runtime.Msg

select {
case data, ok := <-dataIn:
if !ok {
return // lstIn channel closed
}
for i := 0; i < len(data.List()); i++ {
select {
case <-ctx.Done():
return
case seqOut <- data.List()[i]:
}
}
case <-ctx.Done():
return
case dataMsg := <-dataIn:
list = dataMsg.List()
}

for idx := 0; idx < len(list); idx++ {
item := streamItem(
list[idx],
int64(idx),
idx == len(list)-1,
)

select {
case <-ctx.Done():
return
case seqOut <- nil:
case seqOut <- item:
}
case <-ctx.Done():
return
}
}
}, nil
Expand Down
File renamed without changes.
File renamed without changes.
25 changes: 13 additions & 12 deletions internal/runtime/funcs/port_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/nevalang/neva/internal/runtime"
)

type portSequencer struct{}
type portStreamer struct{}

func (portSequencer) Create(
func (portStreamer) Create(
io runtime.FuncIO,
_ runtime.Msg,
) (func(context.Context), error) {
Expand All @@ -24,28 +24,29 @@ func (portSequencer) Create(
}

return func(ctx context.Context) {
var msg runtime.Msg

for {
for _, slot := range portIn {
l := len(portIn)

for i, slot := range portIn {
var msg runtime.Msg
select {
case <-ctx.Done():
return
case msg = <-slot:
}

item := streamItem(
msg,
int64(i),
i == l-1,
)

select {
case <-ctx.Done():
return
case streamOut <- msg:
case streamOut <- item:
}
}

select {
case <-ctx.Done():
return
case streamOut <- nil:
}
}
}, nil
}
Loading

0 comments on commit 4c534e0

Please sign in to comment.