Patch pjoshi aurora (#395)
* Upgrade GitHub Actions used in `dockerimage` action (#379)

* upgrade github actions used in dockerimage action

* remove setup-buildx-action and pin to hashes.

* change deprecated pkg_resources to importlib.metadata (#387)

* In a previous commit, the detection of a failure became too aggressive. (#386)

* In a previous commit, the detection of a failure became too aggressive.

This remediates the issue by considering a run 'failed' only if the heartbeat hasn't been
updated within heartbeat_cutoff time, as opposed to the heartbeat_threshold time.

* change run finished at query to heartbeat_cutoff from threshold

* clean up unused values from run query

---------

Co-authored-by: Sakari Ikonen <[email protected]>

* fix PATH_PREFIX handling in metadata service so it doesn't interfere with mfgui routes (#388)

* Configurable SSL Connection (#373)

* [TRIS-297] Configurable SSL Connection (#1)

* Configurable SSL connection

* Update services/utils/__init__.py

* no ssl unit testing (#3)

* ssl separate test (#4)

* dsn generator sslmode none (#5)

* fix run_goose.py not working without SSL mode env variables. (#390)

* change run inactive cutoff default to 6 minutes; clean up unused constant (#392)

* clarify comment on read replica hosts

* make USE_SEPARATE_READER_POOL a boolean

* remove unnecessary conditionals for pool choice in execute_sql

---------

Co-authored-by: Tom Furmston <[email protected]>
Co-authored-by: Romain <[email protected]>
Co-authored-by: Oleg Avdeev <[email protected]>
Co-authored-by: RikishK <[email protected]>
5 people authored Oct 25, 2023
1 parent 3b9b507 commit 158e27f
Showing 11 changed files with 120 additions and 61 deletions.
37 changes: 26 additions & 11 deletions .github/workflows/dockerimage.yml
@@ -2,22 +2,37 @@ name: Docker Image CI

 on:
   release:
-    branches: [ master ]
+    branches: [ master ]
 
 jobs:
 
   build:
 
     runs-on: ubuntu-latest
 
     steps:
-    - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
-    - uses: docker/build-push-action@3e7a4f6646880c6f63758d73ac32392d323eaf8f # v1.1.2
+    -
+      name: Checkout
+      uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
+    -
+      name: Docker meta
+      id: meta
+      uses: docker/metadata-action@818d4b7b91585d195f67373fd9cb0332e31a7175 # v4.6.0
+      with:
+        images: |
+          netflixoss/metaflow_metadata_service
+        tags: |
+          type=semver,pattern={{raw}}
+          type=sha
+          type=raw,value=latest
+    -
+      name: Login to Docker Hub
+      uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc # v2.2.0
       with:
         username: ${{ secrets.DOCKER_USERNAME_NETFLIX_OSS }}
         password: ${{ secrets.DOCKER_AUTH_TOKEN_NETFLIX_OSS }}
-        repository: netflixoss/metaflow_metadata_service
-        tag_with_ref: true
-        tag_with_sha: true
-        tags: "latest"
-        dockerfile: ${{ github.workspace }}/Dockerfile
+    -
+      name: Build and push # We have a single-platform build, so use of setup-buildx-action is currently omitted.
+      uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825 # v4.1.1
+      with:
+        context: .
+        push: true
+        tags: ${{ steps.meta.outputs.tags }}
+        labels: ${{ steps.meta.outputs.labels }}
28 changes: 21 additions & 7 deletions run_goose.py
@@ -56,13 +56,27 @@ def main():
help="Path to migration files")
args = parser.parse_args()

db_connection_string = "postgresql://{}:{}@{}:{}/{}?sslmode=disable".format(
quote(os.environ["MF_METADATA_DB_USER"]),
quote(os.environ["MF_METADATA_DB_PSWD"]),
os.environ["MF_METADATA_DB_HOST"],
os.environ["MF_METADATA_DB_PORT"],
os.environ["MF_METADATA_DB_NAME"],
)
db_connection_string = f'postgresql://{quote(os.environ["MF_METADATA_DB_USER"])}:'\
f'{quote(os.environ["MF_METADATA_DB_PSWD"])}@{os.environ["MF_METADATA_DB_HOST"]}:'\
f'{os.environ["MF_METADATA_DB_PORT"]}/{os.environ["MF_METADATA_DB_NAME"]}'

ssl_mode = os.environ.get("MF_METADATA_DB_SSL_MODE")
ssl_cert_path = os.environ.get("MF_METADATA_DB_SSL_CERT_PATH")
ssl_key_path = os.environ.get("MF_METADATA_DB_SSL_KEY_PATH")
ssl_root_cert_path = os.environ.get("MF_METADATA_DB_SSL_ROOT_CERT")

if ssl_mode in ['allow', 'prefer', 'require', 'verify-ca', 'verify-full']:
ssl_query = f'ssl_mode={ssl_mode}'
if ssl_cert_path is not None:
ssl_query = f'{ssl_query}&sslcert={ssl_cert_path}'
if ssl_key_path is not None:
ssl_query = f'{ssl_query}&sslkey={ssl_key_path}'
if ssl_root_cert_path is not None:
ssl_query = f'{ssl_query}&sslrootcert={ssl_root_cert_path}'
else:
ssl_query = f'sslmode=disable'

db_connection_string = f'{db_connection_string}?{ssl_query}'

if args.wait:
wait_for_postgres(db_connection_string, timeout_seconds=args.wait)
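
A sketch of the resulting goose connection strings (the host, credentials, and cert path below are made-up values):

import os

# Hypothetical environment, for illustration only.
os.environ.update({
    "MF_METADATA_DB_USER": "mf_user",
    "MF_METADATA_DB_PSWD": "s3cret",
    "MF_METADATA_DB_HOST": "db.example.internal",
    "MF_METADATA_DB_PORT": "5432",
    "MF_METADATA_DB_NAME": "metaflow",
    "MF_METADATA_DB_SSL_MODE": "verify-full",
    "MF_METADATA_DB_SSL_ROOT_CERT": "/certs/root.crt",
})
# The logic above then yields:
#   postgresql://mf_user:s3cret@db.example.internal:5432/metaflow?sslmode=verify-full&sslrootcert=/certs/root.crt
# and, with MF_METADATA_DB_SSL_MODE unset, the same URL ending in ?sslmode=disable.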
6 changes: 3 additions & 3 deletions services/data/postgres_async_db.py
@@ -91,12 +91,12 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREATE
             maxsize=db_conf.pool_max,
             timeout=db_conf.timeout,
             pool_recycle=10 * db_conf.timeout,
-            echo=AIOPG_ECHO) if USE_SEPARATE_READER_POOL == "1" else self.pool
+            echo=AIOPG_ECHO) if USE_SEPARATE_READER_POOL else self.pool
 
         for table in self.tables:
             await table._init(create_triggers=create_triggers)
 
-        if USE_SEPARATE_READER_POOL == "1":
+        if USE_SEPARATE_READER_POOL:
             self.logger.info(
                 "Writer Connection established.\n"
                 "   Pool min: {pool_min} max: {pool_max}\n".format(
@@ -267,7 +267,7 @@ async def _execute_on_cursor(_cur):
                 body, pagination = await _execute_on_cursor(cur)
                 return DBResponse(response_code=200, body=body), pagination
         else:
-            db_pool = self.db.reader_pool if USE_SEPARATE_READER_POOL == "1" else self.db.pool
+            db_pool = self.db.reader_pool  # defaults to self.db.pool if no separate reader_pool
             with (await db_pool.cursor(
                 cursor_factory=psycopg2.extras.DictCursor
             )) as cur:
4 changes: 2 additions & 2 deletions services/metadata_service/api/utils.py
@@ -1,14 +1,14 @@
 import json
 from functools import wraps
 
-import pkg_resources
 import collections
 from aiohttp import web
 from multidict import MultiDict
+from importlib import metadata
 
 from services.utils import get_traceback_str
 
-version = pkg_resources.require("metadata_service")[0].version
+version = metadata.version("metadata_service")
 METADATA_SERVICE_VERSION = version
 METADATA_SERVICE_HEADER = 'METADATA_SERVICE_VERSION'
10 changes: 5 additions & 5 deletions services/metadata_service/server.py
@@ -18,12 +18,12 @@
 PATH_PREFIX = os.environ.get("PATH_PREFIX", "")
 
 
-def app(loop=None, db_conf: DBConfiguration = None, middlewares=None):
+def app(loop=None, db_conf: DBConfiguration = None, middlewares=None, path_prefix=""):
 
     loop = loop or asyncio.get_event_loop()
 
     _app = web.Application(loop=loop)
-    app = web.Application(loop=loop) if len(PATH_PREFIX) > 0 else _app
+    app = web.Application(loop=loop) if path_prefix else _app
     async_db = AsyncPostgresDB()
     loop.run_until_complete(async_db._init(db_conf))
     FlowApi(app)
@@ -34,16 +34,16 @@ def app(loop=None, db_conf: DBConfiguration = None, middlewares=None):
     ArtificatsApi(app)
     AuthApi(app)
 
-    if len(PATH_PREFIX) > 0:
-        _app.add_subapp(PATH_PREFIX, app)
+    if path_prefix:
+        _app.add_subapp(path_prefix, app)
     if middlewares:
         _app.middlewares.extend(middlewares)
     return _app
 
 
 def main():
     loop = asyncio.get_event_loop()
-    the_app = app(loop, DBConfiguration())
+    the_app = app(loop, DBConfiguration(), path_prefix=PATH_PREFIX)
     handler = web.AppRunner(the_app)
     loop.run_until_complete(handler.setup())
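
A usage sketch of the fixed prefix handling (the prefix value here is hypothetical; aiohttp's add_subapp requires it to start with '/'):

import os

# Hypothetical deployment: mount the metadata API under /api so its routes
# cannot shadow routes served by the Metaflow UI (mfgui) at the root.
os.environ["PATH_PREFIX"] = "/api"

# main() now threads the prefix through explicitly, i.e.
#     the_app = app(loop, DBConfiguration(), path_prefix=os.environ["PATH_PREFIX"])
# so GET /api/flows reaches FlowApi, while callers (e.g. tests) that use the
# default path_prefix="" get an unprefixed application.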

4 changes: 2 additions & 2 deletions services/ui_backend_service/data/db/tables/base.py
@@ -19,9 +19,9 @@
     os.environ.get("OLD_RUN_FAILURE_CUTOFF_TIME", 60 * 60 * 24 * 1000 * 1)
 )
 # Time before a run with a heartbeat will be considered inactive (and thus failed).
-# Default to 1 day (in seconds)
+# Default to 6 minutes (in seconds)
 RUN_INACTIVE_CUTOFF_TIME = int(
-    os.environ.get("RUN_INACTIVE_CUTOFF_TIME", 60 * 60 * 24 * 1)
+    os.environ.get("RUN_INACTIVE_CUTOFF_TIME", 60 * 6)
 )
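
Operators who prefer the old, more lenient behavior can restore it via the environment (a sketch; the value shown is the previous 1-day default):

import os

# Restore the pre-#392 window of 1 day, in seconds.
os.environ["RUN_INACTIVE_CUTOFF_TIME"] = str(60 * 60 * 24)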


7 changes: 1 addition & 6 deletions services/ui_backend_service/data/db/tables/run.py
@@ -3,7 +3,6 @@
 from typing import List, Tuple
 from .base import (
     AsyncPostgresTable,
-    HEARTBEAT_THRESHOLD,
     OLD_RUN_FAILURE_CUTOFF_TIME,
     RUN_INACTIVE_CUTOFF_TIME,
 )
@@ -111,13 +110,12 @@ def select_columns(self):
             WHEN end_attempt_ok IS NOT NULL
                 THEN end_attempt_ok.ts_epoch
             WHEN {table_name}.last_heartbeat_ts IS NOT NULL
-                AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={heartbeat_threshold}
+                AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={heartbeat_cutoff}
                 THEN NULL
             ELSE {table_name}.last_heartbeat_ts*1000
             END) AS finished_at
         """.format(
             table_name=table_name,
-            heartbeat_threshold=HEARTBEAT_THRESHOLD,
             heartbeat_cutoff=RUN_INACTIVE_CUTOFF_TIME,
         ),
         """
@@ -136,8 +134,6 @@ def select_columns(self):
             END) AS status
         """.format(
             table_name=table_name,
-            heartbeat_threshold=HEARTBEAT_THRESHOLD,
-            cutoff=OLD_RUN_FAILURE_CUTOFF_TIME,
             heartbeat_cutoff=RUN_INACTIVE_CUTOFF_TIME,
         ),
         """
@@ -157,7 +153,6 @@ def select_columns(self):
             END) AS duration
         """.format(
             table_name=table_name,
-            heartbeat_threshold=HEARTBEAT_THRESHOLD,
             cutoff=OLD_RUN_FAILURE_CUTOFF_TIME,
         ),
     ]
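
Read as plain Python, the changed finished_at branch amounts to the following (a sketch of the semantics only, not code from this repo):

import time

RUN_INACTIVE_CUTOFF_TIME = 60 * 6  # seconds, matching the new default

def finished_at(last_heartbeat_ts, end_attempt_ok_ts=None):
    """Mirror of the SQL CASE: a run whose heartbeat is fresher than the cutoff is still live."""
    if end_attempt_ok_ts is not None:
        return end_attempt_ok_ts
    if last_heartbeat_ts is not None and abs(time.time() - last_heartbeat_ts) <= RUN_INACTIVE_CUTOFF_TIME:
        return None  # fresh heartbeat: the run has not finished
    return last_heartbeat_ts * 1000 if last_heartbeat_ts is not None else None  # stale heartbeat, in ms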
2 changes: 1 addition & 1 deletion services/ui_backend_service/docs/environment.md
@@ -73,7 +73,7 @@ The threshold parameters for heartbeat checks can also be configured when necessary
 `OLD_RUN_FAILURE_CUTOFF_TIME` [ for runs that do not have a heartbeat, controls at what point a running status run should be considered failed. Default is 1 day (in milliseconds)]
-`RUN_INACTIVE_CUTOFF_TIME` [ for runs that have a heartbeat, controls how long a run with a failed heartbeat should wait for possibly queued tasks to start and resume heartbeat updates. Default is 1 day (in seconds)]
+`RUN_INACTIVE_CUTOFF_TIME` [ for runs that have a heartbeat, controls how long a run with a failed heartbeat should wait for possibly queued tasks to start and resume heartbeat updates. Default is 6 minutes (in seconds)]
 
 ## Baseurl configuration
77 changes: 54 additions & 23 deletions services/utils/__init__.py
@@ -2,7 +2,6 @@
 import sys
 import os
 import traceback
-import pkg_resources
 from multidict import MultiDict
 from urllib.parse import urlencode, quote
 from aiohttp import web
@@ -12,10 +11,11 @@
 import logging
 import psycopg2
 from packaging.version import Version, parse
+from importlib import metadata
 
-USE_SEPARATE_READER_POOL = os.environ.get("USE_SEPARATE_READER_POOL", "0")
+USE_SEPARATE_READER_POOL = os.environ.get("USE_SEPARATE_READER_POOL", "0") in ["True", "true", "1"]
 
-version = pkg_resources.require("metadata_service")[0].version
+version = metadata.version("metadata_service")
 
 METADATA_SERVICE_VERSION = version
 METADATA_SERVICE_HEADER = 'METADATA_SERVICE_VERSION'
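
A quick sketch of what the new boolean parsing accepts and rejects:

import os

for raw, expected in [("1", True), ("true", True), ("True", True), ("0", False), ("yes", False)]:
    os.environ["USE_SEPARATE_READER_POOL"] = raw
    parsed = os.environ.get("USE_SEPARATE_READER_POOL", "0") in ["True", "true", "1"]
    assert parsed is expected  # note: spellings such as "yes" or "on" are not treated as truthy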
@@ -203,6 +203,10 @@ def __init__(self,
                  user: str = "postgres",
                  password: str = "postgres",
                  database_name: str = "postgres",
+                 ssl_mode: str = "disabled",
+                 ssl_cert_path: str = None,
+                 ssl_key_path: str = None,
+                 ssl_root_cert_path: str = None,
                  prefix="MF_METADATA_DB_",
                  pool_min: int = 1,
                  pool_max: int = 10,
@@ -216,11 +220,15 @@
         self._dsn = None
         self._host = os.environ.get(prefix + "HOST", host)
         self._read_replica_host = \
-            os.environ.get(prefix + "READ_REPLICA_HOST") if USE_SEPARATE_READER_POOL == "1" else self._host
+            os.environ.get(prefix + "READ_REPLICA_HOST") if USE_SEPARATE_READER_POOL else self._host
         self._port = int(os.environ.get(prefix + "PORT", port))
         self._user = os.environ.get(prefix + "USER", user)
         self._password = os.environ.get(prefix + "PSWD", password)
         self._database_name = os.environ.get(prefix + "NAME", database_name)
+        self._ssl_mode = os.environ.get(prefix + "SSL_MODE", ssl_mode)
+        self._ssl_cert_path = os.environ.get(prefix + "SSL_CERT_PATH", ssl_cert_path)
+        self._ssl_key_path = os.environ.get(prefix + "SSL_KEY_PATH", ssl_key_path)
+        self._ssl_root_cert_path = os.environ.get(prefix + "SSL_ROOT_CERT_PATH", ssl_root_cert_path)
         conn_str_required_values = [
             self._host,
             self._port,
@@ -260,29 +268,52 @@ def _is_valid_dsn(dsn):
     def connection_string_url(self, type=None):
         # postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...]
         if type is None or type == DBType.WRITER:
-            return f'postgresql://{quote(self._user)}:{quote(self._password)}@{self._host}:{self._port}/{self._database_name}?sslmode=disable'
+            base_url = f'postgresql://{quote(self._user)}:{quote(self._password)}@{self._host}:{self._port}/{self._database_name}'
         elif type == DBType.READER:
-            return f'postgresql://{quote(self._user)}:{quote(self._password)}@{self._read_replica_host}:{self._port}/{self._database_name}?sslmode=disable'
+            base_url = f'postgresql://{quote(self._user)}:{quote(self._password)}@{self._read_replica_host}:{self._port}/{self._database_name}'
+
+        if (self._ssl_mode in ['allow', 'prefer', 'require', 'verify-ca', 'verify-full']):
+            ssl_query = f'sslmode={self._ssl_mode}'
+            if self._ssl_cert_path is not None:
+                ssl_query = f'{ssl_query}&sslcert={self._ssl_cert_path}'
+            if self._ssl_key_path is not None:
+                ssl_query = f'{ssl_query}&sslkey={self._ssl_key_path}'
+            if self._ssl_root_cert_path is not None:
+                ssl_query = f'{ssl_query}&sslrootcert={self._ssl_root_cert_path}'
+        else:
+            ssl_query = 'sslmode=disable'
+
+        return f'{base_url}?{ssl_query}'
 
     def get_dsn(self, type=None):
         if self._dsn is None:
-            if type is None or type == DBType.WRITER:
-                return psycopg2.extensions.make_dsn(
-                    dbname=self._database_name,
-                    user=self._user,
-                    host=self._host,
-                    port=self._port,
-                    password=self._password
-                )
-            elif type == DBType.READER:
-                # we assume that everything except the hostname remains the same for a reader
-                return psycopg2.extensions.make_dsn(
-                    dbname=self._database_name,
-                    user=self._user,
-                    host=self._read_replica_host,
-                    port=self._port,
-                    password=self._password
-                )
+            ssl_mode = self._ssl_mode
+            sslcert = self._ssl_cert_path
+            sslkey = self._ssl_key_path
+            sslrootcert = self._ssl_root_cert_path
+            if (ssl_mode not in ['allow', 'prefer', 'require', 'verify-ca', 'verify-full']):
+                ssl_mode = None
+                sslcert = None
+                sslkey = None
+                sslrootcert = None
+            kwargs = {
+                'dbname': self._database_name,
+                'user': self._user,
+                'host': self._host,
+                'port': self._port,
+                'password': self._password,
+                'sslmode': ssl_mode,
+                'sslcert': sslcert,
+                'sslkey': sslkey,
+                'sslrootcert': sslrootcert
+            }
+
+            if type == DBType.READER:
+                # We assume that everything except the hostname remains the same for a reader.
+                # At the moment this is a fair assumption for Postgres read replicas.
+                kwargs.update({"host": self._read_replica_host})
+
+            return psycopg2.extensions.make_dsn(**{k: v for k, v in kwargs.items() if v is not None})
         else:
             return self._dsn
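
Putting the two builders together, a usage sketch (hypothetical environment values, and assuming DBType is importable from services.utils alongside DBConfiguration):

from services.utils import DBConfiguration, DBType

# Assuming MF_METADATA_DB_SSL_MODE=verify-full and
# MF_METADATA_DB_SSL_ROOT_CERT_PATH=/certs/root.crt are set in the environment:
conf = DBConfiguration()

print(conf.connection_string_url(DBType.WRITER))
# postgresql://user:pwd@host:5432/dbname?sslmode=verify-full&sslrootcert=/certs/root.crt

print(conf.get_dsn(DBType.READER))
# keyword DSN pointing at the read replica host; None-valued SSL keys are
# filtered out before make_dsn, so 'sslcert=None' never leaks into the DSN.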

5 changes: 4 additions & 1 deletion services/utils/tests/unit_tests/utils_test.py
@@ -125,7 +125,10 @@ def test_db_conf_env_dsn():
     with set_env({'MF_METADATA_DB_DSN': 'dbname=testgres user=test_user host=test_host port=1234 password=test_pwd'}):
         # valid DSN in env should set correctly.
         assert DBConfiguration().get_dsn() == 'dbname=testgres user=test_user host=test_host port=1234 password=test_pwd'
 
+    with set_env({'MF_METADATA_DB_DSN': 'dbname=testgres user=test_user host=test_host port=1234 password=test_pwd sslmode=verify-full sslrootcert=/test'}):
+        # valid DSN in env should set correctly.
+        assert DBConfiguration().get_dsn() == 'dbname=testgres user=test_user host=test_host port=1234 password=test_pwd sslmode=verify-full sslrootcert=/test'
 
 def test_db_conf_pool_size():
     with set_env():
1 change: 1 addition & 0 deletions tox.ini
@@ -17,3 +17,4 @@ commands = pytest --cov=services -m unit_tests
 
 [testenv:integration]
 commands = pytest --cov=services -m integration_tests
+