Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

Add TLS support with ConnectTLS function #227

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
95419b4
Add tls support with ConnectTLS function
linalinn Nov 29, 2019
3080669
Fix DeepSource: Go failure
linalinn Dec 2, 2019
a24b6fc
TLS support
linalinn Dec 9, 2019
ce3a7b9
new target certs to gen certs for zookeeper and go
linalinn Dec 10, 2019
c0fcbe7
Update zookeeper version to min version with SSL support
linalinn Dec 11, 2019
b468b36
added TLS enabled test
linalinn Dec 11, 2019
a003f5e
Add timeout functionality to TLS
linalinn Dec 11, 2019
4bbc581
Added TLS capability
linalinn Dec 11, 2019
368e294
fixing bugs
linalinn Dec 11, 2019
c5d628d
Enable Travis.ci for tls test
linalinn Dec 12, 2019
4899759
make can add prefix for zk version
linalinn Jan 2, 2020
672e87f
make file supports version 3.5.4-beta<
linalinn Jan 2, 2020
d437f76
merge
linalinn Jan 2, 2020
1a44ff3
Add tls to
linalinn Jan 2, 2020
fc7f658
Fix 404 http error in setup
linalinn Jan 2, 2020
6a70c07
fixing stuff
linalinn Jan 2, 2020
7073de3
fixing stuff
linalinn Jan 2, 2020
e32eaf2
fixing stuff
linalinn Jan 2, 2020
e08e9de
change version
linalinn Jan 2, 2020
e192930
tls config base on env tls=true
linalinn Jan 2, 2020
2237187
tls config base on env tls=true
linalinn Jan 2, 2020
d0e90ee
fix passwd
linalinn Jan 2, 2020
29b93d2
fix if condition for contorting tls
linalinn Jan 3, 2020
3dd925c
fix bug
linalinn Jan 3, 2020
b9c4081
is it working?
linalinn Jan 3, 2020
1c9196f
.travis.yml
linalinn Jan 3, 2020
3845cd6
.travis.yml add test
linalinn Jan 3, 2020
0594c81
bug fix
linalinn Jan 3, 2020
b19fe9c
cert gen
linalinn Jan 3, 2020
99938a6
removed fmt.Println(no tls/tls)
linalinn Jan 3, 2020
4505d24
removed unused variables
linalinn Jan 3, 2020
3350f4b
Merge remote-tracking branch 'origin/test' into tls-local
linalinn Jan 3, 2020
cd23882
Removed test branch from .travis.yml
linalinn Jan 3, 2020
b169ffc
Merge remote-tracking branch 'origin/tls-local' into tls-local
linalinn Jan 3, 2020
bf00666
cleaning
linalinn Jan 3, 2020
7244340
Removed branch tls-local from .travis.yml
linalinn Jan 3, 2020
5e3e5fa
removed duplicated test
linalinn Jan 3, 2020
f8506e5
fixed DeepSource fail by changing Parameter of Marshall
linalinn Jan 3, 2020
f062bea
Revert "fixed DeepSource fail by changing Parameter of Marshall"
linalinn Jan 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ env:
matrix:
- zk_version=3.5.4-beta
- zk_version=3.4.12
- zk_version=3.5.4-beta tls="true"
34 changes: 32 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# make file to hold the logic of build and test setup
ZK_VERSION ?= 3.4.12
ZK_VERSION ?= 3.5.4-beta

ZK = zookeeper-$(ZK_VERSION)
ZK_URL = "https://archive.apache.org/dist/zookeeper/$(ZK)/$(ZK).tar.gz"

tls_passwd = password
tls_dir = "/tmp/certs"

tls_passwd = password
tls_dir = "/tmp/certs"

PACKAGES := $(shell go list ./... | grep -v examples)

.DEFAULT_GOAL := test
Expand All @@ -21,7 +27,9 @@ install-covertools:
go get golang.org/x/tools/cmd/cover

.PHONY: setup
setup: $(ZK) install-covertools
setup: certs $(ZK) install-covertools



.PHONY: lint
lint:
Expand All @@ -37,3 +45,25 @@ test: build
go test -timeout 500s -v -race -covermode atomic -coverprofile=profile.cov $(PACKAGES)
# ignore if we fail to publish coverage
-goveralls -coverprofile=profile.cov -service=travis-ci

.PHONY: certs
certs:
mkdir $(tls_dir)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -storepass $(tls_passwd) -alias localhost -validity 10 -genkey -keyalg RSA -dname "CN=localhost, OU=nope, O=nope, L=nope, S=nope, C=NO"
openssl req -passout pass:$(tls_passwd) -new -x509 -keyout $(tls_dir)/ca-key -out $(tls_dir)/ca-cert -days 10 -subj "/CN=localhost/OU=nope/O=nope/L=nope/C=NO"
keytool -keystore $(tls_dir)/zookeeper.client.truststore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -trustcacerts -noprompt -storepass $(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias localhost -certreq -file $(tls_dir)/cert-file -storepass $(tls_passwd)
openssl x509 -req -CA $(tls_dir)/ca-cert -CAkey $(tls_dir)/ca-key -in $(tls_dir)/cert-file -out $(tls_dir)/cert-signed -days 10 -CAcreateserial -passin pass:$(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.server.keystore.jks -alias localhost -import -file $(tls_dir)/cert-signed -storepass $(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -validity 10 -genkey -keyalg rsa -storepass $(tls_passwd) -dname "CN=localhost, OU=nope, O=nope, L=nope, S=nope, C=NO"
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -certreq -file $(tls_dir)/client-cert-file -storepass $(tls_passwd)
openssl x509 -req -CA $(tls_dir)/ca-cert -CAkey $(tls_dir)/ca-key -in $(tls_dir)/client-cert-file -out $(tls_dir)/client-cert-signed -days 10 -CAcreateserial -passin pass:$(tls_passwd)
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.client.keystore.jks -alias CLIENT -import -file $(tls_dir)/client-cert-signed -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -keystore $(tls_dir)/zookeeper.server.truststore.jks -alias CARoot -import -file $(tls_dir)/ca-cert -storepass $(tls_passwd) -trustcacerts -noprompt
keytool -storepass $(tls_passwd) -srcstorepass $(tls_passwd) -importkeystore -srckeystore $(tls_dir)/zookeeper.server.truststore.jks -destkeystore $(tls_dir)/server.p12 -deststoretype PKCS12 -noprompt
openssl pkcs12 -in $(tls_dir)/server.p12 -nokeys -out $(tls_dir)/server.cer.pem -passin pass:$(tls_passwd)
keytool -importkeystore -srckeystore $(tls_dir)/zookeeper.server.keystore.jks -destkeystore $(tls_dir)/client.p12 -deststoretype PKCS12 -storepass $(tls_passwd) -srcstorepass $(tls_passwd)
openssl pkcs12 -in $(tls_dir)/client.p12 -nokeys -out $(tls_dir)/client.cer.pem -passin pass:$(tls_passwd)
openssl pkcs12 -in $(tls_dir)/client.p12 -nodes -nocerts -out $(tls_dir)/client.key.pem -passin pass:$(tls_passwd)
120 changes: 85 additions & 35 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Possible watcher events:

import (
"crypto/rand"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -108,7 +109,9 @@ type Conn struct {
logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged

buf []byte
buf []byte
configTLS *tls.Config
enableTLS bool
}

// connOption represents a connection option.
Expand Down Expand Up @@ -166,30 +169,53 @@ func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Di
return Connect(servers, sessionTimeout, WithDialer(dialer))
}

func ConnectTLS(servers []string, sessionTimeout time.Duration, config *tls.Config, options ...connOption) (*Conn, <-chan Event, error) {
srvs, e := prepareServerArray(servers)
if e != nil {
return nil, nil, e
}

ec, conn, e := prepareConn(options, srvs, sessionTimeout, true, config)
if e != nil {
return nil, nil, e
}

go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
}

// Connect establishes a new connection to a pool of zookeeper
// servers. The provided session timeout sets the amount of time for which
// a session is considered valid after losing connection to a server. Within
// the session timeout it's possible to reestablish a connection to a different
// server and keep the same session. This is means any ephemeral nodes and
// watches are maintained.
func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
if len(servers) == 0 {
return nil, nil, errors.New("zk: server list must not be empty")
srvs, e := prepareServerArray(servers)
if e != nil {
return nil, nil, e
}

srvs := make([]string, len(servers))

for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
ec, conn, e := prepareConn(options, srvs, sessionTimeout, false, nil)
if e != nil {
return nil, nil, e
}

// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
}

func prepareConn(options []connOption, srvs []string, sessionTimeout time.Duration, tlsEnabled bool, config *tls.Config) (chan Event, *Conn, error) {
ec := make(chan Event, eventChanSize)
conn := &Conn{
dialer: net.DialTimeout,
Expand All @@ -206,26 +232,35 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
enableTLS: tlsEnabled,
configTLS: config,
}

// Set provided options.
for _, option := range options {
option(conn)
}

if err := conn.hostProvider.Init(srvs); err != nil {
return nil, nil, err
}

conn.setTimeouts(int32(sessionTimeout / time.Millisecond))
return ec, conn, nil
}

go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return conn, ec, nil
func prepareServerArray(servers []string) ([]string, error) {
if len(servers) == 0 {
return nil, errors.New("zk: server list must not be empty")
}
srvs := make([]string, len(servers))
for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
}
// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
return srvs, nil
}

// WithDialer returns a connection option specifying a non-default Dialer.
Expand Down Expand Up @@ -377,17 +412,31 @@ func (c *Conn) connect() error {
}
}

zkConn, err := c.dialer("tcp", c.Server(), c.connectTimeout)
if err == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
if c.enableTLS {
dialer := net.Dialer{Timeout: c.connectTimeout}
zkConn, zkConnErr := tls.DialWithDialer(&dialer, "tcp", c.Server(), c.configTLS)
if zkConnErr == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
}
return nil
}
return nil
c.logger.Printf("Failed to connect to %s: %+v", c.Server(), zkConnErr)
} else {
zkConn, zkConnErr := c.dialer("tcp", c.Server(), c.connectTimeout)
if zkConnErr == nil {
c.conn = zkConn
c.setState(StateConnected)
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
}
return nil
}
c.logger.Printf("Failed to connect to %s: %+v", c.Server(), zkConnErr)
}

c.logger.Printf("Failed to connect to %s: %+v", c.Server(), err)
}
}

Expand Down Expand Up @@ -867,7 +916,8 @@ func (c *Conn) recvLoop(conn net.Conn) error {
return err
}

if res.Xid == -1 {
switch {
case res.Xid == -1:
res := &watcherEvent{}
_, err := decodePacket(buf[16:blen], res)
if err != nil {
Expand Down Expand Up @@ -901,11 +951,11 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}
}
c.watchersLock.Unlock()
} else if res.Xid == -2 {
case res.Xid == -2:
// Ping response. Ignore.
} else if res.Xid < 0 {
case res.Xid < 0:
c.logger.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
} else {
default:
if res.Zxid > 0 {
c.lastZxid = res.Zxid
}
Expand Down
73 changes: 73 additions & 0 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package zk

import (
"crypto/tls"
"io/ioutil"
"os"
"testing"
"time"
)
Expand Down Expand Up @@ -55,3 +57,74 @@ func TestRecurringReAuthHang(t *testing.T) {

<-conn.debugReauthDone
}

func TestStateChangesTLS(t *testing.T) {
if os.Getenv("tls") != "true" {
t.Skip("No TLS support")
}

config, err := newTLSConfig("/tmp/certs/client.cer.pem", "/tmp/certs/client.key.pem")
if err != nil {
panic(err)
}

ts, err := StartTestCluster(t, 1, logWriter{t: t, p: "[ZK] "}, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

callbackChan := make(chan Event)
f := func(event Event) {
callbackChan <- event
}

zk, eventChan, err := ts.ConnectWithOptionsTLS(15*time.Second, config, WithEventCallback(f))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}

verifyEventOrder := func(c <-chan Event, expectedStates []State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}

if event.Type != EventSession {
continue
}

if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}

states := []State{StateConnecting, StateConnected, StateHasSession}
verifyEventOrder(callbackChan, states, "callback")
verifyEventOrder(eventChan, states, "event channel")

zk.Close()
verifyEventOrder(callbackChan, []State{StateDisconnected}, "callback")
verifyEventOrder(eventChan, []State{StateDisconnected}, "event channel")
}

func newTLSConfig(clientCertFile, clientKeyFile string) (*tls.Config, error) {
tlsConfig := tls.Config{
InsecureSkipVerify: true,
}

// Load client cert
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}

tlsConfig.BuildNameToCertificate()
return &tlsConfig, nil
}
Loading