25
25
26
26
27
27
class GoogleAdsSSIUploaderDoFn (MegalistaUploader ):
28
-
29
28
def __init__ (self , oauth_credentials , developer_token , error_handler : ErrorHandler ):
30
29
super ().__init__ (error_handler )
31
30
self .oauth_credentials = oauth_credentials
32
31
self .developer_token = developer_token
33
32
self .active = developer_token is not None
34
33
35
34
def _get_offline_user_data_job_service (self , customer_id ):
36
- return utils .get_ads_service ('OfflineUserDataJobService' , ADS_API_VERSION ,
37
- self .oauth_credentials ,
38
- self .developer_token .get (),
39
- customer_id )
35
+ return utils .get_ads_service (
36
+ "OfflineUserDataJobService" ,
37
+ ADS_API_VERSION ,
38
+ self .oauth_credentials ,
39
+ self .developer_token .get (),
40
+ customer_id ,
41
+ )
40
42
41
43
@staticmethod
42
44
def _assert_conversion_metadata_is_present (execution : Execution ):
43
45
metadata = execution .destination .destination_metadata
44
46
if len (metadata ) < 3 :
45
47
raise ValueError (
46
- f'Missing destination information. Received { len (metadata )} entry(ies)' )
48
+ f"Missing destination information. Received { len (metadata )} entry(ies)"
49
+ )
47
50
48
- @utils .safe_process (
49
- logger = logging .getLogger ('megalista.GoogleAdsSSIUploader' ))
51
+ @utils .safe_process (logger = logging .getLogger ("megalista.GoogleAdsSSIUploader" ))
50
52
def process (self , batch : Batch , ** kwargs ):
51
53
execution = batch .execution
52
54
self ._assert_conversion_metadata_is_present (execution )
53
55
54
- currency_code = self ._get_currency_code (execution .destination )
55
- customer_id = self ._get_customer_id (execution .account_config , execution .destination )
56
- login_customer_id = self ._get_login_customer_id (execution .account_config , execution .destination )
57
- user_data_consent = self ._get_user_data_consent (execution .account_config , execution .destination )
58
- ad_personalization = self ._get_ad_personalization_consent (execution .account_config , execution .destination )
56
+ custom_key = self ._get_custom_key (execution .destination )
57
+ customer_id = self ._get_customer_id (
58
+ execution .account_config , execution .destination
59
+ )
60
+ login_customer_id = self ._get_login_customer_id (
61
+ execution .account_config , execution .destination
62
+ )
63
+ user_data_consent = self ._get_user_data_consent (
64
+ execution .account_config , execution .destination
65
+ )
66
+ ad_personalization = self ._get_ad_personalization_consent (
67
+ execution .account_config , execution .destination
68
+ )
59
69
60
70
offline_user_data_job_service = self ._get_offline_user_data_job_service (
61
- login_customer_id )
62
- conversion_action_resource_name = self ._get_resource_name (customer_id ,
63
- login_customer_id ,
64
- execution .destination .destination_metadata [0 ])
65
- self ._do_upload (execution ,
66
- offline_user_data_job_service ,
67
- customer_id ,
68
- currency_code ,
69
- conversion_action_resource_name ,
70
- user_data_consent ,
71
- ad_personalization ,
72
- batch .elements )
71
+ login_customer_id
72
+ )
73
+
74
+ ads_service = self ._get_ads_service (login_customer_id )
75
+
76
+ conversion_action_resource_name = self ._get_resource_name (
77
+ ads_service , customer_id , execution .destination .destination_metadata [0 ]
78
+ )
79
+
80
+ self ._do_upload (
81
+ execution ,
82
+ offline_user_data_job_service ,
83
+ customer_id ,
84
+ custom_key ,
85
+ conversion_action_resource_name ,
86
+ user_data_consent ,
87
+ ad_personalization ,
88
+ batch .elements ,
89
+ )
73
90
74
91
return [execution ]
75
92
76
- def _do_upload (self , execution , offline_user_data_job_service , customer_id , currency_code , conversion_action_resource_name , user_data_consent , ad_personalization , rows ):
77
- logger = logging .getLogger ('megalista.GoogleAdsSSIUploader' )
93
+ def _do_upload (
94
+ self ,
95
+ execution ,
96
+ offline_user_data_job_service ,
97
+ customer_id ,
98
+ custom_key ,
99
+ conversion_action_resource_name ,
100
+ user_data_consent ,
101
+ ad_personalization ,
102
+ rows ,
103
+ ):
104
+ logger = logging .getLogger ("megalista.GoogleAdsSSIUploader" )
78
105
79
106
# Upload is divided into 3 parts:
80
107
# 1. Creates Job
@@ -83,108 +110,153 @@ def _do_upload(self, execution, offline_user_data_job_service, customer_id, curr
83
110
84
111
# 1. Creates Job
85
112
job_creation_payload = {
86
- 'type_' : 'STORE_SALES_UPLOAD_FIRST_PARTY' ,
87
- 'external_id' : int (datetime .datetime .now ().timestamp ()* 10e3 ),
88
- 'store_sales_metadata' : {
89
- 'loyalty_fraction' : 1.0 ,
90
- 'transaction_upload_fraction' : 1.0
91
- }
113
+ "type_" : "STORE_SALES_UPLOAD_FIRST_PARTY" ,
114
+ "external_id" : int (datetime .datetime .now ().timestamp () * 10e3 ),
115
+ "store_sales_metadata" : {
116
+ "loyalty_fraction" : 1.0 ,
117
+ "transaction_upload_fraction" : 1.0 ,
118
+ ** ({"custom_key" : custom_key } if custom_key else {}),
119
+ },
92
120
}
93
121
94
- job_resource_name = offline_user_data_job_service .create_offline_user_data_job (customer_id = customer_id , job = job_creation_payload ).resource_name
95
-
122
+ job_resource_name = offline_user_data_job_service .create_offline_user_data_job (
123
+ customer_id = customer_id , job = job_creation_payload
124
+ ).resource_name
125
+
96
126
# Sets consent info if any
97
127
consent = {}
98
128
if user_data_consent :
99
- consent [' ad_user_data' ] = user_data_consent
100
- if ad_personalization :
101
- consent [' ad_personalization' ] = ad_personalization
129
+ consent [" ad_user_data" ] = user_data_consent
130
+ if ad_personalization :
131
+ consent [" ad_personalization" ] = ad_personalization
102
132
103
133
# 2. Creates operations (data insertion)
104
134
data_insertion_payload = {
105
- 'resource_name' : job_resource_name ,
106
- 'enable_partial_failure' : False ,
107
- 'operations' : [{
108
- 'create' : {
109
- 'user_identifiers' : [{k : v } for (k , v ) in conversion .items () if k not in ('amount' , 'time' )],
110
- 'transaction_attribute' : {
111
- 'conversion_action' : conversion_action_resource_name ,
112
- 'currency_code' : currency_code ,
113
- 'transaction_amount_micros' : conversion ['amount' ],
114
- 'transaction_date_time' : utils .format_date (conversion ['time' ])
115
- },
116
- ** ({'consent' :consent } if consent else {})
135
+ "resource_name" : job_resource_name ,
136
+ "enable_partial_failure" : False ,
137
+ "operations" : [
138
+ {
139
+ "create" : {
140
+ "user_identifiers" : [
141
+ {k : v }
142
+ for (k , v ) in conversion .items ()
143
+ if k not in ("amount" , "time" , "currency_code" , "custom_value" )
144
+ ],
145
+ "transaction_attribute" : {
146
+ "conversion_action" : conversion_action_resource_name ,
147
+ "currency_code" : conversion ["currency_code" ],
148
+ "transaction_amount_micros" : int (conversion ["amount" ]),
149
+ "transaction_date_time" : utils .format_date (
150
+ conversion ["time" ]
151
+ ),
152
+ ** (
153
+ {"custom_value" : conversion ["custom_value" ]}
154
+ if "custom_value" in conversion and custom_key
155
+ else {}
156
+ ),
157
+ },
158
+ ** ({"consent" : consent } if consent else {}),
159
+ }
117
160
}
118
- } for conversion in rows ]
161
+ for conversion in rows
162
+ ],
119
163
}
120
164
121
- data_insertion_response = offline_user_data_job_service .add_offline_user_data_job_operations (request = data_insertion_payload )
165
+ data_insertion_response = (
166
+ offline_user_data_job_service .add_offline_user_data_job_operations (
167
+ request = data_insertion_payload
168
+ )
169
+ )
122
170
123
- error_message = utils .print_partial_error_messages (logger , 'uploading ssi' ,
124
- data_insertion_response )
171
+ error_message = utils .print_partial_error_messages (
172
+ logger , "uploading ssi" , data_insertion_response
173
+ )
125
174
if error_message :
126
175
self ._add_error (execution , error_message )
127
176
128
177
# 3. Runs the Job
129
- offline_user_data_job_service .run_offline_user_data_job (resource_name = job_resource_name )
178
+ offline_user_data_job_service .run_offline_user_data_job (
179
+ resource_name = job_resource_name
180
+ )
130
181
131
- def _get_customer_id (self , account_config : AccountConfig , destination : Destination ) -> str :
182
+ def _get_customer_id (
183
+ self , account_config : AccountConfig , destination : Destination
184
+ ) -> str :
132
185
"""
133
- If the customer_id is present on the destination, returns it, otherwise defaults to the account_config info.
186
+ If the customer_id is present on the destination, returns it, otherwise defaults to the account_config info.
134
187
"""
135
- if len (destination .destination_metadata ) >= 2 and len (destination .destination_metadata [1 ]) > 0 :
188
+ if (
189
+ len (destination .destination_metadata ) >= 2
190
+ and len (destination .destination_metadata [1 ]) > 0
191
+ ):
136
192
return Utils .filter_text_only_numbers (destination .destination_metadata [1 ])
137
193
return account_config .google_ads_account_id
138
194
139
- def _get_currency_code (self , destination : Destination ) -> str :
195
+ def _get_custom_key (self , destination : Destination ) -> Optional [ str ] :
140
196
"""
141
- If the currency_code is present on the destination, return it, otherwise default to BRL .
197
+ If the custom_key is present on the destination, return it, otherwise returns none .
142
198
"""
143
- if len (destination .destination_metadata ) >= 5 and len (destination .destination_metadata [3 ]) > 0 :
199
+ if (
200
+ len (destination .destination_metadata ) >= 5
201
+ and destination .destination_metadata [3 ] is not None
202
+ ):
144
203
return destination .destination_metadata [3 ]
145
- return 'BRL'
146
-
147
- def _get_user_data_consent (self , account_config : AccountConfig , destination : Destination ) -> Optional [str ]:
204
+ return None
205
+
206
+ def _get_user_data_consent (
207
+ self , account_config : AccountConfig , destination : Destination
208
+ ) -> Optional [str ]:
148
209
"""
149
- Specifies whether user consent was obtained for the data you are uploading.
150
- https://www.google.com/about/company/user-consent-policy
210
+ Specifies whether user consent was obtained for the data you are uploading.
211
+ https://www.google.com/about/company/user-consent-policy
151
212
"""
152
- if len (destination .destination_metadata ) >= 5 and len (destination .destination_metadata [4 ]) > 0 :
213
+ if (
214
+ len (destination .destination_metadata ) >= 5
215
+ and len (destination .destination_metadata [4 ]) > 0
216
+ ):
153
217
return destination .destination_metadata [4 ]
154
218
return None
155
-
156
- def _get_ad_personalization_consent (self , account_config : AccountConfig , destination : Destination ) -> Optional [str ]:
219
+
220
+ def _get_ad_personalization_consent (
221
+ self , account_config : AccountConfig , destination : Destination
222
+ ) -> Optional [str ]:
157
223
"""
158
- Specifies whether user consent was obtained for the data you are uploading.
159
- https://www.google.com/about/company/user-consent-policy
224
+ Specifies whether user consent was obtained for the data you are uploading.
225
+ https://www.google.com/about/company/user-consent-policy
160
226
"""
161
- if len (destination .destination_metadata ) >= 6 and len (destination .destination_metadata [5 ]) > 0 :
227
+ if (
228
+ len (destination .destination_metadata ) >= 6
229
+ and len (destination .destination_metadata [5 ]) > 0
230
+ ):
162
231
return destination .destination_metadata [5 ]
163
232
return None
164
233
165
- def _get_login_customer_id (self , account_config : AccountConfig , destination : Destination ) -> str :
234
+ def _get_login_customer_id (
235
+ self , account_config : AccountConfig , destination : Destination
236
+ ) -> str :
166
237
"""
167
- If the customer_id in account_config is a mcc, then login with the mcc account id, otherwise use the customer id.
238
+ If the customer_id in account_config is a mcc, then login with the mcc account id, otherwise use the customer id.
168
239
"""
169
240
if account_config ._mcc :
170
241
return account_config .google_ads_account_id
171
-
242
+
172
243
return self ._get_customer_id (account_config , destination )
173
244
174
245
def _get_ads_service (self , customer_id : str ):
175
- return utils .get_ads_service ('GoogleAdsService' , ADS_API_VERSION ,
176
- self .oauth_credentials ,
177
- self .developer_token .get (),
178
- customer_id )
179
-
180
- def _get_resource_name (self , customer_id : str , login_customer_id : str , name : str ):
181
- resource_name = None
182
- service = self ._get_ads_service (login_customer_id )
246
+ return utils .get_ads_service (
247
+ "GoogleAdsService" ,
248
+ ADS_API_VERSION ,
249
+ self .oauth_credentials ,
250
+ self .developer_token .get (),
251
+ customer_id ,
252
+ )
253
+
254
+ def _get_resource_name (self , ads_service , customer_id : str , name : str ):
183
255
query = f"SELECT conversion_action.resource_name FROM conversion_action WHERE conversion_action.name = '{ name } '"
184
- response_query = service .search_stream (customer_id = customer_id , query = query )
256
+ response_query = ads_service .search_stream (customer_id = customer_id , query = query )
185
257
for batch in response_query :
186
- for row in batch .results :
187
- resource_name = row .conversion_action .resource_name
188
- return resource_name
189
-
190
-
258
+ for row in batch .results :
259
+ return row .conversion_action .resource_name
260
+ raise Exception (
261
+ f'Conversion " { name } " could not be found on account { customer_id } '
262
+ )
0 commit comments