Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
- Wrap error when calling HasPendingRebootDBParams
- Use a single channel containing a struct with both the bool and error
  in HasPendingRebootDBParams
- Move all new DB-related methods to terraform/db_operations.go
  • Loading branch information
agarciamontoro committed Dec 11, 2024
1 parent 2d92ee5 commit e221ace
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 151 deletions.
152 changes: 1 addition & 151 deletions deployment/terraform/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/rds"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/config"
Expand Down Expand Up @@ -345,7 +344,7 @@ func (t *Terraform) PostProcessDatabase(extAgent *ssh.ExtAgent) error {

needsReboot, err := t.HasPendingRebootDBParams()
if err != nil {
return fmt.Errorf("failed to check whether the DB has pending-reboot parameters")
return fmt.Errorf("failed to check whether the DB has pending-reboot parameters: %w", err)
}

if needsReboot {
Expand All @@ -355,155 +354,6 @@ func (t *Terraform) PostProcessDatabase(extAgent *ssh.ExtAgent) error {
return nil
}

// HasPendingRebootDBParams checks every instance of the deployed DB cluster
// in parallel and reports whether at least one of them has a parameter group
// whose apply status is "pending-reboot". The errors returned by the
// individual checks are joined into the returned error.
func (t *Terraform) HasPendingRebootDBParams() (bool, error) {
// Build the RDS client
cfg, err := t.GetAWSConfig()
if err != nil {
return false, fmt.Errorf("failed to get AWS config: %w", err)
}
rdsClient := rds.NewFromConfig(cfg)

// Check in parallel whether each DB instance needs to be rebooted.
// Both channels are buffered with one slot per instance, so the goroutines
// below never block when sending their result.
errChan := make(chan error, len(t.output.DBCluster.Instances))
needsRebootChan := make(chan bool, len(t.output.DBCluster.Instances))
var wg sync.WaitGroup
for _, instance := range t.output.DBCluster.Instances {
wg.Add(1)
go func(dbId string) {
defer wg.Done()
needsReboot, err := hasPendingRebootDBParams(rdsClient, dbId)
needsRebootChan <- needsReboot
errChan <- err
}(instance.DBIdentifier)
}

// Once all checks are done, close the channels so that the range loops
// below terminate after draining the buffered results.
wg.Wait()
close(needsRebootChan)
close(errChan)

// A single instance needing a reboot is enough to report true
needsReboot := false
for b := range needsRebootChan {
needsReboot = needsReboot || b
}

// Collect the errors of all checks into a single error
var finalErr error
for err := range errChan {
finalErr = errors.Join(finalErr, err)
}

return needsReboot, finalErr
}

// hasPendingRebootDBParams describes the DB instance identified by dbId and
// reports whether any of its DB parameter groups has a ParameterApplyStatus
// equal to "pending-reboot". It returns an error if the describe call fails,
// if it returns no instances, or if a parameter group with no
// ParameterApplyStatus is found before a "pending-reboot" one.
func hasPendingRebootDBParams(rdsClient *rds.Client, dbId string) (bool, error) {
describeParams := &rds.DescribeDBInstancesInput{
DBInstanceIdentifier: model.NewPointer(dbId),
}
describeOut, err := rdsClient.DescribeDBInstances(context.Background(), describeParams)
if err != nil {
return false, fmt.Errorf("error describing DB instance %q: %w", dbId, err)
}

if len(describeOut.DBInstances) < 1 {
return false, fmt.Errorf("describe instances returned no instances")
}

// Only the first returned instance is inspected
for _, group := range describeOut.DBInstances[0].DBParameterGroups {
if group.ParameterApplyStatus == nil {
return false, fmt.Errorf("parameter group has no ParameterApplyStatus")
}

if *group.ParameterApplyStatus == "pending-reboot" {
return true, nil
}
}

return false, nil
}

// RebootDBInstances reboots all the instances of the deployed DB cluster in
// parallel, returning once rebootDBInstance has finished for each of them.
// The errors of the individual reboots are joined into the returned error.
// The extAgent parameter is not used in the body of this function.
func (t *Terraform) RebootDBInstances(extAgent *ssh.ExtAgent) error {
// Build the RDS client
cfg, err := t.GetAWSConfig()
if err != nil {
return fmt.Errorf("failed to get AWS config: %w", err)
}
rdsClient := rds.NewFromConfig(cfg)

// Reboot each DB instance in parallel.
// The channel is buffered with one slot per instance, so the goroutines
// below never block when sending their result.
errChan := make(chan error, len(t.output.DBCluster.Instances))
var wg sync.WaitGroup
for _, instance := range t.output.DBCluster.Instances {
wg.Add(1)
go func(dbId string) {
defer wg.Done()
errChan <- rebootDBInstance(rdsClient, dbId)
}(instance.DBIdentifier)
}

// Once all reboots are done, close the channel so that the range loop
// below terminates after draining the buffered results.
wg.Wait()
close(errChan)

var finalErr error
for err := range errChan {
finalErr = errors.Join(finalErr, err)
}

return finalErr
}

// rebootDBInstance requests the reboot of the DB instance identified by dbId
// and then polls its status every 30 seconds until it is "available". It
// returns an error if the reboot request fails, if any of the describe calls
// fails or returns no instance or no status, or if the instance is still not
// available after 15 minutes.
func rebootDBInstance(rdsClient *rds.Client, dbId string) error {
params := &rds.RebootDBInstanceInput{
DBInstanceIdentifier: model.NewPointer(dbId),
}

out, err := rdsClient.RebootDBInstance(context.Background(), params)
if err != nil {
return fmt.Errorf("failed to reboot DB instance: %w", err)
}

// NOTE(review): out.DBInstance and its DBInstanceStatus are dereferenced
// here without a nil check, unlike in the polling loop below.
mlog.Info("DB instance reboot has started",
mlog.String("id", dbId),
mlog.String("status", *out.DBInstance.DBInstanceStatus))

// Wait for the DB instance to become available, or fail after 15 minutes
timeout := time.After(15 * time.Minute)
for {
select {
case <-timeout:
return fmt.Errorf("timeout reached, instance is not available yet")
// A new 30-second timer is created on every iteration, so the wait starts
// counting after the previous status check has finished.
case <-time.After(30 * time.Second):
describeParams := &rds.DescribeDBInstancesInput{
DBInstanceIdentifier: model.NewPointer(dbId),
}
describeOut, err := rdsClient.DescribeDBInstances(context.Background(), describeParams)
if err != nil {
return fmt.Errorf("error describing DB instance %q: %w", dbId, err)
}

if len(describeOut.DBInstances) < 1 {
return fmt.Errorf("describe instances returned no instances")
}

if describeOut.DBInstances[0].DBInstanceStatus == nil {
return fmt.Errorf("describe instances returned no status")
}

status := *describeOut.DBInstances[0].DBInstanceStatus

// Finish when the DB is completely rebooted
if status == "available" {
mlog.Info("DB instance is now available.",
mlog.String("id", dbId),
mlog.String("status", status))
return nil
}

mlog.Info("DB instance is not available yet. Waiting 30 seconds...",
mlog.String("id", dbId),
mlog.String("status", status))
}
}
}

func (t *Terraform) setupAppServers(extAgent *ssh.ExtAgent, uploadBinary bool, uploadRelease bool, uploadPath string, siteURL string) error {
for _, val := range t.output.Instances {
err := t.setupMMServer(extAgent, val.PublicIP, siteURL, uploadBinary, uploadRelease, uploadPath, val.Tags.Name)
Expand Down
161 changes: 161 additions & 0 deletions deployment/terraform/db_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/rds"
"github.com/mattermost/mattermost-load-test-ng/deployment/terraform/ssh"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)

// StopDB stops the DB cluster and syncs the changes.
Expand Down Expand Up @@ -114,3 +120,158 @@ func (t *Terraform) DBStatus() (string, error) {

return out.DBCluster[0].Status, nil
}

// HasPendingRebootDBParams reports whether at least one instance of the
// deployed DB cluster has a parameter group whose apply status is
// "pending-reboot". All instances are checked concurrently, and the errors of
// the individual checks are joined into the returned error.
func (t *Terraform) HasPendingRebootDBParams() (bool, error) {
	awsCfg, err := t.GetAWSConfig()
	if err != nil {
		return false, fmt.Errorf("failed to get AWS config: %w", err)
	}
	client := rds.NewFromConfig(awsCfg)

	// outcome carries the result of checking a single DB instance.
	type outcome struct {
		pending bool
		err     error
	}

	instances := t.output.DBCluster.Instances

	// One buffered slot per instance: no goroutine ever blocks on its send.
	outcomes := make(chan outcome, len(instances))

	var wg sync.WaitGroup
	wg.Add(len(instances))
	for _, instance := range instances {
		go func(id string) {
			defer wg.Done()
			pending, checkErr := hasPendingRebootDBParams(client, id)
			outcomes <- outcome{pending: pending, err: checkErr}
		}(instance.DBIdentifier)
	}

	// All results are buffered once the group is done; closing the channel
	// lets the loop below finish after draining them.
	wg.Wait()
	close(outcomes)

	var (
		anyPending bool
		joinedErr  error
	)
	for res := range outcomes {
		if res.pending {
			anyPending = true
		}
		joinedErr = errors.Join(joinedErr, res.err)
	}

	return anyPending, joinedErr
}

// hasPendingRebootDBParams describes the DB instance identified by dbId and
// reports whether any of its DB parameter groups has a ParameterApplyStatus
// equal to "pending-reboot".
//
// It returns an error if the describe call fails, if it returns no instances,
// or if a parameter group with no ParameterApplyStatus is found before a
// "pending-reboot" one. Every error names the instance, because the caller
// joins the errors of several instances checked in parallel.
func hasPendingRebootDBParams(rdsClient *rds.Client, dbId string) (bool, error) {
	describeParams := &rds.DescribeDBInstancesInput{
		DBInstanceIdentifier: model.NewPointer(dbId),
	}
	describeOut, err := rdsClient.DescribeDBInstances(context.Background(), describeParams)
	if err != nil {
		return false, fmt.Errorf("error describing DB instance %q: %w", dbId, err)
	}

	if len(describeOut.DBInstances) < 1 {
		return false, fmt.Errorf("describe instances returned no instances for DB instance %q", dbId)
	}

	// The query is filtered by identifier, so only the first result is inspected.
	for _, group := range describeOut.DBInstances[0].DBParameterGroups {
		if group.ParameterApplyStatus == nil {
			return false, fmt.Errorf("parameter group of DB instance %q has no ParameterApplyStatus", dbId)
		}

		if *group.ParameterApplyStatus == "pending-reboot" {
			return true, nil
		}
	}

	return false, nil
}

// RebootDBInstances reboots every instance of the deployed DB cluster
// concurrently and blocks until rebootDBInstance has returned for all of
// them. The errors of the individual reboots are joined into the returned
// error. The extAgent parameter is currently not used by this function.
func (t *Terraform) RebootDBInstances(extAgent *ssh.ExtAgent) error {
	awsCfg, err := t.GetAWSConfig()
	if err != nil {
		return fmt.Errorf("failed to get AWS config: %w", err)
	}
	client := rds.NewFromConfig(awsCfg)

	instances := t.output.DBCluster.Instances

	// One buffered slot per instance: no goroutine ever blocks on its send.
	rebootErrs := make(chan error, len(instances))

	var wg sync.WaitGroup
	wg.Add(len(instances))
	for _, instance := range instances {
		go func(id string) {
			defer wg.Done()
			rebootErrs <- rebootDBInstance(client, id)
		}(instance.DBIdentifier)
	}

	// All results are buffered once the group is done; closing the channel
	// lets the loop below finish after draining them.
	wg.Wait()
	close(rebootErrs)

	var joinedErr error
	for rebootErr := range rebootErrs {
		joinedErr = errors.Join(joinedErr, rebootErr)
	}

	return joinedErr
}

// rebootDBInstance requests the reboot of the DB instance identified by dbId
// and blocks until its status is back to "available", polling every 30
// seconds.
//
// It returns an error if the reboot request fails, if any of the describe
// calls fails or returns no instance or no status, or if the instance is
// still not available after 15 minutes. Every error names the instance,
// because the caller joins the errors of several instances rebooted in
// parallel.
func rebootDBInstance(rdsClient *rds.Client, dbId string) error {
	params := &rds.RebootDBInstanceInput{
		DBInstanceIdentifier: model.NewPointer(dbId),
	}

	out, err := rdsClient.RebootDBInstance(context.Background(), params)
	if err != nil {
		return fmt.Errorf("failed to reboot DB instance %q: %w", dbId, err)
	}

	// The status is only used for logging here, so a response without it must
	// not crash the process: fall back to a placeholder instead of
	// dereferencing a nil pointer.
	initialStatus := "unknown"
	if out != nil && out.DBInstance != nil && out.DBInstance.DBInstanceStatus != nil {
		initialStatus = *out.DBInstance.DBInstanceStatus
	}

	mlog.Info("DB instance reboot has started",
		mlog.String("id", dbId),
		mlog.String("status", initialStatus))

	// Wait for the DB instance to become available, or fail after 15 minutes
	timeout := time.After(15 * time.Minute)
	for {
		select {
		case <-timeout:
			return fmt.Errorf("timeout reached, DB instance %q is not available yet", dbId)
		// A fresh timer per iteration: the 30 seconds start counting once the
		// previous status check has finished.
		case <-time.After(30 * time.Second):
			describeParams := &rds.DescribeDBInstancesInput{
				DBInstanceIdentifier: model.NewPointer(dbId),
			}
			describeOut, err := rdsClient.DescribeDBInstances(context.Background(), describeParams)
			if err != nil {
				return fmt.Errorf("error describing DB instance %q: %w", dbId, err)
			}

			if len(describeOut.DBInstances) < 1 {
				return fmt.Errorf("describe instances returned no instances for DB instance %q", dbId)
			}

			if describeOut.DBInstances[0].DBInstanceStatus == nil {
				return fmt.Errorf("describe instances returned no status for DB instance %q", dbId)
			}

			status := *describeOut.DBInstances[0].DBInstanceStatus

			// Finish when the DB is completely rebooted
			if status == "available" {
				mlog.Info("DB instance is now available.",
					mlog.String("id", dbId),
					mlog.String("status", status))
				return nil
			}

			mlog.Info("DB instance is not available yet. Waiting 30 seconds...",
				mlog.String("id", dbId),
				mlog.String("status", status))
		}
	}
}

0 comments on commit e221ace

Please sign in to comment.