-
Notifications
You must be signed in to change notification settings - Fork 4
/
content-addressing.ss
165 lines (143 loc) · 6.54 KB
/
content-addressing.ss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
;; Content Addressing
(export #t)
(import
(for-syntax :clan/syntax)
:gerbil/gambit
:std/format :std/lazy :std/misc/completion :std/misc/hash :std/misc/ports :std/sugar
:clan/base :clan/concurrency :clan/io :clan/string
:clan/poo/object :clan/poo/mop :clan/poo/fun :clan/poo/io :clan/poo/type :clan/poo/brace
:clan/crypto/keccak
./db ./db-queue ./persist)
;; Should the structures belo POO traits instead? Probably not until POO traits are efficient, too!
;; Should that be called a DigestingContext and ContentAddressingContext,
;; to be provided statically or dynamically to make ContentAddressable objects?
;; Note that in a given application, a *same* object may be considered from the point of view
;; of *several* Digesting contexts, because it may be tracked similtaneously in multiple
;; blockchains or registries.
;; But usually, there content-addressing is tied to a specific database,
;; and uses a fixed digesting context that is extended with extra caches.
(defstruct Digesting
(sexp ;; : SExp ;; how to print it.
Digest ;; : Type ;; some fixed-size Bytes.
digest)) ;; : Digest <- Bytes
(defstruct (ContentAddressing Digesting)
(key-prefix ;; : Bytes
mutex)) ;; : Mutex
;; : ContentAddressing
(def keccak-addressing
(make-ContentAddressing
'keccak-addressing
(BytesN 32)
keccak256<-bytes
(string->bytes "K2")
(make-mutex 'k2cas)))
;; : (Parameter ContentAddressing)
(def current-content-addressing (make-parameter keccak-addressing))
;; : Digest <- Bytes ?Digesting
(def (digest<-bytes bytes (digesting (current-content-addressing)))
((Digesting-digest digesting) bytes))
;; : Digest <- T:Type T ?Digesting
(def (digest<- type value (digesting (current-content-addressing)))
(digest<-bytes (bytes<- type value) digesting))
;; : Digest <- String ?Digesting
(def (digest<-string string (digesting (current-content-addressing)))
(digest<-bytes (string->bytes string) digesting))
;; : Digest <- String ?Digesting
(def (digest<-file path (digesting (current-content-addressing)))
;; TODO: make it work efficiently on large files without loading the entire file into memory,
;; just into buffers of say 8KB or 1MB, or whatever works best.
(digest<-bytes (read-file-u8vector path) digesting))
;; trait for digestability in a given content-addressing context
(define-type (Digestable @ [] .bytes<- .digesting)
.digest<-: (lambda (v (digesting .digesting)) (digest<-bytes (.bytes<- v) digesting)))
;; Non-functor function
(define-type (DigestWrapper^ @ [])
.tap: (lambda (t) (Digesting-Digest (.@ t .digesting)))
.ap^: (cut .call <> .digest<- <>)
.unap^: invalid
.marshal^: (lambda (t v port) (marshal (Digesting-Digest (.@ t .digesting)) v port))
.unmarshal^: (lambda (t port) (unmarshal (Digesting-Digest (.@ t .digesting)) port)))
;; CAVEAT EMPTOR: This trait statically but *lazily* captures
;; the dynamic current-content-addressing in this interface at time of first reference.
;; This allows you to define all your interfaces independently from which digest function will be used,
;; but a given poo interface should be used in one context only, they should be initialized together,
;; you may want to statically clone and override in some cases, etc.
(define-type (CurrentDigesting @ [Digestable])
.digesting: (current-content-addressing))
;; : Bytes <- Digest ?ContentAddressing
(def (content-addressing-key digest (content-addressing (current-content-addressing)))
(u8vector-append (ContentAddressing-key-prefix content-addressing) digest))
(define-type (ContentAddressable @ [] sexp .digesting .digest<- .<-bytes .bytes<-)
;; CAVEAT EMPTOR: The application developers must ensure there are no collisions
;; with respect to sexp for types stored in a given content-addressable context.
.content-cache: (make-hash-table weak-values: #t)
;; @ <- Digest TX
.<-digest:
(lambda (digest tx)
;; TODO: figure out what are or aren't Gambit's guarantees regarding
;; concurrent access to a table.
;; Concurrency, reentrance, etc., may cause issues here, but a mutex doesn't seem composable.
;; Some kind of transactional memory may be required, at which point,
;; should the caching, decoding and transacting service be moved "upstream"
;; into the database thread and/or mutex?
(hash-ensure-ref .content-cache digest
(cut .<-bytes (db-get (content-addressing-key digest .digesting) tx))))
.make-persistent:
(lambda (x tx)
(def b (.bytes<- x))
(def d (digest<-bytes b .digesting))
(def k (content-addressing-key d .digesting))
(unless (db-key? k tx)
(make-dependencies-persistent @ x tx)
(db-put! k b tx))))
(defstruct DV ;; (forall T:Type Type)
(type ;; : T:Type
value ;; : (Lazy T)
digest ;; : (Lazy Digest)
persisted?)) ;; : Bool
(def (value<-dv dv) (force (DV-value dv)))
(defrule (dv t x) (let (t t) (DV t (lazy x) (lazy (digest<- t x)) #f)))
(def (digest<-dv dv) (force (DV-digest dv)))
(def (dv<-digest t d) (DV t (lazy (.call t .<-digest d)) (lazy d) #t))
;; ContentAddressed
(define-type (ContentAddressed. @ [ContentAddressable] T .digesting)
Wrapper: {(:: @ [Wrapper.])
.ap: (lambda (v) (dv T v))
.unap: value<-dv}
.validate:
(lambda (dv)
(unless (DV? dv) (raise-type-error "not a DV" dv))
(match (std/lazy#&lazy-e (DV-value dv))
(['resolved . v]
(validate T v)
(match (std/lazy#&lazy-e (DV-digest dv))
(['resolved . d]
(unless (equal? d (digest<- T v .digesting)) (raise-type-error "digest does not match" dv))
dv)
(_ dv)))
(_ dv)))
.Digest: (Digesting-Digest .digesting)
.bytes<-: digest<-dv
.<-bytes: (cut dv<-digest @ <>)
.digest<-: .bytes<- ;; don't double-digest!
.marshal: (lambda (dv port) (marshal .Digest (digest<-dv dv) port))
.unmarshal: (lambda (port) (.<-bytes (unmarshal .Digest port)))
.make-persistent:
(lambda (dv tx)
(unless (DV-persisted? dv)
(let* ((d (digest<-dv dv))
(k (content-addressing-key d .digesting)))
(unless (db-key? k tx)
(let (v (value<-dv dv))
(make-dependencies-persistent T v tx)
(db-put! k (bytes<- T v) tx)))))))
(def (ContentAddressed T)
{(:: @ ContentAddressed.) T
sexp: `(ContentAddressed ,(.@ T sexp))})
(def (digest<-marshal marshal (digesting (current-content-addressing)))
(digest<-bytes (call-with-output-u8vector marshal) digesting))
(defrules digest-product ()
((_ (digesting) (val type) ...)
(digest<-marshal (lambda (port) (marshal-product port (val type) ...)) digesting))
((d (val type) ...)
(d ((current-content-addressing)) (val type) ...)))