diff --git a/dsnexec/README.md b/dsnexec/README.md new file mode 100644 index 00000000..1429ba98 --- /dev/null +++ b/dsnexec/README.md @@ -0,0 +1,38 @@ + +DSNExec +------- + +DSNExec is a cli tool that watches files and then reacts to those changes +by executing commands. The commands can be SQL commands or they can be +shell commands. + +Configuration +------------- + +Sources: + driver: The driver is used for parsing the source file. The parser allows + the content of the file to be referenced in a template. There are + several drivers available. + + json: a json parser + yaml: a yaml parser + postgres: a postgres connection string parser + none: does not try to parse the input file + + filename: the name of the file to use as a source. + +Destination: + driver: A database driver to execute the commands with. This is used + with the golang sql package. A "shelldb" driver also is registed + so that any executable can be run with the same abstraction. + + dsn: A DSN for the driver to connect with. + +Commands: + command: A SQL command to exectute. This sql command is parsed with + the golang text/template package with the sources map as the + template context. + + args: An array of arguments to the SQL command. The args are parsed with + the golang text/template package with the sources map as the + template context. \ No newline at end of file diff --git a/dsnexec/cmd/run.go b/dsnexec/cmd/run.go index d4fe5163..3e6bb07b 100644 --- a/dsnexec/cmd/run.go +++ b/dsnexec/cmd/run.go @@ -1,26 +1,69 @@ package cmd import ( - "fmt" + "context" "os" + "os/signal" - "github.com/infobloxopen/db-controller/dsnexec/pkg/dsnexec" + "github.com/infobloxopen/db-controller/dsnexec/pkg/fdsnexec" + _ "github.com/infobloxopen/db-controller/dsnexec/pkg/fprintf" + _ "github.com/infobloxopen/db-controller/dsnexec/pkg/shelldb" + "github.com/infobloxopen/hotload" + _ "github.com/infobloxopen/hotload/fsnotify" + "github.com/lib/pq" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "gopkg.in/yaml.v2" ) +func init() { + hotload.RegisterSQLDriver("postgres", pq.Driver{}) +} + // runCmd represents the run command var runCmd = &cobra.Command{ Use: "run", Short: "run a dsnexec watcher", Long: `dsnexec run will run a dsnexec watcher based on the config file provided.`, Run: func(cmd *cobra.Command, args []string) { - fmt.Println("run called") + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + <-signalChan + cancel() + }() + + c, err := parseConfig(confFile) + if err != nil { + log.Fatalf("failed to parse config: %s", err) + } + + for _, e := range enablingFlags { + if _, ok := c.Configs[e]; ok { + c.Configs[e].Disabled = false + } + } + for _, d := range disablingFlags { + if _, ok := c.Configs[d]; ok { + c.Configs[d].Disabled = true + } + } + + handler, err := fdsnexec.NewHandler(c) + if err != nil { + log.Fatalf("failed to create handler: %s", err) + } + if err := handler.Run(ctx); err != nil { + log.Fatalf("failed to run handler: %s", err) + } }, } -func parseConfig(f string) (*Config, error) { - var c Config +func parseConfig(f string) (*fdsnexec.InputFile, error) { + var c fdsnexec.InputFile // read file bs, err := os.ReadFile(f) if err != nil { @@ -34,11 +77,9 @@ func parseConfig(f string) (*Config, error) { return &c, nil } -type Config struct { - Configs map[string]dsnexec.Config `yaml:"configs"` -} - var confFile string +var enablingFlags []string +var disablingFlags []string func init() { rootCmd.AddCommand(runCmd) @@ -51,5 +92,8 @@ func init() { // Cobra supports local flags which will only run when this command // is called directly, e.g.: - runCmd.Flags().StringVarP(&confFile, "config-file", "c", "conf.yaml", "Path to config file") + runCmd.Flags().StringVarP(&confFile, "config-file", "c", "config.yaml", "Path to config file") + + runCmd.Flags().StringSliceVarP(&enablingFlags, "enable", "e", []string{}, "Enable a config by name") + runCmd.Flags().StringSliceVarP(&disablingFlags, "disable", "d", []string{}, "Disable a config by name") } diff --git a/dsnexec/config.yaml b/dsnexec/config.yaml new file mode 100644 index 00000000..ba184cb5 --- /dev/null +++ b/dsnexec/config.yaml @@ -0,0 +1,53 @@ +configs: + # this is an example config to use the shelldb driver + test: + sources: + - driver: json + filename: test/source.json + - driver: yaml + filename: test/source.yaml + - driver: postgres + filename: test/source.pg.dsn + - driver: postgres + filename: test/source.pg.uri + destination: + driver: shelldb + dsn: my shell program + commands: + - command: /bin/bash + args: + - "-c" + - echo "{{ index . "test/source.json" }}" + + # this is an example config to use the fprintf driver + # it will write json of the sources to a file + filewrite: + sources: + - driver: json + filename: test/source.json + destination: + driver: fprintf + # The $tmp variable is a special variable that is the path to a temporary directory + # The hostname endcodes the rendering driver. fmt.Fprintf is the default driver + # which can be specificed by 'fprintf' + dsn: file:///$tmp/myfile.json + commands: + - command: "%s" # commands can have text/templates also + # this is the format string for the fprintf driver + args: + # these are arguments to the fmt.Fprintf function + - '{{ index . "test/source.json" | toJson }}' + + # This is an example of executing sql commands in response to a change. + sql: + disabled: true + sources: + - driver: json + filename: test/source.json + destination: + driver: hotload + dsn: fsnotify://postgres/path/to/myfile.txt + commands: + - command: ALTER SERVER myserver OPTIONS (SET host=?); + args: + - '{{ index . "test/source.json" "host" }}' \ No newline at end of file diff --git a/dsnexec/go.mod b/dsnexec/go.mod index ae4937a3..422f2dbb 100644 --- a/dsnexec/go.mod +++ b/dsnexec/go.mod @@ -3,12 +3,28 @@ module github.com/infobloxopen/db-controller/dsnexec go 1.20 require ( + github.com/Masterminds/sprig/v3 v3.2.3 github.com/infobloxopen/dsnutil v0.0.2 + github.com/infobloxopen/hotload v1.2.0 + github.com/lib/pq v1.10.8 + github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.7.0 gopkg.in/yaml.v2 v2.3.0 ) require ( + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver/v3 v3.2.0 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/google/uuid v1.1.1 // indirect + github.com/huandu/xstrings v1.3.3 // indirect + github.com/imdario/mergo v0.3.11 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mitchellh/copystructure v1.0.0 // indirect + github.com/mitchellh/reflectwalk v1.0.0 // indirect + github.com/shopspring/decimal v1.2.0 // indirect + github.com/spf13/cast v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/crypto v0.3.0 // indirect + golang.org/x/sys v0.6.0 // indirect ) diff --git a/dsnexec/go.sum b/dsnexec/go.sum index c6b321d8..55b2556c 100644 --- a/dsnexec/go.sum +++ b/dsnexec/go.sum @@ -1,15 +1,97 @@ +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= +github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g= +github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj9n6YA= +github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4= +github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= +github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/infobloxopen/dsnutil v0.0.2 h1:kwq9NRlrytbehTIfYz6CgFcoxMHayKNrBJMLvAt88TI= github.com/infobloxopen/dsnutil v0.0.2/go.mod h1:DJ70NkKH9opwy8Go4VbirbvGjqI2cXNOyAmwHof9Gts= +github.com/infobloxopen/hotload v1.2.0 h1:vPIJ7G29tBn8XO7cDUVDonstpObNnlh0CljWlVhWK5Y= +github.com/infobloxopen/hotload v1.2.0/go.mod h1:UFU/fIkq2+DiE/5Ecwc+/R83oxi8+BVbVEORGA6T4F0= +github.com/lib/pq v1.10.8 h1:3fdt97i/cwSU83+E0hZTC/Xpc9mTZxc6UWSCRcSbxiE= +github.com/lib/pq v1.10.8/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= +github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.3.0 h1:a06MkbcxBrEFc0w0QIZWXrH/9cCX6KJyWbBOIwAn+7A= +golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/dsnexec/pkg/dsnexec/config.go b/dsnexec/pkg/dsnexec/config.go index 3ce6fb63..2ec8e5e3 100644 --- a/dsnexec/pkg/dsnexec/config.go +++ b/dsnexec/pkg/dsnexec/config.go @@ -6,21 +6,21 @@ import ( // Command is a command to execute type Command struct { - CommandStr string - Args []string + Command string `yaml:"command"` + Args []string `yaml:"args,omitempty"` } // DBConnInfo is a database connection info type DBConnInfo struct { - Driver string - DSN string + Driver string `yaml:"driver"` + DSN string `yaml:"dsn,omitempty"` } // Config is the configuration for the dsnexec type Config struct { Sources map[string]DBConnInfo `yaml:"sources"` Destination DBConnInfo `yaml:"destination"` - Commands []Command `yaml:"commands"` + Commands []Command `yaml:"commands,omitempty"` } // Validate validates the db conn info @@ -48,7 +48,7 @@ func (c *Config) Validate() error { return fmt.Errorf("commands must be set") } for _, v := range c.Commands { - if v.CommandStr == "" { + if v.Command == "" { return fmt.Errorf("command_str must be set") } } diff --git a/dsnexec/pkg/dsnexec/config_test.go b/dsnexec/pkg/dsnexec/config_test.go index 19b41e00..0f590b3b 100644 --- a/dsnexec/pkg/dsnexec/config_test.go +++ b/dsnexec/pkg/dsnexec/config_test.go @@ -32,7 +32,7 @@ func TestConfig_Validate(t *testing.T) { }, Commands: []Command{ { - CommandStr: "SELECT 1", + Command: "SELECT 1", }, }, }, diff --git a/dsnexec/pkg/dsnexec/dsnexec.go b/dsnexec/pkg/dsnexec/dsnexec.go index 529372c1..09b9bc36 100644 --- a/dsnexec/pkg/dsnexec/dsnexec.go +++ b/dsnexec/pkg/dsnexec/dsnexec.go @@ -9,6 +9,10 @@ import ( "strings" "sync" "text/template" + + "github.com/Masterminds/sprig/v3" + _ "github.com/infobloxopen/db-controller/dsnexec/pkg/shelldb" + _ "github.com/lib/pq" ) // Hanlder is an instance of dsnexec. @@ -21,9 +25,10 @@ type Handler struct { type HandlerOption func(*Handler) error // WithConfig sets the config for the dnsexec handler. -func WithConfig(c Config) func(w *Handler) { - return func(w *Handler) { +func WithConfig(c Config) HandlerOption { + return func(w *Handler) error { w.config = c + return nil } } @@ -65,18 +70,17 @@ func (w *Handler) exec() error { argContext := make(map[string]interface{}) for name, source := range w.config.Sources { - switch source.Driver { - case "postgres": - parsedOpts, err := parseDSN(source.DSN) - if err != nil { - return fmt.Errorf("failed to parse dsn: %v", err) - } - parsedOpts["raw_dsn"] = source.DSN - argContext[name] = parsedOpts - - default: + parse, found := parsers[source.Driver] + if !found { return fmt.Errorf("unsupported source driver: %s", source.Driver) } + + parsedOpts, err := parse(source.DSN) + if err != nil { + return fmt.Errorf("failed to parse dsn: %v", err) + } + parsedOpts["raw_dsn"] = source.DSN + argContext[name] = parsedOpts } db, err := sql.Open(w.config.Destination.Driver, w.config.Destination.DSN) if err != nil { @@ -85,15 +89,24 @@ func (w *Handler) exec() error { defer db.Close() for i, v := range w.config.Commands { + t, err := template.New("command").Funcs(sprig.FuncMap()).Parse(v.Command) + if err != nil { + return fmt.Errorf("failed to parse command template %v: %v", v.Command, err) + } + bs := bytes.NewBuffer(nil) + if err := t.Execute(bs, argContext); err != nil { + return fmt.Errorf("failed to render command template: %v: %v", v.Command, err) + } + cmd := bs.String() if len(v.Args) == 0 { - if _, err := db.Exec(v.CommandStr); err != nil { + if _, err := db.Exec(cmd); err != nil { return fmt.Errorf("failed to execute sql: %v", err) } continue } var args []interface{} for j, arg := range v.Args { - t, err := template.New(fmt.Sprintf("arg(%d, %d)", i, j)).Parse(arg) + t, err := template.New(fmt.Sprintf("arg(%d, %d)", i, j)).Funcs(sprig.FuncMap()).Parse(arg) if err != nil { return fmt.Errorf("failed to parse argument template %v: %v", arg, err) } @@ -107,8 +120,8 @@ func (w *Handler) exec() error { } args = append(args, val) } - if _, err := db.Exec(v.CommandStr, args...); err != nil { - return fmt.Errorf("failed to execute command: %v", err) + if _, err := db.Exec(cmd, args...); err != nil { + return fmt.Errorf("failed to execute command: command %s %v", v.Command, err) } } return nil diff --git a/dsnexec/pkg/dsnexec/dsnexec_test.go b/dsnexec/pkg/dsnexec/dsnexec_test.go index 3e2cfeb9..f23bb0c2 100644 --- a/dsnexec/pkg/dsnexec/dsnexec_test.go +++ b/dsnexec/pkg/dsnexec/dsnexec_test.go @@ -38,7 +38,7 @@ func TestHandler_UpdateDSN(t *testing.T) { }, Commands: []Command{ { - CommandStr: "select 1", + Command: "select 1", }, }, }, @@ -62,7 +62,7 @@ func TestHandler_UpdateDSN(t *testing.T) { }, Commands: []Command{ { - CommandStr: "select 1", + Command: "select 1", Args: []string{ "{{ .test.host }}", "int64:{{ .test.port }}", @@ -94,7 +94,7 @@ func TestHandler_UpdateDSN(t *testing.T) { }, Commands: []Command{ { - CommandStr: "select 1", + Command: "select 1", Args: []string{ "{{ .test.raw_dsn }}", }, diff --git a/dsnexec/pkg/dsnexec/funcs.go b/dsnexec/pkg/dsnexec/funcs.go index 81d6b336..2471f1db 100644 --- a/dsnexec/pkg/dsnexec/funcs.go +++ b/dsnexec/pkg/dsnexec/funcs.go @@ -1,14 +1,61 @@ package dsnexec import ( + "encoding/json" + "fmt" "strings" "github.com/infobloxopen/dsnutil/pg" + "gopkg.in/yaml.v2" ) -// parseDSN parses a dsn string into a map of options. The DSN can be +var ( + parsers map[string]func(string) (map[string]string, error) +) + +func init() { + parsers = make(map[string]func(string) (map[string]string, error)) + parsers["postgres"] = parsePostgresDSN + parsers["json"] = parseJSON + parsers["yaml"] = parseYAML + parsers["none"] = parseNone + parsers[""] = parseNone +} + +func parseNone(dsn string) (map[string]string, error) { + value := make(map[string]string) + return value, nil +} + +func parseJSON(dsn string) (map[string]string, error) { + var imap map[string]interface{} + + if err := json.Unmarshal([]byte(dsn), &imap); err != nil { + return nil, err + } + value := make(map[string]string) + for k, v := range imap { + value[k] = fmt.Sprintf("%s", v) + } + return value, nil +} + +func parseYAML(dsn string) (map[string]string, error) { + var imap map[string]interface{} + + if err := yaml.Unmarshal([]byte(dsn), &imap); err != nil { + return nil, err + } + value := make(map[string]string) + for k, v := range imap { + value[k] = fmt.Sprintf("%s", v) + } + return value, nil +} + +// parsePostgresDSN parses a dsn string into a map of options. The DSN can be // in a URI or key=value format. -func parseDSN(dsn string) (map[string]string, error) { +func parsePostgresDSN(dsn string) (map[string]string, error) { if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") { var err error dsn, err = pg.ParseURL(dsn) diff --git a/dsnexec/pkg/fdsnexec/config.go b/dsnexec/pkg/fdsnexec/config.go new file mode 100644 index 00000000..304f14f6 --- /dev/null +++ b/dsnexec/pkg/fdsnexec/config.go @@ -0,0 +1,23 @@ +package fdsnexec + +import "github.com/infobloxopen/db-controller/dsnexec/pkg/dsnexec" + +type Source struct { + Driver string `yaml:"driver"` + Filename string `yaml:"filename"` +} + +// InputFile is the input file format for fdsnexec. It is a yaml file with +// a top level key of configs. The configs key is a map of config names to +// Configs. +type InputFile struct { + Configs map[string]*Config `yaml:"configs"` +} + +// Config is the config for a single fdsnexec instance. +type Config struct { + Disabled bool `yaml:"disabled"` + Sources []Source `yaml:"sources"` + Destination dsnexec.DBConnInfo `yaml:"destination"` + Commands []dsnexec.Command `yaml:"commands"` +} diff --git a/dsnexec/pkg/fdsnexec/fdsnexec.go b/dsnexec/pkg/fdsnexec/fdsnexec.go new file mode 100644 index 00000000..3f2ac2c8 --- /dev/null +++ b/dsnexec/pkg/fdsnexec/fdsnexec.go @@ -0,0 +1,118 @@ +package fdsnexec + +import ( + "context" + "fmt" + + "github.com/infobloxopen/db-controller/dsnexec/pkg/dsnexec" + "github.com/infobloxopen/hotload/fsnotify" + log "github.com/sirupsen/logrus" +) + +// Handler is an instance of fdsnexec. +type Handler struct { + config *InputFile +} + +// NewHandler creates a new dsnexec handler. +func NewHandler(config *InputFile) (*Handler, error) { + + h := &Handler{ + config: config, + } + return h, nil +} + +// Run the fdsnexec handler. This will block until the context is canceled +// or and unhandled error occurs. If there are errors parsing the config, +// reading the sources or executing agaist the destination, this will return +// an error. +func (h *Handler) Run(ctx context.Context) error { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + exited := make(chan error, len(h.config.Configs)) + for k := range h.config.Configs { + cfg := h.config.Configs[k] + if cfg.Disabled { + log.Infof("skipping disabled config %s", k) + continue + } + go func(c *Config) { + exited <- c.run(ctx) + }(cfg) + } + + select { + case err := <-exited: + return err + case <-ctx.Done(): + log.Debug("exiting fdsnexec...") + cancel() + return nil + } +} + +func (c Config) run(ctx context.Context) error { + notifyS := fsnotify.NewStrategy() + + type update struct { + filename string + value string + driver string + } + dsnConfig := dsnexec.Config{ + Sources: make(map[string]dsnexec.DBConnInfo), + Destination: c.Destination, + Commands: c.Commands, + } + updates := make(chan update, len(c.Sources)) + + for _, s := range c.Sources { + val, values, err := notifyS.Watch(ctx, s.Filename, nil) + if err != nil { + return err + } + dsnConfig.Sources[s.Filename] = dsnexec.DBConnInfo{ + Driver: s.Driver, + DSN: val, + } + go func(filename string, values <-chan string, driver string) { + for { + select { + case <-ctx.Done(): + return + case v := <-values: + updates <- update{ + filename: filename, + value: v, + driver: driver, + } + } + } + }(s.Filename, values, s.Driver) + } + + handler, err := dsnexec.NewHanlder(dsnexec.WithConfig(dsnConfig)) + if err != nil { + return err + } + + // initial sync + if err := handler.Exec(); err != nil { + return fmt.Errorf("failed initial execute: %v", err) + } + + for { + select { + case <-ctx.Done(): + return nil + case u := <-updates: + log.Infof("updating dsn for %s", u.filename) + if err := handler.UpdateDSN(u.filename, u.value); err != nil { + return fmt.Errorf("failed to update dsn: %s", err) + } + } + } +} diff --git a/dsnexec/pkg/fprintf/driver.go b/dsnexec/pkg/fprintf/driver.go new file mode 100644 index 00000000..c42fc36f --- /dev/null +++ b/dsnexec/pkg/fprintf/driver.go @@ -0,0 +1,105 @@ +package fprintf + +import ( + "context" + "database/sql" + "database/sql/driver" + "fmt" + "net/url" + + log "github.com/sirupsen/logrus" +) + +type d struct { +} + +func init() { + sql.Register("fprintf", &d{}) +} + +func (d *d) Open(name string) (driver.Conn, error) { + uri, err := url.Parse(name) + if err != nil { + return nil, err + } + fileHandler, ok := fileHandlers[uri.Scheme] + if !ok { + return nil, fmt.Errorf("fprintf: unsupported file handler %s", uri.Scheme) + } + templater, ok := templaters[uri.Host] + if !ok { + return nil, fmt.Errorf("fprintf: unsupported templater %s", uri.Scheme) + } + + return &conn{ + filename: uri.Path, + fileHandler: fileHandler, + templater: templater, + }, nil +} + +type conn struct { + filename string + fileHandler FileHandler + templater Templater +} + +func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) { + return c.ExecContext(context.Background(), query, args) +} + +func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Value) (driver.Result, error) { + + log := log.WithFields(log.Fields{ + "fprintf_destination": c.filename, + }) + + log.Debug("executing query") + + ctx, abort := context.WithCancel(ctx) + defer abort() + + w, err := newFileHandler(ctx, c.filename) + if err != nil { + return nil, err + } + var vargs []interface{} + for i := range args { + vargs = append(vargs, fmt.Sprintf("%s", args[i])) + } + if len(vargs) == 0 { + if _, err := c.templater(w, query); err != nil { + return nil, err + } + } else { + if _, err := c.templater(w, query, vargs...); err != nil { + return nil, err + } + } + if err := w.Close(); err != nil { + return nil, err + } + return &result{}, nil +} + +type result struct{} + +func (r *result) LastInsertId() (int64, error) { + return 0, fmt.Errorf("unsupported LastInsertId in shell driver") +} + +func (r *result) RowsAffected() (int64, error) { + return 0, fmt.Errorf("unsupported RowsAffected in shell driver") +} + +func (c *conn) Prepare(query string) (driver.Stmt, error) { + return nil, fmt.Errorf("unsupported Prepare in shell driver") +} + +func (c *conn) Begin() (driver.Tx, error) { + return nil, fmt.Errorf("unsupported Begin in shell driver") +} + +func (c *conn) Close() error { + return nil +} diff --git a/dsnexec/pkg/fprintf/file_handler.go b/dsnexec/pkg/fprintf/file_handler.go new file mode 100644 index 00000000..4795eba8 --- /dev/null +++ b/dsnexec/pkg/fprintf/file_handler.go @@ -0,0 +1,82 @@ +package fprintf + +import ( + "context" + "fmt" + "io" + "os" + "path" + "strings" +) + +var ( + fileHandlers map[string]FileHandler = make(map[string]FileHandler) +) + +type FileHandler func(ctx context.Context, filename string) (io.WriteCloser, error) + +func init() { + fileHandlers["file"] = newFileHandler +} + +type fileHandler struct { + name string + tempTemp string + f io.WriteCloser +} + +func newFileHandler(ctx context.Context, filename string) (io.WriteCloser, error) { + + dirname := path.Dir(filename) + isTemp := false + if strings.HasPrefix(filename, "/$tmp/") { + dirname = "" + filename = strings.TrimPrefix(filename, "/$tmp/") + isTemp = true + } + + baseFile := path.Base(filename) + f, err := os.CreateTemp(dirname, baseFile) + if err != nil { + return nil, err + } + if isTemp { + filename = path.Join(os.TempDir(), baseFile) + } + + fh := &fileHandler{ + name: filename, + f: f, + tempTemp: f.Name(), + } + go fh.watchTilClose(ctx) + return fh, nil +} + +func (fh *fileHandler) watchTilClose(ctx context.Context) { + <-ctx.Done() + fh.f.Close() + os.Remove(fh.tempTemp) +} + +func (fh *fileHandler) Write(p []byte) (n int, err error) { + return fh.f.Write(p) +} + +func (fh *fileHandler) Close() error { + if err := fh.f.Close(); err != nil { + return err + } + if err := os.Rename(fh.tempTemp, fh.name); err != nil { + return err + } + return nil +} + +// RegisterFileHandler a new file handler. This will panic if the name is already registered. +func RegisterFileHandler(name string, f FileHandler) { + if _, ok := fileHandlers[name]; ok { + panic(fmt.Sprintf("fprintf: %s already registered", name)) + } + fileHandlers[name] = f +} diff --git a/dsnexec/pkg/fprintf/templater.go b/dsnexec/pkg/fprintf/templater.go new file mode 100644 index 00000000..2be38f4b --- /dev/null +++ b/dsnexec/pkg/fprintf/templater.go @@ -0,0 +1,25 @@ +package fprintf + +import ( + "fmt" + "io" +) + +var ( + templaters map[string]Templater = make(map[string]Templater) +) + +type Templater func(io.Writer, string, ...interface{}) (int, error) + +func init() { + templaters["fprintf"] = fmt.Fprintf + templaters[""] = fmt.Fprintf +} + +// RegisterTemplater a new templater. This will panic if the name is already registered. +func RegisterTemplater(name string, f Templater) { + if _, ok := templaters[name]; ok { + panic(fmt.Sprintf("fprintf: %s already registered", name)) + } + templaters[name] = f +} diff --git a/dsnexec/pkg/shelldb/shelldb.go b/dsnexec/pkg/shelldb/shelldb.go index 148595d7..e4f92c2f 100644 --- a/dsnexec/pkg/shelldb/shelldb.go +++ b/dsnexec/pkg/shelldb/shelldb.go @@ -6,28 +6,25 @@ import ( "database/sql/driver" "fmt" "os/exec" + + log "github.com/sirupsen/logrus" ) type d struct { } func init() { - sql.Register("shell", &d{}) + sql.Register("shelldb", &d{}) } func (d *d) Open(name string) (driver.Conn, error) { - - executable, err := exec.LookPath(name) - if err != nil { - return nil, err - } return &conn{ - command: executable, + name: name, }, nil } type conn struct { - command string + name string } type ExecArgs struct { @@ -39,19 +36,33 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) { return c.ExecContext(context.Background(), query, args) } +type logWriter struct { + logger *log.Entry +} + +func (lw logWriter) Write(p []byte) (n int, err error) { + lw.logger.Infof("%s", p) + return len(p), nil +} + func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Value) (driver.Result, error) { var cmd *exec.Cmd if len(args) == 0 { - cmd = exec.CommandContext(ctx, c.command, query) + cmd = exec.CommandContext(ctx, query) } else { var argsStr []string for _, v := range args { argsStr = append(argsStr, fmt.Sprintf("%s", v)) } - cmd = exec.Command(c.command, argsStr...) + cmd = exec.Command(query, argsStr...) } + log := log.WithFields(log.Fields{ + "shelldb_destination": c.name, + }) + cmd.Stdout = logWriter{logger: log} + cmd.Stderr = logWriter{logger: log} if err := cmd.Run(); err != nil { return nil, err } diff --git a/dsnexec/test/source.json b/dsnexec/test/source.json new file mode 100644 index 00000000..38c16bee --- /dev/null +++ b/dsnexec/test/source.json @@ -0,0 +1,3 @@ +{ + "name": "json source" +} diff --git a/dsnexec/test/source.pg.dsn b/dsnexec/test/source.pg.dsn new file mode 100644 index 00000000..8f19fc02 --- /dev/null +++ b/dsnexec/test/source.pg.dsn @@ -0,0 +1 @@ +user=postgres password=letmein host=localhost port=5432 dbname=test sslmode=disable \ No newline at end of file diff --git a/dsnexec/test/source.pg.uri b/dsnexec/test/source.pg.uri new file mode 100644 index 00000000..a3019da7 --- /dev/null +++ b/dsnexec/test/source.pg.uri @@ -0,0 +1 @@ + postgres://user:pass@localhost:5431/dbname?sslmode=disable diff --git a/dsnexec/test/source.yaml b/dsnexec/test/source.yaml new file mode 100644 index 00000000..235568fb --- /dev/null +++ b/dsnexec/test/source.yaml @@ -0,0 +1,2 @@ +name: "yaml source" +other: "data"