Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis timeout and retries expand and more verbosity in logs #96

Merged
merged 1 commit into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions pkg/driver/redis/device_manager_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
deviceFlowMessageStream = "FLOW_MESSAGE_EVE_"
deviceAppLogsStream = "APPS_EVE_"

ioTimeout = 10 * time.Second
maxRetries = 5

MB = common.MB
maxLogSizeRedis = 100 * MB
maxInfoSizeRedis = 100 * MB
Expand Down Expand Up @@ -164,10 +167,13 @@ func (d *DeviceManager) Init(s string, sizes common.MaxSizes) (bool, error) {
}

d.client = redis.NewClient(&redis.Options{
Network: d.databaseNet,
Addr: d.databaseURL,
Password: URL.User.Username(), // yes, I know!
DB: d.databaseID,
Network: d.databaseNet,
Addr: d.databaseURL,
Password: URL.User.Username(), // yes, I know!
DB: d.databaseID,
ReadTimeout: ioTimeout,
WriteTimeout: ioTimeout,
MaxRetries: maxRetries,
})

return true, nil
Expand Down Expand Up @@ -348,16 +354,19 @@ func (d *DeviceManager) DeviceGet(u *uuid.UUID) (*x509.Certificate, *x509.Certif
// first lets get the device certificate
cert, err := d.readCert(deviceCertsHash, u.String())
if err != nil {
return nil, nil, "", err
return nil, nil, "", fmt.Errorf("error reading device certificate for %s: %v", u.String(), err)
}

// now lets get the device onboarding certificate
onboard, err := d.readCert(deviceOnboardCertsHash, u.String())
if err != nil {
return nil, nil, "", err
return nil, nil, "", fmt.Errorf("error reading onboarding certificate for %s: %v", u.String(), err)
}

serial, err := d.client.HGet(deviceSerialsHash, u.String()).Result()
if err != nil {
return nil, nil, "", fmt.Errorf("error reading device serial for %s: %v", u.String(), err)
}
// somehow device serials are best effort
return cert, onboard, serial, nil
}
Expand Down Expand Up @@ -478,7 +487,7 @@ func (d *DeviceManager) OnboardRegister(cert *x509.Certificate, serial []string)
cn := common.GetOnboardCertName(cert.Subject.CommonName)

if err := d.writeCert(cert.Raw, onboardCertsHash, cn, true); err != nil {
return err
return fmt.Errorf("failed to write onboardCertsHash %v: %v", cn, err)
}

v, err := msgpack.Marshal(&serial)
Expand Down Expand Up @@ -509,7 +518,9 @@ func (d *DeviceManager) OnboardRegister(cert *x509.Certificate, serial []string)
// WriteRequest record a request
func (d *DeviceManager) WriteRequest(u uuid.UUID, b []byte) error {
if dev, ok := d.devices[u]; ok {
dev.AddRequest(b)
if err := dev.AddRequest(b); err != nil {
return fmt.Errorf("AddRequest error: %s", err)
}
return nil
}
return fmt.Errorf("device not found: %s", u)
Expand Down Expand Up @@ -713,6 +724,10 @@ func (d *DeviceManager) refreshCache() error {
if now.Sub(d.lastUpdate).Seconds() < float64(d.cacheTimeout) {
return nil
}
stats := d.client.PoolStats()
if stats.Timeouts > 0 {
log.Printf("Pool timeouts: %d", stats.Timeouts)
}

// create new vars to hold while we load
onboardCerts := make(map[string]map[string]bool)
Expand Down
68 changes: 51 additions & 17 deletions pkg/server/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,33 +62,41 @@ func (h *adminHandler) onboardAdd(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get(contentType)
if contentType != mimeJSON {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
decoder := json.NewDecoder(r.Body)
var t OnboardCert
err := decoder.Decode(&t)
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

serials := strings.Split(t.Serial, ",")
cert, err := ax.ParseCert(t.Cert)
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("onboardAdd: ParseCert error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = h.manager.OnboardRegister(cert, serials)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
log.Printf("onboardAdd: OnboardRegister error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}

func (h *adminHandler) onboardList(w http.ResponseWriter, r *http.Request) {
cns, err := h.manager.OnboardList()
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("onboardList: OnboardList error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
body := strings.Join(cns, "\n")
w.WriteHeader(http.StatusOK)
w.Header().Add(contentType, mimeTextPlain)
w.Write([]byte(body))
}
Expand All @@ -101,17 +109,19 @@ func (h *adminHandler) onboardGet(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
log.Printf("onboardGet: OnboardGet(%s) error: %v", cn, err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
default:
w.WriteHeader(http.StatusOK)
body, err := json.Marshal(OnboardCert{
Cert: ax.PemEncodeCert(cert.Raw),
Serial: strings.Join(serials, ","),
})
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte(body))
w.WriteHeader(http.StatusOK)
w.Write(body)
}
}

Expand All @@ -123,7 +133,8 @@ func (h *adminHandler) onboardRemove(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("OnboardRemove(%s) error: %v", cn, err)
http.Error(w, err.Error(), http.StatusBadRequest)
default:
w.WriteHeader(http.StatusOK)
}
Expand All @@ -132,7 +143,8 @@ func (h *adminHandler) onboardRemove(w http.ResponseWriter, r *http.Request) {
func (h *adminHandler) onboardClear(w http.ResponseWriter, r *http.Request) {
err := h.manager.OnboardClear()
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
log.Printf("onboardClear: OnboardClear error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

Expand All @@ -141,6 +153,7 @@ func (h *adminHandler) deviceAdd(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get(contentType)
if contentType != mimeTextPlain {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

decoder := json.NewDecoder(r.Body)
Expand All @@ -151,36 +164,46 @@ func (h *adminHandler) deviceAdd(w http.ResponseWriter, r *http.Request) {
)
err := decoder.Decode(&t)
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceAdd: Decode error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

cert, err = ax.ParseCert(t.Cert)
if err != nil {
log.Printf("deviceAdd: ParseCert device error: %v", err)
http.Error(w, fmt.Sprintf("bad device cert: %v", err), http.StatusBadRequest)
return
}
if t.Onboard != nil && len(t.Onboard) > 0 {
onboard, err = ax.ParseCert(t.Onboard)
if err != nil {
log.Printf("deviceAdd: ParseCert onboard error: %v", err)
http.Error(w, fmt.Sprintf("bad onboard cert: %v", err), http.StatusBadRequest)
return
}
}
// generate a new uuid
unew, err := uuid.NewV4()
if err != nil {
log.Printf("error generating a new device UUID: %v", err)
log.Printf("deviceAdd: error generating a new device UUID: %v", err)
http.Error(w, fmt.Sprintf("error generating a new device UUID: %v", err), http.StatusBadRequest)
return
}
if err := h.manager.DeviceRegister(unew, cert, onboard, t.Serial, common.CreateBaseConfig(unew)); err != nil {
log.Printf("deviceAdd: DeviceRegister error: %v", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}

func (h *adminHandler) deviceList(w http.ResponseWriter, r *http.Request) {
uids, err := h.manager.DeviceList()
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceList: DeviceList error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// convert the UUIDs
ids := make([]string, 0, len(uids))
Expand Down Expand Up @@ -208,7 +231,8 @@ func (h *adminHandler) deviceGet(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceGet: DeviceGet error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
case deviceCert == nil:
http.Error(w, "found device information, but cert was empty", http.StatusInternalServerError)
default:
Expand All @@ -221,7 +245,7 @@ func (h *adminHandler) deviceGet(w http.ResponseWriter, r *http.Request) {
}
body, err := json.Marshal(dc)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
Expand All @@ -241,7 +265,8 @@ func (h *adminHandler) deviceRemove(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceRemove: DeviceRemove error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
default:
w.WriteHeader(http.StatusOK)
}
Expand All @@ -267,7 +292,8 @@ func (h *adminHandler) deviceConfigGet(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceConfigGet: GetConfig error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
case deviceConfig == nil:
http.Error(w, "found device information, but cert was empty", http.StatusInternalServerError)
default:
Expand All @@ -285,11 +311,14 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("bad body: %v", err), http.StatusBadRequest)
return
}
var deviceConfig config.EdgeDevConfig
err = json.Unmarshal(body, &deviceConfig)
if err != nil {
log.Printf("deviceConfigSet: Unmarshal config error: %v", err)
http.Error(w, fmt.Sprintf("failed to marshal json message into protobuf: %v", err), http.StatusBadRequest)
return
}
// before setting the config, set any necessary defaults
// check for UUID and/or version mismatch
Expand All @@ -304,6 +333,7 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("device not found %s", u), http.StatusNotFound)
return
case err != nil:
log.Printf("deviceConfigSet: GetConfig error: %v", err)
http.Error(w, fmt.Sprintf("error retrieving existing config for device %s: %v", u, err), http.StatusBadRequest)
return
case len(existingConfigB) == 0:
Expand All @@ -312,6 +342,7 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {
}
// convert it to protobuf so we can work with it
if err := protojson.Unmarshal(existingConfigB, &existingConfig); err != nil {
log.Printf("deviceConfigSet: processing existing config error: %v", err)
http.Error(w, fmt.Sprintf("error processing existing config: %v", err), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -350,6 +381,7 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {

b, err := protojson.Marshal(&deviceConfig)
if err != nil {
log.Printf("deviceConfigSet: Marshal error: %v", err)
http.Error(w, fmt.Sprintf("error processing device config: %v", err), http.StatusBadRequest)
return
}
Expand All @@ -359,7 +391,8 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
log.Printf("deviceConfigSet: SetConfig error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
default:
w.WriteHeader(http.StatusOK)
}
Expand Down Expand Up @@ -420,6 +453,7 @@ func (h *adminHandler) deviceDataGet(w http.ResponseWriter, r *http.Request, c <
case err != nil && isNotFound:
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
case err != nil:
log.Printf("deviceDataGet: readerFunc error: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
case reader == nil:
http.Error(w, "found device information, but logs were empty", http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/apiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (h *apiHandler) configPost(w http.ResponseWriter, r *http.Request) {
}
data, code, err := configProcess(configRequest, cfg)
if err != nil {
log.Println(err)
log.Printf("error configProcess: %v", err)
http.Error(w, http.StatusText(code), code)
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/x509/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func Generate(cn, hosts string) ([]byte, *rsa.PrivateKey, error) {

serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, nil, fmt.Errorf("failed to generate serial number: %v", err)
}

notBefore := time.Now()
notAfter := notBefore.Add(oneYear)
Expand Down