Skip to content
This repository was archived by the owner on Apr 3, 2025. It is now read-only.

Commit f05db90

Browse files
authored
Merge pull request #62 from google/mail_notifications
Mail notifications
2 parents f3a7478 + 8eb4259 commit f05db90

30 files changed

+1033
-321
lines changed

cloud_config/generate_megalista_token.py

100644100755
+2-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
'https://www.googleapis.com/auth/dfatrafficking',
3535
'https://www.googleapis.com/auth/ddmconversions',
3636
"https://www.googleapis.com/auth/analytics.edit",
37-
'https://www.googleapis.com/auth/spreadsheets.readonly']
37+
'https://www.googleapis.com/auth/spreadsheets.readonly',
38+
'https://www.googleapis.com/auth/gmail.send']
3839

3940
# The redirect URI set for the given Client ID. The redirect URI for Client ID
4041
# generated for an installed application will always have this value.

megalista_dataflow/error/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import base64
15+
import logging
16+
from email.mime.text import MIMEText
17+
from typing import Iterable
18+
19+
from apache_beam.options.value_provider import ValueProvider
20+
from google.oauth2.credentials import Credentials
21+
from googleapiclient.discovery import build
22+
23+
from models.execution import DestinationType, Execution
24+
from models.oauth_credentials import OAuthCredentials
25+
26+
27+
class Error:
28+
"""
29+
Holds errors executions and respective error messages
30+
"""
31+
32+
def __init__(self, execution: Execution, error_message):
33+
self._execution = execution
34+
self._error_message = error_message
35+
36+
@property
37+
def execution(self):
38+
return self._execution
39+
40+
@property
41+
def error_message(self):
42+
return self._error_message
43+
44+
def __str__(self):
45+
return f'Execution: {self.execution}. Error message: {self.error_message}'
46+
47+
def __eq__(self, other):
48+
return self.execution == other.execution and self.error_message == other.error_message
49+
50+
def __hash__(self):
51+
return hash((self.execution, self.error_message))
52+
53+
54+
class ErrorNotifier:
55+
"""
56+
Abstract class to notify errors. The mean is defined by the implementation.
57+
"""
58+
59+
def notify(self, destination_type: DestinationType, errors: Iterable[Error]):
60+
raise NotImplementedError()
61+
62+
63+
class GmailNotifier(ErrorNotifier):
64+
"""
65+
Notify errors sending emails through the gMail API. Uses the main application credentials.
66+
"""
67+
68+
def __init__(self, should_notify: ValueProvider, oauth_credentials: OAuthCredentials,
69+
email_destinations: ValueProvider):
70+
self._oauth_credentials = oauth_credentials
71+
self._email_destinations = email_destinations
72+
self._should_notify_param = should_notify
73+
self._parsed_emails = None
74+
75+
def _get_gmail_service(self):
76+
credentials = Credentials(
77+
token=self._oauth_credentials.get_access_token(),
78+
refresh_token=self._oauth_credentials.get_refresh_token(),
79+
client_id=self._oauth_credentials.get_client_id(),
80+
client_secret=self._oauth_credentials.get_client_secret(),
81+
token_uri='https://accounts.google.com/o/oauth2/token',
82+
scopes=[
83+
'https://www.googleapis.com/auth/gmail.send'])
84+
85+
return build('gmail', 'v1', credentials=credentials)
86+
87+
def _should_notify(self):
88+
if not self._should_notify_param.get():
89+
return False
90+
should_notify = self._should_notify_param.get()
91+
return should_notify.lower() == 'true'
92+
93+
def notify(self, destination_type: DestinationType, errors: Iterable[Error]):
94+
logger = logging.getLogger('megalista.GmailNotifier')
95+
if not self._should_notify():
96+
logger.info(f'Skipping sending emails notifying of errors: {", ".join(map(str, errors))}')
97+
return
98+
99+
body = self._build_email_body(destination_type, errors)
100+
101+
gmail_service = self._get_gmail_service()
102+
103+
message = MIMEText(body, 'html')
104+
message['to'] = ','.join(self.email_destinations)
105+
message['from'] = 'me'
106+
message['subject'] = f'[Action Required] Megalista error detected - {destination_type.name}'
107+
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
108+
109+
logger.info(
110+
f'Sending error email to {", ".join(self.email_destinations)} regarding the conector {destination_type.name}')
111+
112+
try:
113+
gmail_service.users().messages().send(userId='me', body={'raw': raw}).execute()
114+
except:
115+
logger.error(
116+
f'Error sending email to {", ".join(self.email_destinations)} regarding the conector {destination_type.name}')
117+
raise
118+
119+
@property
120+
def email_destinations(self) -> Iterable[str]:
121+
if self._parsed_emails:
122+
return self._parsed_emails
123+
124+
self._parsed_emails = list(map(lambda email: email.strip(), self._email_destinations.get().split(',')))
125+
return self._parsed_emails
126+
127+
def _build_email_body(self, destination_type: DestinationType, errors: Iterable[Error]):
128+
body = f'''<h3>Hello, Megalista user.</h3>
129+
This is an error summary for the destination: <b>{destination_type.name}</b>.'''
130+
131+
body += '''<p>
132+
<b>Errors list:</b>
133+
<ul>'''
134+
135+
for error in errors:
136+
body += f'''
137+
<li>Error for source <b>"{error.execution.source.source_name}"</b> and destination
138+
<b>"{error.execution.destination.destination_name}"</b>: {error.error_message}</b>
139+
</li>'''
140+
141+
body += '</ul>'
142+
143+
return body
144+
145+
146+
class ErrorHandler:
147+
"""
148+
Accumulate errors and notify them.
149+
Only record one message by Execution.
150+
Notification details are decided by the ErrorNotifier received in the constructor.
151+
"""
152+
153+
def __init__(self, destination_type: DestinationType, error_notifier: ErrorNotifier):
154+
self._destination_type = destination_type
155+
self._error_notifier = error_notifier
156+
self._errors = {}
157+
158+
def add_error(self, execution: Execution, error_message: str):
159+
"""
160+
Add an error to be logged.
161+
Only record one error per Execution, so the output message isn't too long.
162+
"""
163+
164+
if execution.destination.destination_type != self._destination_type:
165+
raise ValueError(
166+
f'Received a error of destination type: {execution.destination.destination_type}'
167+
f' but this error handler is initialized with {self._destination_type} destination type')
168+
169+
error = Error(execution, error_message)
170+
171+
self._errors[error.execution] = error
172+
173+
@property
174+
def errors(self):
175+
return self._errors.copy()
176+
177+
def notify_errors(self):
178+
"""
179+
Send the errors accumulated by email.
180+
Does nothing if no errors were received.
181+
"""
182+
if len(self.errors) == 0:
183+
return
184+
185+
self._error_notifier.notify(self._destination_type, self.errors.values())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from typing import Iterable
15+
16+
import pytest
17+
from apache_beam.options.value_provider import StaticValueProvider
18+
19+
from error.error_handling import ErrorHandler, Error, GmailNotifier, ErrorNotifier
20+
from models.execution import DestinationType, Execution, AccountConfig, Source, SourceType, Destination
21+
from models.oauth_credentials import OAuthCredentials
22+
23+
24+
class MockErrorNotifier(ErrorNotifier):
25+
def __init__(self):
26+
self.were_errors_sent = False
27+
self.errors_sent = {}
28+
self.destination_type = None
29+
30+
def notify(self, destination_type: DestinationType, errors: Iterable[Error]):
31+
self.were_errors_sent = True
32+
self.errors_sent = {error.execution: error.error_message for error in errors}
33+
self.destination_type = destination_type
34+
35+
36+
# ErrorHandler tests
37+
38+
def create_execution(source_name, destination_name):
39+
account_config = AccountConfig('', False, '', '', '')
40+
source = Source(source_name, SourceType.BIG_QUERY, ['', ''])
41+
destination = Destination(destination_name, DestinationType.ADS_OFFLINE_CONVERSION, [''])
42+
return Execution(account_config, source, destination)
43+
44+
45+
def test_single_error_per_execution():
46+
error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, MockErrorNotifier())
47+
48+
first_execution = create_execution('source1', 'destination1')
49+
second_execution = create_execution('source1', 'destination2')
50+
51+
error_handler.add_error(first_execution, 'Error for first execution, fist input')
52+
error_handler.add_error(first_execution, 'Error for first execution, second input')
53+
error_handler.add_error(second_execution, 'Error for second execution, fist input')
54+
55+
returned_errors = error_handler.errors
56+
assert len(returned_errors) == 2
57+
assert returned_errors.keys() == {first_execution, second_execution}
58+
59+
60+
def test_destination_type_consistency():
61+
error_handler = ErrorHandler(DestinationType.CM_OFFLINE_CONVERSION, MockErrorNotifier())
62+
wrong_destination_type_execution = create_execution('source', 'destination')
63+
64+
with pytest.raises(ValueError):
65+
error_handler.add_error(wrong_destination_type_execution, 'error message')
66+
67+
68+
def test_errors_sent_to_error_notifier():
69+
mock_notifier = MockErrorNotifier()
70+
error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, mock_notifier)
71+
72+
first_execution = create_execution('source1', 'destination1')
73+
second_execution = create_execution('source1', 'destination2')
74+
75+
error_handler.add_error(first_execution, 'Error for first execution, fist input')
76+
error_handler.add_error(second_execution, 'Error for second execution, fist input')
77+
78+
error_handler.notify_errors()
79+
80+
assert mock_notifier.were_errors_sent is True
81+
assert mock_notifier.errors_sent == {first_execution: 'Error for first execution, fist input',
82+
second_execution: 'Error for second execution, fist input'}
83+
assert mock_notifier.destination_type == DestinationType.ADS_OFFLINE_CONVERSION
84+
85+
86+
def test_should_not_notify_when_empty_errors():
87+
mock_notifier = MockErrorNotifier()
88+
error_handler = ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, mock_notifier)
89+
90+
error_handler.notify_errors()
91+
92+
assert mock_notifier.were_errors_sent is False
93+
94+
95+
# GmailNotifier tests
96+
97+
def test_multiple_destinations_separated_by_comma():
98+
first_email = '[email protected]'
99+
second_email = '[email protected]'
100+
third_email = '[email protected]'
101+
102+
credentials = OAuthCredentials('', '', '', '')
103+
gmail_notifier = GmailNotifier(StaticValueProvider(str, 'true'), credentials,
104+
StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))
105+
106+
emails = set(gmail_notifier.email_destinations)
107+
assert len(emails) == 3
108+
assert set(emails) == {first_email, third_email, second_email}
109+
110+
111+
def test_should_not_notify_when_param_is_false():
112+
first_email = '[email protected]'
113+
second_email = '[email protected]'
114+
third_email = '[email protected]'
115+
116+
credentials = OAuthCredentials('', '', '', '')
117+
gmail_notifier = GmailNotifier(StaticValueProvider(str, 'false'), credentials,
118+
StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))
119+
120+
gmail_notifier.notify(DestinationType.ADS_OFFLINE_CONVERSION, [Error(create_execution('s', 'd'), 'error message')])
121+
122+
123+
def test_should_not_notify_when_param_is_empty():
124+
first_email = '[email protected]'
125+
second_email = '[email protected]'
126+
third_email = '[email protected]'
127+
128+
credentials = OAuthCredentials('', '', '', '')
129+
gmail_notifier = GmailNotifier(StaticValueProvider(str, None), credentials,
130+
StaticValueProvider(str, f'{first_email}, {second_email} ,{third_email}'))
131+
132+
gmail_notifier.notify(DestinationType.ADS_OFFLINE_CONVERSION, [Error(create_execution('s', 'd'), 'error message')])

0 commit comments

Comments
 (0)