Skip to content

Commit f06da9b

Browse files
committed
fix: locking the tables for too much time
If the worker process the queue for a long time, it will preventing operations like TRUNCATE or VACUUM FULL from completing. This is because it acquires an AccessShareLock on the extension tables and only releases it once it goes to sleep. Now we use `ConditionalLockRelationOid` and unlock the tables at the end of each transaction, allowing VACUUM FULL or TRUNCATE to finish fast.
1 parent 331d16d commit f06da9b

File tree

4 files changed

+67
-67
lines changed

4 files changed

+67
-67
lines changed

src/core.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,6 @@ void set_curl_mhandle(WorkerState *wstate){
137137
}
138138

139139
uint64 delete_expired_responses(char *ttl, int batch_size){
140-
SetCurrentStatementStartTimestamp();
141-
StartTransactionCommand();
142-
PushActiveSnapshot(GetTransactionSnapshot());
143140
SPI_connect();
144141

145142
int ret_code = SPI_execute_with_args("\
@@ -168,8 +165,6 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
168165
}
169166

170167
SPI_finish();
171-
PopActiveSnapshot();
172-
CommitTransactionCommand();
173168

174169
return affected_rows;
175170
}

src/worker.c

Lines changed: 41 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,15 @@ static WorkerState *worker_state = NULL;
2828
static const int curl_handle_event_timeout_ms = 1000;
2929
static const int net_worker_restart_time_sec = 1;
3030
static const long no_timeout = -1L;
31-
static bool extension_locked = false;
3231
static bool wake_commit_cb_active = false;
3332
static bool worker_should_restart = false;
33+
static const size_t total_extension_tables = 2;
3434

3535
static char* guc_ttl;
3636
static int guc_batch_size;
3737
static char* guc_database_name;
3838
static char* guc_username;
3939
static MemoryContext CurlMemContext = NULL;
40-
static LockRelId queue_table_lock;
41-
static LockRelId response_table_lock;
4240
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
4341
static volatile sig_atomic_t got_sighup = false;
4442

@@ -147,54 +145,11 @@ static void publish_state(WorkerStatus s) {
147145
ConditionVariableBroadcast(&worker_state->cv);
148146
}
149147

150-
static bool is_extension_loaded(){
151-
StartTransactionCommand();
152-
153-
bool extension_exists = OidIsValid(get_extension_oid("pg_net", true));
154-
155-
if(extension_exists && !extension_locked){
156-
Oid db_oid = get_database_oid(guc_database_name, false);
157-
158-
Oid net_oid = get_namespace_oid("net", false);
159-
160-
queue_table_lock.dbId = db_oid;
161-
queue_table_lock.relId = get_relname_relid("http_request_queue", net_oid);
162-
163-
response_table_lock.dbId = db_oid;
164-
response_table_lock.relId = get_relname_relid("_http_response", net_oid);
165-
}
166-
167-
CommitTransactionCommand();
168-
169-
return extension_exists;
170-
}
171-
172-
static inline void lock_extension(){
173-
if(!extension_locked){
174-
elog(DEBUG1, "pg_net worker locking extension tables");
175-
LockRelationIdForSession(&queue_table_lock, AccessShareLock);
176-
LockRelationIdForSession(&response_table_lock, AccessShareLock);
177-
extension_locked = true;
178-
}
179-
}
180-
181-
static inline void unlock_extension(){
182-
if(extension_locked){
183-
elog(DEBUG1, "pg_net worker unlocking extension tables");
184-
UnlockRelationIdForSession(&queue_table_lock, AccessShareLock);
185-
UnlockRelationIdForSession(&response_table_lock, AccessShareLock);
186-
extension_locked = false;
187-
}
188-
}
189-
190148
static void
191149
net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){
192150
worker_should_restart = false;
193151
pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart
194152

195-
// ensure unlock happens in case of error
196-
unlock_extension();
197-
198153
DisownLatch(&worker_state->latch);
199154

200155
ev_monitor_close(worker_state);
@@ -231,6 +186,31 @@ static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart
231186
}
232187
}
233188

189+
static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){
190+
Oid net_oid = get_namespace_oid("net", true);
191+
192+
if(!OidIsValid(net_oid)){
193+
return false;
194+
}
195+
196+
Oid queue_oid = get_relname_relid("http_request_queue", net_oid);
197+
Oid resp_oid = get_relname_relid("_http_response", net_oid);
198+
199+
bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock);
200+
201+
if (is_locked) {
202+
ext_table_oids[0] = queue_oid;
203+
ext_table_oids[1] = resp_oid;
204+
}
205+
206+
return is_locked;
207+
}
208+
209+
static void unlock_extension(Oid ext_table_oids[static total_extension_tables]){
210+
UnlockRelationOid(ext_table_oids[0], AccessShareLock);
211+
UnlockRelationOid(ext_table_oids[1], AccessShareLock);
212+
}
213+
234214
void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
235215
on_proc_exit(net_on_exit, 0);
236216

@@ -266,17 +246,8 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
266246

267247
do {
268248

269-
if(!is_extension_loaded()){
270-
elog(DEBUG1, "pg_net worker waiting for extension to load");
271-
wait_while_processing_interrupts(WORKER_WAIT_ONE_SECOND, &worker_should_restart);
272-
continue;
273-
}
274-
275-
lock_extension(); // lock the extension immediately after it's loaded
276-
277249
uint32 expected = 1;
278250
if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){
279-
unlock_extension();
280251
elog(DEBUG1, "pg_net worker waiting for wake");
281252
wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart);
282253
continue;
@@ -286,13 +257,23 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
286257
uint64 expired_responses = 0;
287258

288259
do {
260+
SetCurrentStatementStartTimestamp();
261+
StartTransactionCommand();
262+
PushActiveSnapshot(GetTransactionSnapshot());
263+
264+
Oid ext_table_oids[total_extension_tables];
265+
266+
if(!is_extension_locked(ext_table_oids)){
267+
elog(DEBUG1, "pg_net extension not loaded");
268+
PopActiveSnapshot();
269+
AbortCurrentTransaction();
270+
break;
271+
}
272+
289273
expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
290274

291275
elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses);
292276

293-
StartTransactionCommand();
294-
PushActiveSnapshot(GetTransactionSnapshot());
295-
296277
requests_consumed = consume_request_queue(worker_state->curl_mhandle, guc_batch_size, CurlMemContext);
297278

298279
elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed);
@@ -341,6 +322,8 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
341322
} while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
342323
}
343324

325+
unlock_extension(ext_table_oids);
326+
344327
PopActiveSnapshot();
345328
CommitTransactionCommand();
346329

@@ -353,8 +336,6 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
353336

354337
} while (!worker_should_restart);
355338

356-
unlock_extension();
357-
358339
publish_state(WS_EXITED);
359340

360341
// causing a failure on exit will make the postmaster process restart the bg worker

test/test_http_requests_deleted_after_ttl.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ def test_http_responses_will_complete_deletion(sess, autocommit_sess):
9191
sess.execute(text("select net.wake()"))
9292
sess.commit() # commit so worker wakes
9393

94+
time.sleep(0.1)
95+
9496
(count,) = sess.execute(
9597
text(
9698
"""

test/test_worker_behavior.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def test_can_delete_rows_while_processing_queue(sess, autocommit_sess):
134134

135135

136136
def test_truncate_wait_while_processing_queue(sess, autocommit_sess):
137-
"""a truncate will wait until the worker is done processing all requests"""
137+
"""a truncate will not wait until the worker is done processing all requests"""
138138

139139
autocommit_sess.execute(text("alter system set pg_net.batch_size to '1';"))
140140
autocommit_sess.execute(text("select net.worker_restart();"))
@@ -148,6 +148,7 @@ def test_truncate_wait_while_processing_queue(sess, autocommit_sess):
148148

149149
sess.commit()
150150

151+
# truncate succeeds
151152
sess.execute(text(
152153
"""
153154
truncate net.http_request_queue;
@@ -156,15 +157,36 @@ def test_truncate_wait_while_processing_queue(sess, autocommit_sess):
156157

157158
sess.commit()
158159

160+
# and only one response will be done
159161
(count,) = sess.execute(text(
160162
"""
161163
select count(*) from net._http_response;
162164
"""
163165
)).fetchone()
164-
assert count == 5
166+
assert count == 1
167+
168+
sess.commit()
169+
170+
# even if some time passes
171+
time.sleep(1.1)
172+
173+
(count,) = sess.execute(text(
174+
"""
175+
select count(*) from net._http_response;
176+
"""
177+
)).fetchone()
178+
assert count == 1
165179

166180
sess.commit()
167181

182+
# and the queue will be empty
183+
(count,) = sess.execute(text(
184+
"""
185+
select count(*) from net.http_request_queue;
186+
"""
187+
)).fetchone()
188+
assert count == 0
189+
168190
autocommit_sess.execute(text("alter system reset pg_net.batch_size"))
169191
autocommit_sess.execute(text("select net.worker_restart()"))
170192
autocommit_sess.execute(text("select net.wait_until_running()"))

0 commit comments

Comments
 (0)