Skip to content

Commit 0b3cff6

Browse files
committed
Add support in the data loader to write to multiple clusters
1 parent 19ba8da commit 0b3cff6

File tree

2 files changed

+179
-68
lines changed

2 files changed

+179
-68
lines changed

e2e/fixtures/fdb_data_loader.go

Lines changed: 110 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,10 @@ import (
2929
"text/template"
3030
"time"
3131

32-
k8serrors "k8s.io/apimachinery/pkg/api/errors"
33-
"k8s.io/utils/pointer"
34-
3532
"github.com/onsi/gomega"
3633
batchv1 "k8s.io/api/batch/v1"
3734
corev1 "k8s.io/api/core/v1"
35+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3836
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3937
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4038
"k8s.io/apimachinery/pkg/runtime"
@@ -67,9 +65,11 @@ spec:
6765
name: {{ .Name }}
6866
# This configuration will load ~1GB per data loader.
6967
args:
70-
- --keys={{ .Keys }}
71-
- --batch-size=50
72-
- --value-size=1000
68+
- --keys={{ .Config.Keys }}
69+
- --batch-size={{ .Config.BatchSize }}
70+
- --value-size={{ .Config.ValueSize }}
71+
- --cluster-file-directory=/var/dynamic/fdb
72+
- --read-values={{ .Config.ReadValues }}
7373
env:
7474
- name: FDB_CLUSTER_FILE
7575
value: /var/dynamic/fdb/fdb.cluster
@@ -93,11 +93,9 @@ spec:
9393
value: /var/dynamic/fdb/libs
9494
- name: FDB_NETWORK_OPTION_TRACE_FORMAT
9595
value: json
96-
- name: PYTHONUNBUFFERED
97-
value: "on"
96+
- name: FDB_NETWORK_OPTION_CLIENT_THREADS_PER_VERSION
97+
value: "10"
9898
volumeMounts:
99-
- name: config-map
100-
mountPath: /var/dynamic-conf
10199
- name: fdb-libs
102100
mountPath: /var/dynamic/fdb
103101
- name: fdb-certs
@@ -128,9 +126,9 @@ spec:
128126
runAsGroup: 0
129127
# Install this library in a special location to force the operator to use it as the primary library.
130128
{{ if .CopyAsPrimary }}
131-
- name: foundationdb-kubernetes-init-7-1-primary
129+
- name: foundationdb-kubernetes-init-primary
132130
image: {{ .Image }}
133-
imagePullPolicy: {{ .ImagePullPolicy }}
131+
imagePullPolicy: Always
134132
args:
135133
# Note that we are only copying a library, rather than copying any binaries.
136134
- "--copy-library"
@@ -150,9 +148,9 @@ spec:
150148
- /bin/bash
151149
args:
152150
- -c
153-
- mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/fdb.cluster /var/dynamic/fdb/fdb.cluster
151+
- mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/*.cluster /var/dynamic/fdb/
154152
volumeMounts:
155-
- name: config-map
153+
- name: cluster-files
156154
mountPath: /var/dynamic-conf
157155
- name: fdb-libs
158156
mountPath: /var/dynamic/fdb
@@ -161,12 +159,17 @@ spec:
161159
readOnly: true
162160
restartPolicy: Never
163161
volumes:
164-
- name: config-map
165-
configMap:
166-
name: {{ .ClusterName }}-config
167-
items:
168-
- key: cluster-file
169-
path: fdb.cluster
162+
- name: cluster-files
163+
projected:
164+
sources:
165+
{{- range $index, $clusterName := .ClusterNames }}
166+
- name: {{ $clusterName }}-config
167+
configMap:
168+
name: {{ $clusterName }}-config
169+
items:
170+
- key: cluster-file
171+
path: {{ $clusterName }}.cluster
172+
{{- end }}
170173
- name: fdb-libs
171174
emptyDir: {}
172175
- name: fdb-logs
@@ -195,9 +198,11 @@ spec:
195198
name: {{ .Name }}
196199
# This configuration will load ~1GB per data loader.
197200
args:
198-
- --keys={{ .Keys }}
199-
- --batch-size=50
200-
- --value-size=1000
201+
- --keys={{ .Config.Keys }}
202+
- --batch-size={{ .Config.BatchSize }}
203+
- --value-size={{ .Config.ValueSize }}
204+
- --cluster-file-directory=/var/dynamic/fdb
205+
- --read-values={{ .Config.ReadValues }}
201206
env:
202207
- name: FDB_CLUSTER_FILE
203208
value: /var/dynamic/fdb/fdb.cluster
@@ -221,11 +226,9 @@ spec:
221226
value: /var/dynamic/fdb
222227
- name: FDB_NETWORK_OPTION_TRACE_FORMAT
223228
value: json
224-
- name: PYTHONUNBUFFERED
225-
value: "on"
229+
- name: FDB_NETWORK_OPTION_CLIENT_THREADS_PER_VERSION
230+
value: "10"
226231
volumeMounts:
227-
- name: config-map
228-
mountPath: /var/dynamic-conf
229232
- name: fdb-libs
230233
mountPath: /var/dynamic/fdb
231234
- name: fdb-certs
@@ -238,60 +241,67 @@ spec:
238241
cpu: "1"
239242
memory: 4Gi
240243
initContainers:
241-
{{ range $index, $version := .SidecarVersions }}
244+
{{- range $index, $version := .SidecarVersions }}
242245
- name: foundationdb-kubernetes-init-{{ $index }}
243246
image: {{ .Image }}
244-
imagePullPolicy: {{ .ImagePullPolicy }}
247+
imagePullPolicy: Always
245248
args:
246249
- --mode
247250
- init
248251
- --output-dir
249252
- /var/output-files
250253
- --copy-library
251254
- "{{ .FDBVersion.Compact }}"
252-
{{ if .CopyAsPrimary }}
255+
{{- if .CopyAsPrimary }}
253256
- --copy-primary-library
254257
- "{{ .FDBVersion.Compact }}"
255-
{{ end }}
258+
{{- end }}
256259
volumeMounts:
257260
- name: fdb-libs
258261
mountPath: /var/output-files
259262
securityContext:
260263
runAsUser: 0
261264
runAsGroup: 0
262-
{{ if .CopyAsPrimary }}
265+
{{- if .CopyAsPrimary }}
263266
- name: foundationdb-kubernetes-init-cluster-file
264267
image: {{ .Image }}
265-
imagePullPolicy: {{ .ImagePullPolicy }}
268+
imagePullPolicy: Always
266269
args:
267270
- --mode
268271
- init
269272
- --input-dir
270273
- /var/dynamic-conf
271274
- --output-dir
272275
- /var/output-files
276+
{{- range $index, $clusterName := $.ClusterNames }}
273277
- --copy-file
274-
- fdb.cluster
278+
- {{ $clusterName }}.cluster
275279
- --require-not-empty
276-
- fdb.cluster
280+
- {{ $clusterName }}.cluster
281+
{{- end }}
277282
volumeMounts:
278283
- name: fdb-libs
279284
mountPath: /var/output-files
280-
- name: config-map
281-
mountPath: /var/dynamic-conf
285+
- name: cluster-files
286+
mountPath: /var/dynamic-conf/
282287
securityContext:
283288
runAsUser: 0
284289
runAsGroup: 0
285-
{{ end }}
286-
{{ end }}
290+
{{- end }}
291+
{{- end }}
287292
restartPolicy: Never
288293
volumes:
289-
- name: config-map
290-
configMap:
291-
name: {{ .ClusterName }}-config
292-
items:
293-
- key: cluster-file
294-
path: fdb.cluster
294+
- name: cluster-files
295+
projected:
296+
sources:
297+
{{- range $index, $clusterName := .ClusterNames }}
298+
- name: {{ $clusterName }}-config
299+
configMap:
300+
name: {{ $clusterName }}-config
301+
items:
302+
- key: cluster-file
303+
path: {{ $clusterName }}.cluster
304+
{{- end }}
295305
- name: fdb-libs
296306
emptyDir: {}
297307
- name: fdb-logs
@@ -311,25 +321,60 @@ type dataLoaderConfig struct {
311321
SidecarVersions []SidecarConfig
312322
// Namespace represents the namespace for the Deployment and all associated resources
313323
Namespace string
314-
// ClusterName the name of the cluster to load data into.
315-
ClusterName string
324+
// ClusterNames the names of the clusters to load data into.
325+
ClusterNames []string
316326
// SecretName represents the Kubernetes secret that contains the certificates for communicating with the FoundationDB
317327
// cluster.
318328
SecretName string
329+
// Config defines the workload configuration.
330+
Config *WorkloadConfig
331+
}
332+
333+
// WorkloadConfig describes the workload that the data loader runs against the cluster(s).
// The numeric settings treat the zero value as "not set"; setDefaults replaces it with the default.
type WorkloadConfig struct {
	// Keys is the number of keys the data loader writes. Defaults to 1000000 when left at 0.
	Keys int
	// BatchSize is the number of keys inserted per batch (transaction). Defaults to 50 when left at 0.
	BatchSize int
	// ValueSize is the size in bytes of the value of every key-value pair. Defaults to 1000 when left at 0.
	ValueSize int
	// ReadValues makes the data loader read the written values again to add some read load.
	ReadValues bool
}
322344

323-
func (factory *Factory) getDataLoaderConfig(cluster *FdbCluster, keys *int) *dataLoaderConfig {
324-
log.Println("keys:", keys, "deref:", pointer.IntDeref(keys, 1000000))
345+
func (config *WorkloadConfig) setDefaults() {
346+
if config.Keys == 0 {
347+
config.Keys = 1000000
348+
}
349+
350+
if config.BatchSize == 0 {
351+
config.BatchSize = 50
352+
}
353+
354+
if config.ValueSize == 0 {
355+
config.ValueSize = 1000
356+
}
357+
}
358+
359+
func (factory *Factory) getDataLoaderConfig(clusters []*FdbCluster, config *WorkloadConfig) *dataLoaderConfig {
360+
if config == nil {
361+
config = &WorkloadConfig{}
362+
}
363+
364+
config.setDefaults()
365+
366+
clusterNames := make([]string, 0, len(clusters))
367+
for _, cluster := range clusters {
368+
clusterNames = append(clusterNames, cluster.Name())
369+
}
325370
return &dataLoaderConfig{
326371
Name: dataLoaderName,
327-
Image: factory.GetDataLoaderImage(),
328-
Namespace: cluster.Namespace(),
372+
Image: "112664522426.dkr.ecr.us-west-2.amazonaws.com/foundationdb/fdb-data-loader:jscheuermann", //factory.GetDataLoaderImage(),
373+
Namespace: clusters[0].Namespace(),
329374
SidecarVersions: factory.GetSidecarConfigs(),
330-
ClusterName: cluster.Name(),
375+
ClusterNames: clusterNames,
331376
SecretName: factory.GetSecretName(),
332-
Keys: pointer.IntDeref(keys, 1000000),
377+
Config: config,
333378
}
334379
}
335380

@@ -338,9 +383,9 @@ func (factory *Factory) CreateDataLoaderIfAbsent(cluster *FdbCluster) {
338383
factory.CreateDataLoaderIfAbsentWithWait(cluster, nil, true)
339384
}
340385

341-
// CreateDataLoaderIfAbsentWithWait will create the data loader for the provided cluster and load some random data into the cluster.
342-
// If wait is true, the method will wait until the data loader has finished.
343-
func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, keys *int, wait bool) {
386+
// CreateDataLoaderIfAbsentWithWaitForMultipleClusters will create a data loader configuration that loads data into multiple
387+
// FoundationDB clusters. If wait is true, the method will wait until the data loader has finished and then delete it.
388+
func (factory *Factory) CreateDataLoaderIfAbsentWithWaitForMultipleClusters(clusters []*FdbCluster, config *WorkloadConfig, wait bool) {
344389
if !factory.options.enableDataLoading {
345390
return
346391
}
@@ -352,8 +397,7 @@ func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, ke
352397
t, err := template.New("dataLoaderJob").Parse(dataLoaderJobTemplate)
353398
gomega.Expect(err).NotTo(gomega.HaveOccurred())
354399
buf := bytes.Buffer{}
355-
log.Println("keys:", keys)
356-
gomega.Expect(t.Execute(&buf, factory.getDataLoaderConfig(cluster, keys))).NotTo(gomega.HaveOccurred())
400+
gomega.Expect(t.Execute(&buf, factory.getDataLoaderConfig(clusters, config))).NotTo(gomega.HaveOccurred())
357401
decoder := yamlutil.NewYAMLOrJSONDecoder(&buf, 100000)
358402
for {
359403
var rawObj runtime.RawExtension
@@ -381,8 +425,14 @@ func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, ke
381425
return
382426
}
383427

384-
factory.WaitUntilDataLoaderIsDone(cluster)
385-
factory.DeleteDataLoader(cluster)
428+
factory.WaitUntilDataLoaderIsDone(clusters[0])
429+
factory.DeleteDataLoader(clusters[0])
430+
}
431+
432+
// CreateDataLoaderIfAbsentWithWait will create the data loader for the provided cluster and load some random data into the cluster.
433+
// If wait is true, the method will wait until the data loader has finished.
434+
func (factory *Factory) CreateDataLoaderIfAbsentWithWait(cluster *FdbCluster, config *WorkloadConfig, wait bool) {
435+
factory.CreateDataLoaderIfAbsentWithWaitForMultipleClusters([]*FdbCluster{cluster}, config, wait)
386436
}
387437

388438
// DeleteDataLoader will delete the data loader job

0 commit comments

Comments
 (0)