Skip to content

Commit

Permalink
CMR-10254: Adding AccessControl functionality, tests, code structure,…
Browse files Browse the repository at this point in the history
… and Docker.
  • Loading branch information
eereiter committed Feb 5, 2025
1 parent 8b71d4c commit 78a900e
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 27 deletions.
8 changes: 5 additions & 3 deletions subscription/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ ARG QUEUE_URL
ARG DEAD_LETTER_QUEUE_URL
ARG SNS_NAME
ARG SUB_DEAD_LETTER_QUEUE_URL
ARG ENVIRONMENT_NAME

#Set environment variables
ENV AWS_REGION=$AWS_REGION
ENV QUEUE_URL=$QUEUE_URL
ENV DEAD_LETTER_QUEUE_URL=$DEAD_LETTER_QUEUE_URL
ENV LONG_POLL_TIME 10
ENV LONG_POLL_TIME=10
ENV SNS_NAME=$SNS_NAME
ENV SUB_DEAD_LETTER_QUEUE_URL=$SUB_DEAD_LETTER_QUEUE_URL
ENV ENVIRONMENT_NAME=$ENVIRONMENT_NAME

#Set working directory
WORKDIR /app

#Copy the application files
COPY *.py .
COPY src/*.py .

#Install the required packages
RUN pip3 install boto3 Flask
RUN pip3 install boto3 Flask requests

#EXPOSE 8089
# Command to run the application
Expand Down
5 changes: 5 additions & 0 deletions subscription/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ Build the project: docker build -t {AWS Repository}/cmr-subscription-worker-{env}
Log in docker to the AWS repository: aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin {AWS Repository}
Using docker to push the deployment artifact: docker push {AWS Repository}/cmr-subscription-worker-{env}:latest
For the ECS to update the service: aws ecs update-service --force-new-deployment --service subscription-worker-sit --cluster cmr-service-sit


## Running locally
Build the local image: docker build -f Dockerfile.local -t subscription_worker .
Then run the start.sh script.
3 changes: 2 additions & 1 deletion subscription/build.sh
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
zip deployment_package.zip subscription_worker.py sns.py part1_docker part_docker
# Package the sources and the Dockerfile for deployment.
# Previous flat-layout command, kept for reference:
#zip deployment_package.zip subscription_worker.py sns.py part1_docker part_docker
# -r is required: without it zip stores only the empty "src/" directory entry
# and none of the Python files inside it.
zip -r deployment_package.zip src Dockerfile
7 changes: 5 additions & 2 deletions subscription/run-tests-cicd.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#!/bin/bash

pip3 install boto3 Flask
python3 -m unittest -v
# NOTE: the test run below relies on PYTHONPATH containing src
# (e.g. `export PYTHONPATH=src`) - presumably so the tests can import the
# modules that live under src/; confirm the CI/CD environment sets it.

# Install the third-party packages used by the worker.
pip3 install boto3 Flask requests
#python3 -m unittest -v
# Discover and run, verbosely, every test file under ./test named *_test.py.
python3 -m unittest discover -v -s ./test -p "*_test.py"
87 changes: 87 additions & 0 deletions subscription/src/access_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import requests
from env_vars import Env_Vars
from sys import stdout
from logger import logger

class AccessControl:
    """Client for the Access Control permissions API.

    The base URL is resolved lazily on first use and cached on the instance.

    For local development set the full URL directly:
        ACCESS_CONTROL_URL=http://localhost:3011/access-control

    For AWS set ENVIRONMENT_NAME (e.g. SIT). The URL parts are then looked up
    through Env_Vars under these names:
        /{ENVIRONMENT_NAME}/ingest/CMR_ACCESS_CONTROL_PROTOCOL           (e.g. https)
        /{ENVIRONMENT_NAME}/ingest/CMR_ACCESS_CONTROL_PORT               (e.g. 3011)
        /{ENVIRONMENT_NAME}/ingest/CMR_ACCESS_CONTROL_HOST               (e.g. cmr.sit.earthdata.nasa.gov)
        /{ENVIRONMENT_NAME}/ingest/CMR_ACCESS_CONTROL_RELATIVE_ROOT_URL  (e.g. access-control)

    Example use of this class:
        access_control = AccessControl()
        response = access_control.get_permissions('eereiter', 'C1200484253-CMR_ONLY')

    The call is the same as
        curl 'https://cmr.sit.earthdata.nasa.gov/access-control/permissions?user_id=eereiter&concept_id=C1200484253-CMR_ONLY'

    The return value is None when the response status is not 200, otherwise
    the response body as text, e.g.
        {"C1200484253-CMR_ONLY":["read","update","delete","order"]}
    """

    # Upper bound, in seconds, on how long a permissions request may block.
    # Without a timeout `requests` waits indefinitely and would stall the caller.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        # Resolved on demand by get_url().
        self.url = None

    def get_url_from_parameter_store(self):
        """Resolve the access-control base URL and store it in ``self.url``.

        Raises:
            ValueError: if neither ACCESS_CONTROL_URL nor ENVIRONMENT_NAME is set.
        """
        # ACCESS_CONTROL_URL is the local-development override.
        access_control_url = os.getenv("ACCESS_CONTROL_URL")
        if access_control_url:
            self.url = access_control_url
            return

        # Otherwise build the URL from the per-environment parameters.
        environment_name = os.getenv("ENVIRONMENT_NAME")
        if not environment_name:
            logger.error("ENVIRONMENT_NAME environment variable is not set")
            raise ValueError("ENVIRONMENT_NAME environment variable is not set")

        # Construct the access control parameter names from the environment name.
        pre_fix = f"/{environment_name}/ingest/"
        protocol_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_PROTOCOL"
        port_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_PORT"
        host_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_HOST"
        context_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_RELATIVE_ROOT_URL"

        # get_var is an instance method, so Env_Vars must be instantiated;
        # calling it on the class itself fails with a missing-argument TypeError.
        env_vars = Env_Vars()
        protocol = env_vars.get_var(protocol_param_name)
        port = env_vars.get_var(port_param_name)
        host = env_vars.get_var(host_param_name)
        context = env_vars.get_var(context_param_name)

        self.url = f"{protocol}://{host}:{port}/{context}"
        logger.debug("Subscription Worker Access-Control URL:%s", self.url)

    def get_url(self):
        """Return the access-control base URL, resolving it on first use."""
        if not self.url:
            self.get_url_from_parameter_store()
        return self.url

    def get_permissions(self, subscriber_id, concept_id):
        """Fetch the permissions a subscriber has on a concept.

        Args:
            subscriber_id: sent as the ``user_id`` query parameter.
            concept_id: sent as the ``concept_id`` query parameter.

        Returns:
            The response body as text when the status code is 200, else None.

        Raises:
            requests.exceptions.RequestException: on connection errors or when
                the request exceeds REQUEST_TIMEOUT_SECONDS.
        """
        # Set the access-control permissions URL.
        url = f"{self.get_url()}/permissions"

        # Set the parameters.
        params = {
            "user_id": subscriber_id,
            "concept_id": concept_id
        }

        # Make a GET request with parameters.
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS)

        if response.status_code == 200:
            data = response.text
            # The value must go through a %s placeholder; passing it as a bare
            # extra argument makes the logging module report a formatting error.
            logger.debug("Response data: %s", data)
            return data

        # Request failed.
        logger.warning(f"Subscription Worker getting Access Control permissions request using URL {url} with parameters {params} failed with status code: {response.status_code}")
        return None
35 changes: 35 additions & 0 deletions subscription/src/env_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import boto3
from botocore.exceptions import ClientError
from sys import stdout

class Env_Vars:
    """Look up configuration values, first in the OS environment and,
    if not there, in the AWS Parameter Store."""

    def __init__(self):
        self.ssm_client = boto3.client('ssm')

    def get_var(self, name, decryption=False):
        """Return the value configured for ``name``.

        Args:
            name: environment variable name, also used as the Parameter Store
                parameter name when the environment has no non-empty value.
            decryption: passed to the Parameter Store as ``WithDecryption``.

        Returns:
            The value from the environment if it is set and non-empty,
            otherwise the value held in the Parameter Store.

        Raises:
            botocore.exceptions.ClientError: if the Parameter Store lookup fails.
        """
        # Resolved values are deliberately never printed: with decryption=True
        # they may be secrets, and stdout ends up in the service logs.
        value = os.getenv(name)
        if value:
            return value

        try:
            # Get the parameter value from AWS Parameter Store
            response = self.ssm_client.get_parameter(Name=name, WithDecryption=decryption)
        except ClientError as e:
            print(f"Error retrieving parameter from AWS Parameter Store: {e}")
            stdout.flush()
            raise
        return response['Parameter']['Value']
29 changes: 29 additions & 0 deletions subscription/src/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import logging
import sys

# Level for the default logger: taken from the LOG_LEVEL environment variable,
# falling back to INFO when the variable is unset or empty.
LOG_LEVEL = os.getenv("LOG_LEVEL") or logging.INFO

def setup_logger(name, log_file=None, level=logging.INFO):
    """Return the logger called ``name``, configured to write to stdout.

    The function is safe to call repeatedly with the same name:
    ``logging.getLogger`` hands back the same logger object every time, so a
    handler is attached only when an equivalent one is not already present.
    Otherwise each extra call would make every record be emitted once more.

    Args:
        name: name of the logger to configure.
        log_file: optional path; when given, records are also written there.
        level: level applied to the logger (an int or a level name).

    Returns:
        The configured ``logging.Logger``.
    """
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # FileHandler subclasses StreamHandler, so it is excluded explicitly here.
    has_stream_handler = any(
        isinstance(existing, logging.StreamHandler)
        and not isinstance(existing, logging.FileHandler)
        for existing in logger.handlers
    )
    if not has_stream_handler:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    if log_file:
        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(log_file)
        has_file_handler = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in logger.handlers
        )
        if not has_file_handler:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

# Create the default logger at import time, using the level held in LOG_LEVEL.
# Other modules obtain it with `from logger import logger`.
logger = setup_logger(name='default_logger', level=LOG_LEVEL)
8 changes: 3 additions & 5 deletions subscription/sns.py → subscription/src/sns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import boto3
import json
from sys import stdout
from botocore.exceptions import ClientError
from logger import logger

class Sns:
"""Encapsulates AWS SNS topics."""
Expand All @@ -16,8 +16,7 @@ def create_topic(self, topic_name):
try:
topic = self.sns_resource.create_topic(Name=topic_name)
except ClientError as error:
print("Could not get the topic ARN: {error}.")
stdout.flush()
logger.error("Subscription Worker could not get the topic ARN: {error}.")
raise error
else:
return topic
Expand All @@ -40,8 +39,7 @@ def publish_message(topic, message):
else:
response = topic.publish(Subject=message_subject, Message=message_message)
except ClientError as error:
print(f"Could not publish message to topic {topic}. {error}")
stdout.flush()
logger.error(f"Subscription Worker could not publish message to topic {topic}. {error}")
raise error
else:
return response
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import os
from flask import Flask, jsonify
from sns import Sns
from sys import stdout
from botocore.exceptions import ClientError
from access_control import AccessControl
from logger import logger

AWS_REGION = os.getenv("AWS_REGION")
QUEUE_URL = os.getenv("QUEUE_URL")
DEAD_LETTER_QUEUE_URL = os.getenv("DEAD_LETTER_QUEUE_URL")
SUB_DEAD_LETTER_QUEUE_URL = os.getenv("SUB_DEAD_LETTER_QUEUE_URL")
LONG_POLL_TIME = os.getenv("LONG_POLL_TIME")
LONG_POLL_TIME = os.getenv("LONG_POLL_TIME", "10")
SNS_NAME = os.getenv("SNS_NAME")

def receive_message(sqs_client, queue_url):
Expand All @@ -21,8 +22,7 @@ def receive_message(sqs_client, queue_url):
WaitTimeSeconds=(int (LONG_POLL_TIME)))

if len(response.get('Messages', [])) > 0:
print(f"Number of messages received: {len(response.get('Messages', []))}")
stdout.flush()
logger.info(f"Number of messages received: {len(response.get('Messages', []))}")
return response

def delete_message(sqs_client, queue_url, receipt_handle):
Expand All @@ -33,29 +33,45 @@ def delete_messages(sqs_client, queue_url, messages):
receipt_handle = message['ReceiptHandle']
delete_message(sqs_client=sqs_client, queue_url=queue_url, receipt_handle=receipt_handle)

def process_messages(topic, messages):
def process_messages(sns_client, topic, messages, access_control):
    """Publish each message of a receive response to the given topic.

    Args:
        sns_client: object whose ``publish_message(topic, message)`` is called
            once per message.
        topic: topic handed through to ``publish_message``.
        messages: mapping whose optional "Messages" entry lists the messages.
        access_control: not used yet; reserved for the permission check
            described in the TODO below.
    """
    # TODO: check read access before publishing.
    #   response = access_control.get_permissions(subscriber-id, collection-concept-id)
    #   The return is either None (when the check on the response is false) or
    #   {"C1200484253-CMR_ONLY":["read","update","delete","order"]}
    #   If there is a response and its array contains "read": publish the message.
    #   Otherwise: log that subscriber-id no longer has read access to
    #   collection-concept-id.
    batch = messages.get("Messages", [])
    for sqs_message in batch:
        sns_client.publish_message(topic, sqs_message)

def poll_queue(running):
    """Poll the SQS queues and hand their messages on until asked to stop.

    Each pass reads the main queue and then the dead-letter queue. Messages
    received from a queue are passed to process_messages and afterwards
    deleted from that same queue. Any exception raised during a pass is
    logged as a warning and ends that pass; the loop then starts the next one.

    Args:
        running: shared flag; polling continues while ``running.value`` is truthy.
    """
    sqs = boto3.client("sqs", region_name=AWS_REGION)
    sns_client = Sns(boto3.resource("sns", region_name=AWS_REGION))
    topic = sns_client.create_topic(SNS_NAME)
    access_control = AccessControl()

    # Main queue first, then the dead-letter queue, in that order on every pass.
    queue_urls = (QUEUE_URL, DEAD_LETTER_QUEUE_URL)

    while running.value:
        try:
            for queue_url in queue_urls:
                received = receive_message(sqs_client=sqs, queue_url=queue_url)
                if not received:
                    continue
                process_messages(sns_client=sns_client, topic=topic, messages=received, access_control=access_control)
                # Delete only after processing returned without raising.
                delete_messages(sqs_client=sqs, queue_url=queue_url, messages=received)
        except Exception as e:
            logger.warning(f"An error occurred receiving or deleting messages: {e}")

app = Flask(__name__)
@app.route('/shutdown', methods=['POST'])
Expand All @@ -66,16 +82,11 @@ def shutdown():
running.value = False
return jsonify({'status': 'shutting down'})

sqs_client = boto3.client("sqs", region_name=AWS_REGION)
sns_resource = boto3.resource("sns")
sns_client = Sns(sns_resource)
topic = sns_client.create_topic(SNS_NAME)

#Shared boolean value for process communication
running = multiprocessing.Value('b',True)

if __name__ == "__main__":
print("Starting to poll the SQS queue...")
logger.info("The subscription worker is starting to poll the SQS queue...")
# Start the polling process
poll_process = multiprocessing.Process(target=poll_queue, args=(running,))
poll_process.start()
Expand All @@ -86,4 +97,4 @@ def shutdown():

# Wait for the polling process to finish before exiting
poll_process.join()
print("Exited polling loop.")
logger.info("The subscription worker exited the polling loop.")
Loading

0 comments on commit 78a900e

Please sign in to comment.