@@ -24,6 +24,20 @@ use std::time::Duration;
24
24
25
25
const TOKEN_LIFETIME_SECONDS : u64 = 3600 ;
26
26
27
+ /// Unique violation error code in Postgres.
28
+ /// NOTE: typically we should use "on conflict do nothing", but that only
29
+ /// works with insert queries, not update queries.
30
+ /// From: https://www.postgresql.org/docs/9.2/errcodes-appendix.html
31
+ pub const PG_UNIQUE_VIOLATION_ERROR : & str = "23505" ;
32
+ fn violates_uniqueness ( err : & sqlx:: Error ) -> bool {
33
+ if let sqlx:: Error :: Database ( err) = & err {
34
+ if let Some ( code) = err. code ( ) {
35
+ return code == PG_UNIQUE_VIOLATION_ERROR ;
36
+ }
37
+ }
38
+ false
39
+ }
40
+
27
41
type Result < T > = std:: result:: Result < T , ConnectError > ;
28
42
29
43
#[ derive( thiserror:: Error , Debug ) ]
@@ -73,15 +87,16 @@ impl ConnectError {
73
87
}
74
88
75
89
/// Attempts to create a new backend that owns the given key. If the key is already held, returns
76
- /// Ok(None). If the key is not held, creates a new backend and returns Ok(Some(backend_id)).
90
+ /// Err(ConnectError::FailedToAcquireKey). If the key is not held, creates a new backend and
91
+ /// returns Ok(backend_id).
77
92
async fn create_backend_with_key (
78
93
pool : & PgPool ,
79
94
key : & KeyConfig ,
80
95
spawn_config : & SpawnConfig ,
81
96
cluster : & ClusterName ,
82
97
drone_for_spawn : & DroneForSpawn ,
83
98
static_token : Option < & BearerToken > ,
84
- ) -> Result < Option < BackendName > > {
99
+ ) -> Result < BackendName > {
85
100
let backend_id = spawn_config. id . clone ( ) . or_random ( ) ;
86
101
let mut txn = pool. begin ( ) . await ?;
87
102
@@ -125,11 +140,17 @@ async fn create_backend_with_key(
125
140
serde_json:: to_value( & BackendState :: Scheduled ) . expect( "valid json" ) ,
126
141
static_token. map( |t| t. to_string( ) ) ,
127
142
)
128
- . fetch_optional ( & mut * txn)
129
- . await ?;
130
-
131
- let Some ( result) = result else {
132
- return Ok ( None ) ;
143
+ . fetch_one ( & mut * txn)
144
+ . await ;
145
+
146
+ let result = match result {
147
+ Ok ( result) => result,
148
+ Err ( err) => {
149
+ if violates_uniqueness ( & err) {
150
+ return Err ( ConnectError :: FailedToAcquireKey ) ;
151
+ }
152
+ return Err ( err. into ( ) ) ;
153
+ }
133
154
} ;
134
155
135
156
let acquired_key = AcquiredKey {
@@ -161,7 +182,7 @@ async fn create_backend_with_key(
161
182
162
183
txn. commit ( ) . await ?;
163
184
164
- Ok ( Some ( backend_id) )
185
+ Ok ( backend_id)
165
186
}
166
187
167
188
async fn create_token (
@@ -293,18 +314,15 @@ async fn attempt_connect(
293
314
. use_static_token
294
315
. then ( BearerToken :: new_random_static) ;
295
316
296
- let Some ( backend_id) = create_backend_with_key (
317
+ let backend_id = create_backend_with_key (
297
318
pool,
298
319
& key,
299
320
spawn_config,
300
321
cluster,
301
322
& drone,
302
323
bearer_token. as_ref ( ) ,
303
324
)
304
- . await ?
305
- else {
306
- return Err ( ConnectError :: FailedToAcquireKey ) ;
307
- } ;
325
+ . await ?;
308
326
tracing:: info!( backend_id = ?backend_id, "Created backend" ) ;
309
327
310
328
let ( token, secret_token) = if let Some ( token) = bearer_token {
0 commit comments