1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ from enum import Enum
1415
1516import apache_beam as beam
1617import logging
2021from apache_beam .options .value_provider import ValueProvider
2122from google .cloud import bigquery
2223from models .execution import DestinationType , Execution , Batch
24+ from string import Template
2325from typing import Any , List , Iterable , Tuple , Dict
2426
2527
@@ -48,6 +50,20 @@ def is_deterministic(self):
4850 return True
4951
5052
class TransactionalType(Enum):
    """
    Distinct types to handle data uploading deduplication.
    NOT_TRANSACTIONAL: don't handle deduplication.
    UUID: expect a 'uuid' field in the source table as a unique identifier for each row.
    GCLID_TIME: expect 'gclid' and 'time' fields in the source table as unique identifiers for each row.
    """
    # Explicit tuple-unpacked values (0, 1, 2) rather than auto() so the
    # member values stay stable for anything that persisted them.
    (
        NOT_TRANSACTIONAL,
        UUID,
        GCLID_TIME,
    ) = range(3)
66+
5167class BatchesFromExecutions (beam .PTransform ):
5268 """
5369 Filter the received executions by the received action,
@@ -62,14 +78,15 @@ def process(self, execution: Execution) -> Iterable[Tuple[Execution, Dict[str, A
6278 table_name = table_name .replace ('`' , '' )
6379 query = f"SELECT data.* FROM `{ table_name } ` AS data"
6480 logging .getLogger (_LOGGER_NAME ).info (f'Reading from table { table_name } for Execution { execution } ' )
65- rows_iterator = client .query (query ).result (page_size = _BIGQUERY_PAGE_SIZE )
66- for row in rows_iterator :
81+ for row in client .query (query ).result (page_size = _BIGQUERY_PAGE_SIZE ):
6782 yield execution , _convert_row_to_dict (row )
6883
6984 class _ExecutionIntoBigQueryRequestTransactional (beam .DoFn ):
7085
71- def __init__ (self , bq_ops_dataset ):
86+ def __init__ (self , bq_ops_dataset , create_table_query , join_query ):
7287 self ._bq_ops_dataset = bq_ops_dataset
88+ self ._create_table_query = create_table_query
89+ self ._join_query = join_query
7390
7491 def process (self , execution : Execution ) -> Iterable [Tuple [Execution , Dict [str , Any ]]]:
7592 table_name = execution .source .source_metadata [0 ] + \
@@ -81,25 +98,20 @@ def process(self, execution: Execution) -> Iterable[Tuple[Execution, Dict[str, A
8198 uploaded_table_name = uploaded_table_name .replace ('`' , '' )
8299 client = bigquery .Client ()
83100
84- query = f"CREATE TABLE IF NOT EXISTS `{ uploaded_table_name } ` ( \
85- timestamp TIMESTAMP OPTIONS(description= 'Event timestamp'), \
86- uuid STRING OPTIONS(description='Event unique identifier')) \
87- PARTITION BY _PARTITIONDATE \
88- OPTIONS(partition_expiration_days=15)"
101+ create_table_query_ready = \
102+ Template (self ._create_table_query ).substitute (uploaded_table_name = uploaded_table_name )
89103
90104 logging .getLogger (_LOGGER_NAME ).info (
91105 f"Creating table { uploaded_table_name } if it doesn't exist" )
92106
93- client .query (query ).result ()
107+ client .query (create_table_query_ready ).result ()
94108
95- query = f"SELECT data.* FROM `{ table_name } ` AS data \
96- LEFT JOIN { uploaded_table_name } AS uploaded USING(uuid) \
97- WHERE uploaded.uuid IS NULL;"
109+ join_query_ready = \
110+ Template (self ._join_query ).substitute (table_name = table_name , uploaded_table_name = uploaded_table_name )
98111
99112 logging .getLogger (_LOGGER_NAME ).info (
100113 f'Reading from table { table_name } for Execution { execution } ' )
101- rows_iterator = client .query (query ).result (page_size = _BIGQUERY_PAGE_SIZE )
102- for row in rows_iterator :
114+ for row in client .query (join_query_ready ).result (page_size = _BIGQUERY_PAGE_SIZE ):
103115 yield execution , _convert_row_to_dict (row )
104116
105117
@@ -123,21 +135,44 @@ def __init__(
123135 self ,
124136 destination_type : DestinationType ,
125137 batch_size : int = 5000 ,
126- transactional : bool = False ,
138+ transactional_type : TransactionalType = TransactionalType . NOT_TRANSACTIONAL ,
127139 bq_ops_dataset : ValueProvider = None
128140 ):
129141 super ().__init__ ()
130- if transactional and not bq_ops_dataset :
142+ if transactional_type is not TransactionalType . NOT_TRANSACTIONAL and not bq_ops_dataset :
131143 raise Exception ('Missing bq_ops_dataset for this uploader' )
132144
133145 self ._destination_type = destination_type
134146 self ._batch_size = batch_size
135- self ._transactional = transactional
147+ self ._transactional_type = transactional_type
136148 self ._bq_ops_dataset = bq_ops_dataset
137149
138150 def _get_bq_request_class (self ):
139- if self ._transactional :
140- return self ._ExecutionIntoBigQueryRequestTransactional (self ._bq_ops_dataset )
151+ if self ._transactional_type == TransactionalType .UUID :
152+ return self ._ExecutionIntoBigQueryRequestTransactional (
153+ self ._bq_ops_dataset ,
154+ "CREATE TABLE IF NOT EXISTS `$uploaded_table_name` ( \
155+ timestamp TIMESTAMP OPTIONS(description= 'Event timestamp'), \
156+ uuid STRING OPTIONS(description='Event unique identifier')) \
157+ PARTITION BY _PARTITIONDATE \
158+ OPTIONS(partition_expiration_days=15)" ,
159+ "SELECT data.* FROM `$table_name` AS data \
160+ LEFT JOIN $uploaded_table_name AS uploaded USING(uuid) \
161+ WHERE uploaded.uuid IS NULL;"
162+ )
163+ if self ._transactional_type == TransactionalType .GCLID_TIME :
164+ return self ._ExecutionIntoBigQueryRequestTransactional (
165+ self ._bq_ops_dataset ,
166+ "CREATE TABLE IF NOT EXISTS `$uploaded_table_name` ( \
167+ timestamp TIMESTAMP OPTIONS(description= 'Event timestamp'), \
168+ gclid STRING OPTIONS(description= 'Original gclid'), \
169+ time STRING OPTIONS(description= 'Original time')) \
170+ PARTITION BY _PARTITIONDATE \
171+ OPTIONS(partition_expiration_days=15)" ,
172+ "SELECT data.* FROM `$table_name` AS data \
173+ LEFT JOIN $uploaded_table_name AS uploaded USING(gclid, time) \
174+ WHERE uploaded.gclid IS NULL;"
175+ )
141176 return self ._ExecutionIntoBigQueryRequest ()
142177
143178 def expand (self , executions ):
0 commit comments