Skip to content

Commit 4aa80d3

Browse files
authored
Merge pull request #36 from weissleb/configure-timeout
add dml and ddl timeout overrides
2 parents 52312e8 + 5ca0341 commit 4aa80d3

File tree

9 files changed

+248
-12
lines changed

9 files changed

+248
-12
lines changed

README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,58 @@ Sample Output:
642642
2020/01/20 15:28:35 context deadline exceeded
643643
```
644644
645+
### Overriding Athena Service Limits for Query Timeout
646+
This library assumes default [Athena service limits](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) for DDL and DML query timeouts, as can be found in `athenadriver/go/constants.go`.
647+
If you've increased your service limits, for example via the [Athena Service Quotas](https://console.aws.amazon.com/servicequotas/home/services/athena/quotas) console,
648+
you can override them on your `Config`.
649+
650+
Here's the same example found at [Query Cancellation](#query-cancellation), but with an *increased* query timeout.
651+
652+
```go
653+
package main
654+
655+
import (
656+
"context"
657+
"database/sql"
658+
"log"
659+
"time"
660+
drv "github.com/uber/athenadriver/go"
661+
)
662+
663+
func main() {
664+
// 1. Set AWS Credential in Driver Config.
665+
conf, _ := drv.NewDefaultConfig("s3://myqueryresults/",
666+
"us-east-2", "DummyAccessID", "DummySecretAccessKey")
667+
668+
// 2. Override the DML query timeout to 60 minutes (3600 seconds).
669+
serviceLimitOverride := drv.NewServiceLimitOverride()
670+
serviceLimitOverride.SetDMLQueryTimeout(3600)
671+
conf.SetServiceLimitOverride(*serviceLimitOverride)
672+
673+
// 3. Open Connection.
674+
dsn := conf.Stringify()
675+
db, _ := sql.Open(drv.DriverName, dsn)
676+
677+
// 4. Run the query.
678+
rows, err := db.QueryContext(context.Background(), "select count(*) from sampledb.elb_logs")
679+
if err != nil {
680+
log.Fatal(err)
681+
return
682+
}
683+
defer rows.Close()
684+
685+
var requestTimestamp string
686+
var url string
687+
for rows.Next() {
688+
if err := rows.Scan(&requestTimestamp, &url); err != nil {
689+
log.Fatal(err)
690+
}
691+
println(requestTimestamp + "," + url)
692+
}
693+
}
694+
```
695+
696+
645697
### Missing Value Handling
646698
647699
It is common to have missing values in S3 file, or Athena DB. When this happens, you can specify if you want to use

go/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,3 +421,17 @@ func (c *Config) SetAWSProfile(profile string) {
421421
func (c *Config) GetAWSProfile() string {
422422
return c.values.Get("AWSProfile")
423423
}
424+
425+
// SetServiceLimitOverride is to set values from a ServiceLimitOverride
426+
func (c *Config) SetServiceLimitOverride(serviceLimitOverride ServiceLimitOverride) {
427+
for k, v := range serviceLimitOverride.GetAsStringMap() {
428+
c.values.Set(k, v)
429+
}
430+
}
431+
432+
// GetServiceLimitOverride is to get the ServiceLimitOverride manually set by a user
433+
func (c *Config) GetServiceLimitOverride() *ServiceLimitOverride {
434+
serviceLimitOverride := NewServiceLimitOverride()
435+
serviceLimitOverride.SetFromValues(c.values)
436+
return serviceLimitOverride
437+
}

go/config_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,3 +286,29 @@ func TestConfig_SetAWSProfile(t *testing.T) {
286286
testConf.SetAWSProfile("development")
287287
assert.Equal(t, testConf.GetAWSProfile(), "development")
288288
}
289+
290+
func TestConfig_SetServiceLimitOverride(t *testing.T) {
291+
var s3bucket string = "s3://query-results-henry-wu-us-east-2/"
292+
293+
testConf := NewNoOpsConfig()
294+
_ = testConf.SetOutputBucket(s3bucket)
295+
serviceLimitOverride := NewServiceLimitOverride()
296+
ddlQueryTimeout := 1000 * 60 // 1000 minutes
297+
_ = serviceLimitOverride.SetDDLQueryTimeout(ddlQueryTimeout)
298+
testConf.SetServiceLimitOverride(*serviceLimitOverride)
299+
testServiceLimitOverride := testConf.GetServiceLimitOverride()
300+
assert.Equal(t, ddlQueryTimeout, testServiceLimitOverride.GetDDLQueryTimeout())
301+
302+
expected := "s3://query-results-henry-wu-us-east-2?DDLQueryTimeout=60000&DMLQueryTimeout=0&WGRemoteCreation=true&db=default&missingAsEmptyString=true&region=us-east-1"
303+
assert.Equal(t, expected, testConf.Stringify())
304+
305+
dmlQueryTimeout := 60 * 60 // 60 minutes
306+
_ = serviceLimitOverride.SetDMLQueryTimeout(dmlQueryTimeout)
307+
testConf.SetServiceLimitOverride(*serviceLimitOverride)
308+
testServiceLimitOverride = testConf.GetServiceLimitOverride()
309+
assert.Equal(t, ddlQueryTimeout, testServiceLimitOverride.GetDDLQueryTimeout())
310+
assert.Equal(t, dmlQueryTimeout, testServiceLimitOverride.GetDMLQueryTimeout())
311+
312+
expected = "s3://query-results-henry-wu-us-east-2?DDLQueryTimeout=60000&DMLQueryTimeout=3600&WGRemoteCreation=true&db=default&missingAsEmptyString=true&region=us-east-1"
313+
assert.Equal(t, expected, testConf.Stringify())
314+
}

go/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ WAITING_FOR_RESULT:
432432
obs.Log(ErrorLevel, "query canceled", zap.String("queryID", queryID))
433433
return nil, ctx.Err()
434434
case <-time.After(PoolInterval * time.Second):
435-
if isQueryTimeOut(startOfStartQueryExecution, *statusResp.QueryExecution.StatementType) {
435+
if isQueryTimeOut(startOfStartQueryExecution, *statusResp.QueryExecution.StatementType, c.connector.config.GetServiceLimitOverride()) {
436436
obs.Log(ErrorLevel, "Query timeout failure",
437437
zap.String("workgroup", wg.Name),
438438
zap.String("queryID", queryID),

go/errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package athenadriver
2222

2323
import (
2424
"errors"
25+
"fmt"
2526
)
2627

2728
// Various errors the driver might return. Can change between driver versions.
@@ -41,4 +42,5 @@ var (
4142
ErrAthenaNilAPI = errors.New("athenaAPI must not be nil")
4243
ErrTestMockGeneric = errors.New("some_mock_error_for_test")
4344
ErrTestMockFailedByAthena = errors.New("the reason why Athena failed the query")
45+
ErrServiceLimitOverride = errors.New(fmt.Sprintf("service limit override must be greater than %d", PoolInterval))
4446
)

go/servicelimitoverride.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) 2020 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package athenadriver
22+
23+
import (
24+
"fmt"
25+
"net/url"
26+
"strconv"
27+
)
28+
29+
// ServiceLimitOverride allows users to override service limits, hardcoded in constants.go.
30+
// This assumes the service limits have been raised in the AWS account.
31+
// https://docs.aws.amazon.com/athena/latest/ug/service-limits.html
32+
type ServiceLimitOverride struct {
33+
ddlQueryTimeout int
34+
dmlQueryTimeout int
35+
}
36+
37+
// NewServiceLimitOverride is to create an empty ServiceLimitOverride.
38+
// Values can be set using setters.
39+
func NewServiceLimitOverride() *ServiceLimitOverride {
40+
return &ServiceLimitOverride{}
41+
}
42+
43+
// SetDDLQueryTimeout is to set the DDLQueryTimeout override.
44+
func (c *ServiceLimitOverride) SetDDLQueryTimeout(seconds int) error {
45+
if seconds < PoolInterval {
46+
return ErrServiceLimitOverride
47+
}
48+
c.ddlQueryTimeout = seconds
49+
return nil
50+
}
51+
52+
// GetDDLQueryTimeout is to get the DDLQueryTimeout override.
53+
func (c *ServiceLimitOverride) GetDDLQueryTimeout() int {
54+
return c.ddlQueryTimeout
55+
}
56+
57+
// SetDMLQueryTimeout is to set the DMLQueryTimeout override.
58+
func (c *ServiceLimitOverride) SetDMLQueryTimeout(seconds int) error {
59+
if seconds < PoolInterval {
60+
return ErrServiceLimitOverride
61+
}
62+
c.dmlQueryTimeout = seconds
63+
return nil
64+
}
65+
66+
// GetDMLQueryTimeout is to get the DMLQueryTimeout override.
67+
func (c *ServiceLimitOverride) GetDMLQueryTimeout() int {
68+
return c.dmlQueryTimeout
69+
}
70+
71+
// GetAsStringMap is to get the ServiceLimitOverride as a map of strings
72+
// and aids in setting url.Values in Config
73+
func (c *ServiceLimitOverride) GetAsStringMap() map[string]string {
74+
res := map[string]string{}
75+
res["DDLQueryTimeout"] = fmt.Sprintf("%d", c.ddlQueryTimeout)
76+
res["DMLQueryTimeout"] = fmt.Sprintf("%d", c.dmlQueryTimeout)
77+
return res
78+
}
79+
80+
// SetFromValues is to set ServiceLimitOverride properties from a url.Values
81+
// which might be a list of override and other ignored values from a dsn
82+
func (c *ServiceLimitOverride) SetFromValues(kvp url.Values) {
83+
ddlQueryTimeout, _ := strconv.Atoi(kvp.Get("DDLQueryTimeout"))
84+
_ = c.SetDDLQueryTimeout(ddlQueryTimeout)
85+
dmlQueryTimeout, _ := strconv.Atoi(kvp.Get("DMLQueryTimeout"))
86+
_ = c.SetDMLQueryTimeout(dmlQueryTimeout)
87+
}

go/servicelimitoverride_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package athenadriver
2+
3+
import (
4+
"github.com/stretchr/testify/assert"
5+
"testing"
6+
)
7+
8+
// Tests for ServiceLimitOverride.
9+
func TestNewServiceLimitOverride(t *testing.T) {
10+
testConf := NewServiceLimitOverride()
11+
assert.Zero(t, testConf.GetDDLQueryTimeout())
12+
assert.Zero(t, testConf.GetDMLQueryTimeout())
13+
14+
ddlQueryTimeout := 30 * 60 // seconds
15+
dmlQueryTimeout := 60 * 60 // seconds
16+
testConf.SetDDLQueryTimeout(ddlQueryTimeout)
17+
assert.Equal(t, ddlQueryTimeout, testConf.GetDDLQueryTimeout()) // seconds
18+
19+
testConf.SetDMLQueryTimeout(dmlQueryTimeout)
20+
assert.Equal(t, dmlQueryTimeout, testConf.GetDMLQueryTimeout()) // seconds
21+
22+
ddlQueryTimeout = 0
23+
dmlQueryTimeout = 0
24+
err := testConf.SetDDLQueryTimeout(ddlQueryTimeout)
25+
assert.NotNil(t, err)
26+
27+
err = testConf.SetDMLQueryTimeout(dmlQueryTimeout)
28+
assert.NotNil(t, err)
29+
30+
ddlQueryTimeout = -1
31+
dmlQueryTimeout = -1
32+
err = testConf.SetDDLQueryTimeout(ddlQueryTimeout)
33+
assert.NotNil(t, err)
34+
35+
err = testConf.SetDMLQueryTimeout(dmlQueryTimeout)
36+
assert.NotNil(t, err)
37+
}

go/utils.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -665,22 +665,32 @@ func valueToNamedValue(args []driver.Value) []driver.NamedValue {
665665
return nameValues
666666
}
667667

668-
func isQueryTimeOut(startOfStartQueryExecution time.Time, queryType string) bool {
668+
func isQueryTimeOut(startOfStartQueryExecution time.Time, queryType string, serviceLimitOverride *ServiceLimitOverride) bool {
669+
ddlQueryTimeout := DDLQueryTimeout
670+
dmlQueryTimeout := DMLQueryTimeout
671+
if serviceLimitOverride != nil {
672+
if serviceLimitOverride.GetDDLQueryTimeout() > 0 {
673+
ddlQueryTimeout = serviceLimitOverride.GetDDLQueryTimeout()
674+
}
675+
if serviceLimitOverride.GetDMLQueryTimeout() > 0 {
676+
dmlQueryTimeout = serviceLimitOverride.GetDMLQueryTimeout()
677+
}
678+
}
669679
switch queryType {
670680
case "DDL":
671681
return time.Since(startOfStartQueryExecution) >
672-
DDLQueryTimeout*time.Second
682+
time.Duration(ddlQueryTimeout)*time.Second
673683
case "DML":
674684
return time.Since(startOfStartQueryExecution) >
675-
DMLQueryTimeout*time.Second
685+
time.Duration(dmlQueryTimeout)*time.Second
676686
case "UTILITY":
677687
return time.Since(startOfStartQueryExecution) >
678-
DMLQueryTimeout*time.Second
688+
time.Duration(dmlQueryTimeout)*time.Second
679689
case "TIMEOUT_NOW":
680690
return true
681691
default:
682692
return time.Since(startOfStartQueryExecution) >
683-
DDLQueryTimeout*time.Second
693+
time.Duration(ddlQueryTimeout)*time.Second
684694
}
685695
}
686696

go/utils_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,22 @@ func TestValueToNamedValue(t *testing.T) {
206206
}
207207

208208
func TestIsQueryTimeOut(t *testing.T) {
209-
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeDdl))
210-
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeDml))
211-
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeUtility))
209+
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeDdl, nil))
210+
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeDml, nil))
211+
assert.False(t, isQueryTimeOut(time.Now(), athena.StatementTypeUtility, nil))
212212
now := time.Now()
213213
OneHourAgo := now.Add(-3600 * time.Second)
214-
assert.True(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDml))
215-
assert.False(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDdl))
216-
assert.False(t, isQueryTimeOut(OneHourAgo, "UNKNOWN"))
214+
assert.True(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDml, nil))
215+
assert.False(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDdl, nil))
216+
assert.False(t, isQueryTimeOut(OneHourAgo, "UNKNOWN", nil))
217+
218+
testConf := NewServiceLimitOverride()
219+
testConf.SetDMLQueryTimeout(65 * 60) // 65 minutes
220+
assert.False(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDml, testConf))
221+
222+
testConf.SetDDLQueryTimeout(30 * 60) // 30 minutes
223+
assert.True(t, isQueryTimeOut(OneHourAgo, athena.StatementTypeDdl, testConf))
224+
assert.True(t, isQueryTimeOut(OneHourAgo, "UNKNOWN", testConf))
217225
}
218226

219227
func TestEscapeBytesBackslash(t *testing.T) {

0 commit comments

Comments
 (0)