@@ -2,8 +2,10 @@ package cmd
2
2
3
3
import (
4
4
"context"
5
+ "net"
5
6
"os"
6
7
"path/filepath"
8
+ "strconv"
7
9
"time"
8
10
9
11
envoy_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
@@ -13,6 +15,8 @@ import (
13
15
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
14
16
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/accesslogs"
15
17
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/certificate"
18
+ "github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/configfetcher"
19
+ "github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/dnsproxy"
16
20
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/dnsserver"
17
21
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/envoy"
18
22
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/meshmetrics"
@@ -27,7 +31,9 @@ import (
27
31
"github.com/kumahq/kuma/pkg/core/resources/model/rest"
28
32
"github.com/kumahq/kuma/pkg/core/runtime/component"
29
33
core_xds "github.com/kumahq/kuma/pkg/core/xds"
30
- "github.com/kumahq/kuma/pkg/util/net"
34
+ dns_dpapi "github.com/kumahq/kuma/pkg/dns/dpapi"
35
+ meshmetric_dpapi "github.com/kumahq/kuma/pkg/plugins/policies/meshmetric/dpapi"
36
+ kuma_net "github.com/kumahq/kuma/pkg/util/net"
31
37
"github.com/kumahq/kuma/pkg/util/proto"
32
38
kuma_version "github.com/kumahq/kuma/pkg/version"
33
39
"github.com/kumahq/kuma/pkg/xds/bootstrap/types"
@@ -84,7 +90,7 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
84
90
85
91
proxyResource , err = readResource (cmd , & cfg .DataplaneRuntime )
86
92
if err != nil {
87
- runLog .Error (err , "failed to read policy " , "proxyType" , cfg .Dataplane .ProxyType )
93
+ runLog .Error (err , "failed to read dataplane " , "proxyType" , cfg .Dataplane .ProxyType )
88
94
89
95
return err
90
96
}
@@ -205,6 +211,12 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
205
211
}
206
212
opts .AdminPort = bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetPortValue ()
207
213
214
+ confFetcher := configfetcher .NewConfigFetcher (
215
+ core_xds .MeshMetricsDynamicConfigurationSocketName (cfg .DataplaneRuntime .SocketDir ),
216
+ time .NewTicker (cfg .DataplaneRuntime .DynamicConfiguration .RefreshInterval .Duration ),
217
+ cfg .DataplaneRuntime .DynamicConfiguration .RefreshInterval .Duration ,
218
+ )
219
+
208
220
if cfg .DNS .Enabled && ! cfg .Dataplane .IsZoneProxy () {
209
221
dnsOpts := & dnsserver.Opts {
210
222
Config : * cfg ,
@@ -216,29 +228,49 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
216
228
if len (kumaSidecarConfiguration .Networking .CorefileTemplate ) > 0 {
217
229
dnsOpts .ProvidedCorefileTemplate = kumaSidecarConfiguration .Networking .CorefileTemplate
218
230
}
231
+ if dnsOpts .Config .DNS .ProxyPort != 0 {
232
+ runLog .Info ("Running with embedded DNS proxy port" , "port" , dnsOpts .Config .DNS .ProxyPort )
233
+ // Using embedded DNS
234
+ dnsproxyServer , err := dnsproxy .NewServer (net .JoinHostPort ("localhost" , strconv .Itoa (int (dnsOpts .Config .DNS .ProxyPort ))))
235
+ if err != nil {
236
+ return err
237
+ }
238
+ if err := confFetcher .AddHandler (dns_dpapi .PATH , dnsproxyServer .ReloadMap ); err != nil {
239
+ return err
240
+ }
241
+ components = append (components , dnsproxyServer )
242
+ } else {
243
+ dnsServer , err := dnsserver .New (dnsOpts )
244
+ if err != nil {
245
+ return err
246
+ }
219
247
220
- dnsServer , err := dnsserver . New ( dnsOpts )
221
- if err != nil {
222
- return err
223
- }
248
+ version , err := dnsServer . GetVersion ( )
249
+ if err != nil {
250
+ return err
251
+ }
224
252
225
- version , err := dnsServer .GetVersion ()
226
- if err != nil {
227
- return err
253
+ rootCtx .BootstrapDynamicMetadata [core_xds .FieldPrefixDependenciesVersion + ".coredns" ] = version
254
+ components = append (components , dnsServer )
228
255
}
229
-
230
- rootCtx .BootstrapDynamicMetadata [core_xds .FieldPrefixDependenciesVersion + ".coredns" ] = version
231
-
232
- components = append (components , dnsServer )
233
256
}
234
257
235
258
envoyComponent , err := envoy .New (opts )
236
259
if err != nil {
237
260
return err
238
261
}
239
262
components = append (components , envoyComponent )
240
-
241
- observabilityComponents := setupObservability (kumaSidecarConfiguration , bootstrap , cfg )
263
+ components = append (components , component .NewResilientComponent (
264
+ runLog .WithName ("configfetcher" ),
265
+ confFetcher ,
266
+ cfg .Dataplane .ResilientComponentBaseBackoff .Duration ,
267
+ cfg .Dataplane .ResilientComponentMaxBackoff .Duration ,
268
+ ))
269
+
270
+ observabilityComponents , err := setupObservability (gracefulCtx , kumaSidecarConfiguration , bootstrap , cfg , confFetcher )
271
+ if err != nil {
272
+ return err
273
+ }
242
274
components = append (components , observabilityComponents ... )
243
275
244
276
var readinessReporter * readiness.Reporter
@@ -355,7 +387,7 @@ func getApplicationsToScrape(kumaSidecarConfiguration *types.KumaSidecarConfigur
355
387
Name : item .Name ,
356
388
Path : item .Path ,
357
389
Port : item .Port ,
358
- IsIPv6 : net .IsAddressIPv6 (item .Address ),
390
+ IsIPv6 : kuma_net .IsAddressIPv6 (item .Address ),
359
391
QueryModifier : metrics .RemoveQueryParameters ,
360
392
MeshMetricMutator : metrics .AggregatedOtelMutator (),
361
393
})
@@ -382,18 +414,16 @@ func writeFile(filename string, data []byte, perm os.FileMode) error {
382
414
return os .WriteFile (filename , data , perm )
383
415
}
384
416
385
- func setupObservability (kumaSidecarConfiguration * types.KumaSidecarConfiguration , bootstrap * envoy_bootstrap_v3.Bootstrap , cfg * kumadp.Config ) []component.Component {
386
- resilientComponentBaseBackoff := 5 * time .Second
387
- resilientComponentMaxBackoff := 1 * time .Minute
417
+ func setupObservability (ctx context.Context , kumaSidecarConfiguration * types.KumaSidecarConfiguration , bootstrap * envoy_bootstrap_v3.Bootstrap , cfg * kumadp.Config , fetcher * configfetcher.ConfigFetcher ) ([]component.Component , error ) {
388
418
baseApplicationsToScrape := getApplicationsToScrape (kumaSidecarConfiguration , bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetPortValue ())
389
419
390
420
accessLogStreamer := component .NewResilientComponent (
391
421
runLog .WithName ("access-log-streamer" ),
392
422
accesslogs .NewAccessLogStreamer (
393
423
core_xds .AccessLogSocketName (cfg .DataplaneRuntime .SocketDir , cfg .Dataplane .Name , cfg .Dataplane .Mesh ),
394
424
),
395
- resilientComponentBaseBackoff ,
396
- resilientComponentMaxBackoff ,
425
+ cfg . Dataplane . ResilientComponentBaseBackoff . Duration ,
426
+ cfg . Dataplane . ResilientComponentMaxBackoff . Duration ,
397
427
)
398
428
399
429
openTelemetryProducer := metrics .NewAggregatedMetricsProducer (
@@ -410,21 +440,18 @@ func setupObservability(kumaSidecarConfiguration *types.KumaSidecarConfiguration
410
440
openTelemetryProducer ,
411
441
)
412
442
413
- meshMetricsConfigFetcher := component .NewResilientComponent (
414
- runLog .WithName ("mesh-metric-config-fetcher" ),
415
- meshmetrics .NewMeshMetricConfigFetcher (
416
- core_xds .MeshMetricsDynamicConfigurationSocketName (cfg .DataplaneRuntime .SocketDir ),
417
- time .NewTicker (cfg .DataplaneRuntime .DynamicConfiguration .RefreshInterval .Duration ),
418
- metricsServer ,
419
- openTelemetryProducer ,
420
- kumaSidecarConfiguration .Networking .Address ,
421
- bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetPortValue (),
422
- bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetAddress (),
423
- cfg .Dataplane .DrainTime .Duration ,
424
- ),
425
- resilientComponentBaseBackoff ,
426
- resilientComponentMaxBackoff ,
443
+ mm := meshmetrics .NewManager (
444
+ ctx ,
445
+ metricsServer ,
446
+ openTelemetryProducer ,
447
+ kumaSidecarConfiguration .Networking .Address ,
448
+ bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetPortValue (),
449
+ bootstrap .GetAdmin ().GetAddress ().GetSocketAddress ().GetAddress (),
450
+ cfg .Dataplane .DrainTime .Duration ,
427
451
)
428
-
429
- return []component.Component {accessLogStreamer , meshMetricsConfigFetcher , metricsServer }
452
+ err := fetcher .AddHandler (meshmetric_dpapi .PATH , mm .OnChange )
453
+ if err != nil {
454
+ return nil , err
455
+ }
456
+ return []component.Component {accessLogStreamer , metricsServer , mm }, nil
430
457
}
0 commit comments