Skip to content

Commit d55408d

Browse files
authored
[feat] dataconv: SharedDict for concurrent access (#49)
* draft for shared dict * limit * move * SD * add tests * cnt it * SD * custom fn * udate test" ; * fir orias * callable type * more errors * for args cnt * testing frozen * remove unused code * all attr tests * check more equal * not self * noop self * S unit * lambda works' tig * unpacks * clean pack things --- now focus on lint * for comments * for missing * compare type * unsupported operator
1 parent 95cff61 commit d55408d

File tree

3 files changed

+942
-7
lines changed

3 files changed

+942
-7
lines changed

dataconv/share.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
package dataconv
2+
3+
import (
4+
"fmt"
5+
"sort"
6+
"sync"
7+
8+
"go.starlark.net/starlark"
9+
"go.starlark.net/syntax"
10+
)
11+
12+
// SharedDict is a dictionary that can be shared among multiple Starlark threads.
13+
type SharedDict struct {
14+
sync.RWMutex
15+
dict *starlark.Dict
16+
frozen bool
17+
}
18+
19+
const (
20+
defaultSharedDictSize = 8
21+
)
22+
23+
// NewSharedDict creates a new SharedDict instance.
24+
func NewSharedDict() *SharedDict {
25+
return &SharedDict{
26+
dict: starlark.NewDict(defaultSharedDictSize),
27+
}
28+
}
29+
30+
var (
31+
_ starlark.Value = (*SharedDict)(nil)
32+
_ starlark.Comparable = (*SharedDict)(nil)
33+
_ starlark.Mapping = (*SharedDict)(nil)
34+
_ starlark.HasAttrs = (*SharedDict)(nil)
35+
_ starlark.HasSetKey = (*SharedDict)(nil)
36+
)
37+
38+
func (s *SharedDict) String() string {
39+
var v string
40+
if s != nil && s.dict != nil {
41+
v = s.dict.String()
42+
}
43+
return fmt.Sprintf("shared_dict(%s)", v)
44+
}
45+
46+
// Type returns the type name of the SharedDict.
47+
func (s *SharedDict) Type() string {
48+
return "shared_dict"
49+
}
50+
51+
// Freeze prevents the SharedDict from being modified.
52+
func (s *SharedDict) Freeze() {
53+
s.Lock()
54+
defer s.Unlock()
55+
56+
s.frozen = true
57+
if s.dict != nil {
58+
s.dict.Freeze()
59+
}
60+
}
61+
62+
// Truth returns the truth value of the SharedDict.
63+
func (s *SharedDict) Truth() starlark.Bool {
64+
s.RLock()
65+
defer s.RUnlock()
66+
67+
return s != nil && s.dict != nil && s.dict.Truth()
68+
}
69+
70+
// Hash returns the hash value of the SharedDict, actually it's not hashable.
71+
func (s *SharedDict) Hash() (uint32, error) {
72+
return 0, fmt.Errorf("unhashable type: shared_dict")
73+
}
74+
75+
// Get returns the value corresponding to the specified key, or not found if the mapping does not contain the key.
76+
// It implements the starlark.Mapping interface.
77+
func (s *SharedDict) Get(k starlark.Value) (v starlark.Value, found bool, err error) {
78+
s.RLock()
79+
defer s.RUnlock()
80+
81+
if s.dict != nil {
82+
return s.dict.Get(k)
83+
}
84+
return nil, false, nil
85+
}
86+
87+
// SetKey sets the value for the specified key, supports update using x[k]=v syntax, like a dictionary.
88+
// It implements the starlark.HasSetKey interface.
89+
func (s *SharedDict) SetKey(k, v starlark.Value) error {
90+
s.Lock()
91+
defer s.Unlock()
92+
93+
// basic check
94+
if s.frozen {
95+
return fmt.Errorf("frozen dict")
96+
}
97+
98+
// maybe create the dictionary (perhaps this line is unreachable)
99+
if s.dict == nil {
100+
s.dict = starlark.NewDict(defaultSharedDictSize)
101+
}
102+
103+
// check if the value is a shared dict -- reject it
104+
if sd, ok := v.(*SharedDict); ok {
105+
return fmt.Errorf("unsupported value: %s", sd.Type())
106+
}
107+
return s.dict.SetKey(k, v)
108+
}
109+
110+
// Attr returns the value of the specified attribute, or (nil, nil) if the attribute is not found.
111+
// It implements the starlark.HasAttrs interface.
112+
func (s *SharedDict) Attr(name string) (starlark.Value, error) {
113+
s.Lock()
114+
defer s.Unlock()
115+
116+
// basic check
117+
if s.dict == nil {
118+
return nil, nil
119+
}
120+
121+
var (
122+
attr starlark.Value
123+
err error
124+
)
125+
// try to get the new custom builtin
126+
if b, ok := customSharedDictMethods[name]; ok {
127+
attr = b.BindReceiver(s.dict)
128+
} else {
129+
// get the builtin from the original dict
130+
attr, err = s.dict.Attr(name)
131+
}
132+
133+
// convert to builtin
134+
if attr == nil || err != nil {
135+
return attr, err
136+
}
137+
btl, ok := attr.(*starlark.Builtin)
138+
if !ok {
139+
return nil, fmt.Errorf("unsupported attribute: %s", name)
140+
}
141+
142+
// wrap the builtin
143+
return starlark.NewBuiltin(name, func(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
144+
// lock the shared dict
145+
s.Lock()
146+
defer s.Unlock()
147+
148+
// call the original builtin
149+
return btl.CallInternal(thread, args, kwargs)
150+
}), nil
151+
}
152+
153+
// AttrNames returns a new slice containing the names of all the attributes of the SharedDict.
154+
// It implements the starlark.HasAttrs interface.
155+
func (s *SharedDict) AttrNames() []string {
156+
if s.dict != nil {
157+
names := s.dict.AttrNames()
158+
for cn := range customSharedDictMethods {
159+
names = append(names, cn)
160+
}
161+
sort.Strings(names)
162+
return names
163+
}
164+
return nil
165+
}
166+
167+
// CompareSameType compares the SharedDict with another value of the same type.
168+
// It implements the starlark.Comparable interface.
169+
func (s *SharedDict) CompareSameType(op syntax.Token, y_ starlark.Value, depth int) (bool, error) {
170+
// if they are the same object, they are equal
171+
if s == y_ {
172+
switch op {
173+
case syntax.EQL:
174+
return true, nil
175+
case syntax.NEQ:
176+
return false, nil
177+
default:
178+
return false, fmt.Errorf("unsupported operator: %s", op)
179+
}
180+
}
181+
182+
// scan the type
183+
y := y_.(*SharedDict)
184+
185+
// lock both objects
186+
s.RLock()
187+
defer s.RUnlock()
188+
y.RLock()
189+
defer y.RUnlock()
190+
191+
// compare the underlying dictionaries
192+
if s.dict != nil && y.dict != nil {
193+
return s.dict.CompareSameType(op, y.dict, depth)
194+
} else if s.dict == nil && y.dict == nil {
195+
// both are nil, they are equal, aha! (nil == nil)
196+
return true, nil
197+
}
198+
199+
// one is nil, the other is not, they are not equal
200+
switch op {
201+
case syntax.EQL:
202+
return false, nil
203+
case syntax.NEQ:
204+
return true, nil
205+
default:
206+
return false, fmt.Errorf("unsupported operator: %s", op)
207+
}
208+
}
209+
210+
var (
211+
customSharedDictMethods = map[string]*starlark.Builtin{
212+
"len": starlark.NewBuiltin("len", shardDictLen),
213+
"perform": starlark.NewBuiltin("perform", shardDictPerform),
214+
}
215+
)
216+
217+
// shardDictLen returns the length of the underlying dictionary.
218+
func shardDictLen(_ *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
219+
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
220+
return nil, err
221+
}
222+
l := b.Receiver().(*starlark.Dict).Len()
223+
return starlark.MakeInt(l), nil
224+
}
225+
226+
// shardDictPerform calls the given function with the underlying receiver dictionary, and returns the result.
227+
// The function must be callable, like def perform(fn).
228+
func shardDictPerform(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
229+
// get the perform function
230+
var pr starlark.Value
231+
if err := starlark.UnpackArgs(b.Name(), args, kwargs, "fn", &pr); err != nil {
232+
return nil, err
233+
}
234+
235+
// get the receiver
236+
d := b.Receiver().(*starlark.Dict)
237+
238+
// call the function with the receiver
239+
switch pr := pr.(type) {
240+
case starlark.Callable:
241+
return pr.CallInternal(thread, starlark.Tuple{d}, nil)
242+
default:
243+
return nil, fmt.Errorf("%s: not callable type: %s", b.Name(), pr.Type())
244+
}
245+
}

0 commit comments

Comments
 (0)