Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaeng72 committed Feb 5, 2025
1 parent 36985aa commit 7c322c3
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 410 deletions.
2 changes: 1 addition & 1 deletion ingest-app/src/cmr/ingest/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@
parsed)]
(when-not (= CMR_PROVIDER provider-id)
(api-core/verify-provider-exists context provider-id))
(validate-user-id context subscriber-id) ;; TODO temp comment out to bypass user id check
(validate-user-id context subscriber-id)
(validate-query context parsed)
(validate-subscription-endpoint parsed)
(let [parsed-metadata (assoc parsed :SubscriberId subscriber-id)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,6 @@
context
(ingest-events/associations-update-event associations))))))

;; TODO jyna main concepts func
;; TODO jyna saving subscription or granule or any other concept falls to this func
;; false implies creation of a non-tombstone revision
(defmethod save-concept-revision false
[context concept]
Expand Down
176 changes: 13 additions & 163 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
:concept-type :subscription
:collection-concept-id coll-concept-id})))

;;TODO unit test
(defn convert-concept-to-edn
"Converts the passed in concept to edn"
[subscription]
(update subscription :metadata #(json/decode % true)))

;; TODO unit test
(defn convert-and-filter-subscriptions
"Convert the metadata of the subscriptions to edn and then filter out the non
ingest subscriptions."
Expand All @@ -58,33 +60,6 @@
(= false (:deleted %)))
subs)))

(defn add-to-existing-mode
"Depending on the passed in new-mode [\"New\" \"Update\"] create a structure that merges
the new mode to the existing mode. The result looks like [\"New\" \"Update\"]"
[existing-modes new-modes]
(loop [ms new-modes
result existing-modes]
(let [mode (first ms)]
(if (nil? mode)
result
(if result
(if (some #(= mode %) result)
(recur (rest ms) result)
(recur (rest ms) (merge result mode)))
(recur (rest ms) [mode]))))))

;(defn merge-modes
; "Go through the list of subscriptions and combine the :modes found into one array.
; Return the array of modes."
; [subscriptions]
; (loop [subs subscriptions
; result []]
; (let [sub (first subs)]
; (if (nil? sub)
; result
; (recur (rest subs) (add-to-existing-mode result (get-in sub [:metadata :Mode])))))))


(use 'clojure.set)
(defn create-mode-to-endpoints-map
"Creates a mode-to-endpoints map given a list of subscriptions all for the same one collection.
Expand Down Expand Up @@ -113,18 +88,6 @@
)
@final-map))

;(defn change-subscription
; "When a subscription is added or deleted, the collection-concept-id must be put into
; or deleted from the subscription cache. Get the subscriptions that match the collection-concept-id
; from the database and rebuild the modes list. Return 1 if successful 0 otherwise."
; [context concept-edn]
; (let [coll-concept-id (:CollectionConceptId (:metadata concept-edn))
; subs (convert-and-filter-subscriptions (get-subscriptions-from-db context coll-concept-id))]
; (if (seq subs)
; (subscription-cache/set-value context coll-concept-id (merge-modes subs))
; (subscription-cache/remove-value context coll-concept-id))))


(defn change-subscription-in-cache
"When a subscription is added or deleted, the collection-concept-id must be added or deleted from the subscription cache.
Get the subscriptions that match the collection-concept-id from the database and rebuild the info map for this collection.
Expand All @@ -143,7 +106,7 @@
(subscription-cache/remove-value context coll-concept-id))))

;;
;; The functions below are for subscribing and unsubscribing and endpoint to the topic.
;; The functions below are for subscribing and unsubscribing an endpoint to the topic.
;;

(defn add-or-delete-ingest-subscription-in-cache
Expand Down Expand Up @@ -180,6 +143,7 @@
(some? (re-matches #"http://localhost:9324.*" endpoint))
false))

;;TODO unit test this
(defn attach-subscription-to-topic
"If valid ingest subscription, will attach the subscription concept's sqs arn to external SNS topic
and will add the sqs arn used as an extra field to the concept"
Expand All @@ -195,6 +159,8 @@
concept))
concept)))


;;TODO unit test this
(defn set-subscription-arn-if-applicable
"If subscription has an endpoint that is an SQS ARN, then it will attach the SQS ARN to the CMR SNS external topic and
save the SQS ARN to the subscription concept.
Expand All @@ -210,6 +176,7 @@
;; and we don't consider that an error
concept))

;:TODO unit test
(defn delete-ingest-subscription
"Remove the subscription from the cache and unsubscribe the subscription from
the topic if applicable.
Expand All @@ -231,73 +198,6 @@
;; The functions below are for refreshing the subscription cache if needed.
;;

;;TODO update this with new structure
;(defn create-subscription-cache-contents-for-refresh
; "Go through all the subscriptions and find the ones that are
; ingest subscriptions. Create the mode values for each collection-concept-id
; and put those into a map. The end result looks like:
; {Collection concept id 1: [\"New\" \"Update\"]
; Collection concept id 2: [\"New\" \"Update\" \"Delete\"]
; ...}"
; [result sub]
; (let [concept-edn (convert-concept-to-edn sub)
; metadata-edn (:metadata concept-edn)]
; (if (ingest-subscription-concept? concept-edn)
; (let [coll-concept-id (:CollectionConceptId metadata-edn)
; concept-map (result coll-concept-id)
; mode (:Mode metadata-edn)]
; (if concept-map
; (update result coll-concept-id #(add-to-existing-mode % mode))
; (assoc result coll-concept-id (add-to-existing-mode nil mode))))
; result)))

;(defn create-subscription-cache-contents-for-refresh
; "Go through all the subscriptions and find the ones that are
; ingest subscriptions. Create the mode values for each collection-concept-id
; and put those into a map. The end result looks like:
; {Collection concept id 1: [\"New\" \"Update\"]
; Collection concept id 2: [\"New\" \"Update\" \"Delete\"]
; ...}"
; [subscriptions]
; (let [cache-map (atom {})]
; (doseq [sub subscriptions]
; (let [final-sub-map (atom {})
; concept-edn (convert-concept-to-edn sub)
; metadata-edn (:metadata concept-edn)]
; (if (ingest-subscription-concept? concept-edn)
; (let [coll-concept-id (:CollectionConceptId metadata-edn)
; modes (:Mode metadata-edn)
; endpoint (:EndPoint metadata-edn)]
; (doseq [curr-mode modes]
; ;(let [curr-coll-sub-map (get-in @cache-map coll-concept-id)]
; (if final-sub-map
; ;;add to the existing cache-map
; (let [endpoint-set (get-in @final-sub-map curr-mode)]
; (if endpoint-set
; ;; create your own mode-to-endpoints-map and combine with the existing one
; ;(swap! final-sub-map (fn [n] (merge-with union mode-set {modes #{endpoint}})))
; ;(swap! cache-map assoc coll-concept-id {"Mode" final-sub-map})
;
; ;; create current map
; (conj endpoint-set endpoint)
;
; )
;
; )
; ;; else create new cache-map
; (swap! final-sub-map assoc curr-mode #{endpoint})
; ;)
; )
;
; )
; )
; )
; )
; )
; @cache-map
; )
; )

(defn create-subscription-cache-contents-for-refresh
"Go through all the subscriptions and find the ones that are
ingest subscriptions. Create the mode values for each collection-concept-id
Expand All @@ -312,13 +212,13 @@
;; order the subscriptions by collection and create a map collection to subscriptions
(doseq [sub subscriptions]
;(println "curr sub = " sub)
(let [metadata-edn (:metadata sub)]
(let [coll-concept-id (:CollectionConceptId metadata-edn)
sub-list (get @coll-to-subscription-concept-map coll-concept-id)]
(let [metadata-edn (:metadata sub)
coll-concept-id (:CollectionConceptId metadata-edn)
sub-list (get @coll-to-subscription-concept-map coll-concept-id)]
;(println "sub-list = " sub-list)
(swap! coll-to-subscription-concept-map conj {coll-concept-id (conj sub-list sub)})
;(println "curr coll-to-subscription-concept-map = " @coll-to-subscription-concept-map)
)))
))
;(println "coll-to-subscription-concept-map = " @coll-to-subscription-concept-map)
;;for every subscription list by the collection create a mode-to-endpoints-map and add it to the final cache map
(doseq [[coll-id subscription-list] @coll-to-subscription-concept-map]
Expand All @@ -328,17 +228,15 @@
(swap! cache-map assoc coll-id {"Mode" mode-to-endpoints-map})))
@cache-map))


;;TODO fix this. Refreshing cache requires for it to be the new structure.
(defn refresh-subscription-cache
"Go through all the subscriptions and create a map of collection concept ids and
their mode values. Get the old keys from the cache and if the keys exist in the new structure
then update the cache with the new values. Otherwise, delete the contents that no longer exists."
[context]
(when ingest-subscriptions-enabled?
(info "Starting refreshing the ingest subscription cache.")
(let [subs (convert-and-filter-subscriptions (get-subscriptions-from-db context))
new-contents (create-subscription-cache-contents-for-refresh subs)
(let [subscriptions-from-db (convert-and-filter-subscriptions (get-subscriptions-from-db context))
new-contents (create-subscription-cache-contents-for-refresh subscriptions-from-db)
;_ (println "new-contents = " new-contents)
cache-content-keys (subscription-cache/get-keys context)]
;; update the cache with the new values contained in the new-contents map.
Expand Down Expand Up @@ -397,54 +295,6 @@
[mode]
(str mode " Notification"))

;(defn create-attributes-and-subject-map
; "Determine based on the passed in concept if the granule is new, is an update
; or delete. Use the passed in mode to determine if any subscription is interested
; in a notification. If they are then return the message attributes and subject, otherwise
; return nil."
; [concept mode coll-concept-id]
; (cond
; ;; Mode = Delete.
; (and (:deleted concept)
; (some #(= "Delete" %) mode))
; {:attributes (create-message-attributes coll-concept-id "Delete")
; :subject (create-message-subject "Delete")}
;
; ;; Mode = New
; (and (not (:deleted concept))
; (= 1 (:revision-id concept))
; (some #(= "New" %) mode))
; {:attributes (create-message-attributes coll-concept-id "New")
; :subject (create-message-subject "New")}
;
; ;; Mode = Update
; (and (not (:deleted concept))
; (pos? (compare (:revision-id concept) 1))
; (some #(= "Update" %) mode))
; {:attributes (create-message-attributes coll-concept-id "Update")
; :subject (create-message-subject "Update")}))

;(defn publish-subscription-notification-if-applicable
; "Publish a notification to the topic if the passed-in concept is a granule
; and a subscription is interested in being informed of the granule's actions."
; [context concept]
; (when (granule-concept? (:concept-type concept))
; (let [start (System/currentTimeMillis)
; coll-concept-id (:parent-collection-id (:extra-fields concept))
; sub-cache-map (subscription-cache/get-value context coll-concept-id)]
; ;; if this granule's collection is found in subscription cache that means it has a subscription attached to it
; (when sub-cache-map
; ;; Check the mode to see if the granule notification needs to be pushed. Mode examples are 'new', 'update', 'delete'.
; (let [topic (get-in context [:system :sns :internal])
; message (create-notification-message-body concept)
; ;; TODO Jyna will need to update this attributes and subject map for URL endpoints
; {:keys [attributes subject]} (create-attributes-and-subject-map concept sub-cache-map coll-concept-id)]
; (when (and attributes subject)
; (let [result (topic-protocol/publish topic message attributes subject)
; duration (- (System/currentTimeMillis) start)]
; (debug (format "Work potential subscription publish took %d ms." duration))
; result)))))))

(defn- get-gran-concept-mode
"Gets the granule concept's ingestion mode (i.e. Update, Delete, New, etc)"
[concept]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,40 +254,3 @@

"sets a collection concept id."
(dissoc example-concept :concept-id)))))

;(deftest set-subscription-arn-test
; (testing "set-subscription-arn"
; (are3 [concept-type concept expected]
; (is (= expected (cs/set-subscription-arn nil concept-type concept)))
;
; "non-subscription concept type returns un-changed concept"
; :granule
; {:metadata {:EndPoint ""}}
; {:metadata {:EndPoint ""}}
;
; "empty endpoint returns un-changed concept"
; :subscription
; {:metadata {:EndPoint ""}}
; {:metadata {:EndPoint ""}}
;
; "url endpoint returns un-changed concept"
; :subscription
; {:metadata {:EndPoint "https://www.endpoint.com"}}
; {:metadata {:EndPoint "https://www.endpoint.com"}}))
; (with-redefs [cmr.metadata-db.services.subscriptions/attach-subscription-to-topic (fn [context concept] "sqs:arn")]
; (testing "local test queue url endpoint returns changed concept"
; (let [concept {:metadata {:EndPoint "http://localhost:9324/000000000/"}}
; expected-concept {:metadata {:EndPoint "http://localhost:9324/000000000/"} :extra-fields {:aws-arn "sqs:arn"}}]
; (is (= expected-concept (cs/set-subscription-arn nil :subscription concept)))))
;
; (testing "sqs arn endpoint returns changed concept"
; (let [concept {:metadata {:EndPoint "arn:aws:sqs:1234:Queue-Name"}}
; expected-concept {:metadata {:EndPoint "arn:aws:sqs:1234:Queue-Name"} :extra-fields {:aws-arn "sqs:arn"}}]
; (is (= expected-concept (cs/set-subscription-arn nil :subscription concept)))))))







Loading

0 comments on commit 7c322c3

Please sign in to comment.