Skip to content

Commit

Permalink
fix: Fix occasional unexpected EOF during dump/restore
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe565 committed Oct 5, 2023
1 parent c294064 commit 467468f
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions internal/actions/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (action Dump) Run(ctx context.Context) (err error) {
plogger,
false,
nil,
0,
); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/actions/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package exec
import (
"context"
"os"
"time"

"github.com/clevyr/kubedb/internal/command"
"github.com/clevyr/kubedb/internal/config"
Expand Down Expand Up @@ -43,6 +44,7 @@ func (action Exec) Run(ctx context.Context) error {
os.Stderr,
t.IsTerminalIn(),
sizeQueue,
5*time.Second,
)
})
}
Expand Down
1 change: 1 addition & 0 deletions internal/actions/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (action Restore) runInDatabasePod(ctx context.Context, r *io.PipeReader, st
stderr,
false,
nil,
0,
); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/config/flags/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flags
import (
"os"
"strings"
"time"

"github.com/clevyr/kubedb/internal/config"
"github.com/clevyr/kubedb/internal/database/dialect"
Expand Down Expand Up @@ -184,7 +185,7 @@ func listDatabases(cmd *cobra.Command, args []string, toComplete string) ([]stri
func queryInDatabase(cmd *cobra.Command, args []string, conf config.Exec) ([]string, cobra.ShellCompDirective) {
var buf strings.Builder
sqlCmd := conf.Dialect.ExecCommand(conf)
err := conf.Client.Exec(cmd.Context(), conf.Pod, sqlCmd.String(), nil, &buf, os.Stderr, false, nil)
err := conf.Client.Exec(cmd.Context(), conf.Pod, sqlCmd.String(), nil, &buf, os.Stderr, false, nil, 5*time.Second)
if err != nil {
return nil, cobra.ShellCompDirectiveError
}
Expand Down
3 changes: 2 additions & 1 deletion internal/database/dialect/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"strings"
"time"

"github.com/clevyr/kubedb/internal/command"
"github.com/clevyr/kubedb/internal/config"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, po
)

var buf strings.Builder
err := client.Exec(ctx, pods[0], cmd.String(), strings.NewReader(""), &buf, os.Stderr, false, nil)
err := client.Exec(ctx, pods[0], cmd.String(), strings.NewReader(""), &buf, os.Stderr, false, nil, 5*time.Second)
if err != nil {
return pods, err
}
Expand Down
30 changes: 27 additions & 3 deletions internal/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"fmt"
"io"
"net/http"
"time"

log "github.com/sirupsen/logrus"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

Expand All @@ -31,7 +34,7 @@ func (client KubeClient) GetNamespacedPods(ctx context.Context) (*v1.PodList, er
return pods, nil
}

func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue, pingPeriod time.Duration) error {
req := client.ClientSet.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(client.Namespace).
Expand All @@ -44,7 +47,28 @@ func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin
Stderr: stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(client.ClientConfig, "POST", req.URL())

tlsConfig, err := rest.TLSConfigFor(client.ClientConfig)
if err != nil {
return err
}
proxy := http.ProxyFromEnvironment
if client.ClientConfig.Proxy != nil {
proxy = client.ClientConfig.Proxy
}
upgradeRoundTripper := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{
TLS: tlsConfig,
Proxier: proxy,
// Needs to be 0 for dump/restore to prevent unexpected EOF.
// See https://github.com/kubernetes/kubernetes/issues/60140#issuecomment-1411477275
PingPeriod: pingPeriod,
})
wrapper, err := rest.HTTPWrappersForConfig(client.ClientConfig, upgradeRoundTripper)
if err != nil {
return err
}

exec, err := remotecommand.NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, "POST", req.URL())
if err != nil {
return err
}
Expand Down

0 comments on commit 467468f

Please sign in to comment.