@@ -203,42 +203,13 @@ def max_record_count(self) -> int | None:
203
203
"""Return the maximum number of records to fetch in a single query."""
204
204
return self .config .get ("max_record_count" )
205
205
206
- def build_query (self , context : Context | None ) -> sa .sql .Select :
206
+ def build_query (self , table : sa . Table ) -> sa .sql .Select :
207
207
"""Build a SQLAlchemy query for the stream."""
208
- selected_column_names = self .get_selected_schema ()["properties" ].keys ()
209
- table = self .connector .get_table (
210
- full_table_name = self .fully_qualified_name ,
211
- column_names = selected_column_names ,
212
- )
213
- query = table .select ()
214
-
215
- if self .replication_key :
216
- replication_key_col = table .columns [self .replication_key ]
217
- order_by = (
218
- sa .nulls_first (replication_key_col .asc ())
219
- if self .supports_nulls_first
220
- else replication_key_col .asc ()
221
- )
222
- query = query .order_by (order_by )
223
-
224
- start_val = self .get_starting_replication_key_value (context )
225
- if start_val :
226
- query = query .where (replication_key_col >= start_val )
227
-
208
+ query = super ().build_query (table )
228
209
stream_options = self .config .get ("stream_options" , {}).get (self .name , {})
229
210
if clauses := stream_options .get ("custom_where_clauses" ):
230
211
query = query .where (* (sa .text (clause .strip ()) for clause in clauses ))
231
212
232
- if self .ABORT_AT_RECORD_COUNT is not None :
233
- # Limit record count to one greater than the abort threshold. This ensures
234
- # `MaxRecordsLimitException` exception is properly raised by caller
235
- # `Stream._sync_records()` if more records are available than can be
236
- # processed.
237
- query = query .limit (self .ABORT_AT_RECORD_COUNT + 1 )
238
-
239
- if self .max_record_count ():
240
- query = query .limit (self .max_record_count ())
241
-
242
213
return query
243
214
244
215
# Get records from stream
@@ -264,8 +235,18 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
264
235
msg = f"Stream '{ self .name } ' does not support partitioning."
265
236
raise NotImplementedError (msg )
266
237
238
+ selected_column_names = self .get_selected_schema ()["properties" ].keys ()
239
+ table = self .connector .get_table (
240
+ full_table_name = self .fully_qualified_name ,
241
+ column_names = selected_column_names ,
242
+ )
243
+
244
+ query = self .build_query (table )
245
+ query = self .apply_replication_filter (query , table , context = context )
246
+ query = self .apply_abort_query_limit (query )
247
+
267
248
with self .connector ._connect () as conn :
268
- for record in conn .execute (self . build_query ( context ) ).mappings ():
249
+ for record in conn .execute (query ).mappings ():
269
250
# TODO: Standardize record mapping type
270
251
# https://github.com/meltano/sdk/issues/2096
271
252
transformed_record = self .post_process (dict (record ))
0 commit comments