Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaeng72 committed Feb 5, 2025
1 parent 7102dc8 commit 85eec19
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 135 deletions.
194 changes: 139 additions & 55 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,31 @@
(recur (rest ms) (merge result mode)))
(recur (rest ms) [mode]))))))

(defn merge-modes
"Go through the list of subscriptions and combine the :modes found into one array.
Return the array of modes."
[subscriptions]
(loop [subs subscriptions
result []]
(let [sub (first subs)]
(if (nil? sub)
result
(recur (rest subs) (add-to-existing-mode result (get-in sub [:metadata :Mode])))))))
;(defn merge-modes
; "Go through the list of subscriptions and combine the :modes found into one array.
; Return the array of modes."
; [subscriptions]
; (loop [subs subscriptions
; result []]
; (let [sub (first subs)]
; (if (nil? sub)
; result
; (recur (rest subs) (add-to-existing-mode result (get-in sub [:metadata :Mode])))))))


(use 'clojure.set)
(defn create-mode-to-endpoints-map
"Ex. {
"Creates a mode-to-endpoints map given a list of subscriptions all for the same one collection.
Returns a map in this structure:
{
new: ['sqs:arn:111', 'https://www.url1.com', 'https://www.url2.com'],
update: ['sqs:arn:111'],
delete: ['https://www.url1.com']
}"
[subscriptions]
[subscriptions-of-same-collection]

(let [final-map (atom {})]
(doseq [sub subscriptions]
(doseq [sub subscriptions-of-same-collection]
(let [temp-map (atom {})
modes (get-in sub [:metadata :Mode])
endpoint (get-in sub [:metadata :EndPoint])]
Expand All @@ -106,13 +108,10 @@
;(println "temp-map at the end is " @temp-map)
(let [merged-map (merge-with union @final-map @temp-map)]
;(println "merged-map is " merged-map)
(swap! final-map (fn [n] merged-map)))
)
(swap! final-map (fn [n] merged-map))))
;(println "final-map so far is = " @final-map)
)
@final-map
)
)
@final-map))

;(defn change-subscription
; "When a subscription is added or deleted, the collection-concept-id must be put into
Expand All @@ -127,21 +126,21 @@


(defn change-subscription-in-cache
"When a subscription is added or deleted, the collection-concept-id must be put into
or deleted from the subscription cache. Get the subscriptions that match the collection-concept-id
from the database and rebuild the modes list. Return 1 if successful 0 otherwise."
"When a subscription is added or deleted, the collection-concept-id must be added or deleted from the subscription cache.
Get the subscriptions that match the collection-concept-id from the database and rebuild the info map for this collection.
Return 1 if successful 0 otherwise."
[context concept-edn]
;(println "*** INSIDE change-subscription-in-cache")
(let [coll-concept-id (:CollectionConceptId (:metadata concept-edn))
subscriptions-found-in-db (convert-and-filter-subscriptions (get-subscriptions-from-db context coll-concept-id))]
(println "***** subscriptions-found-in-db: " subscriptions-found-in-db)
;(println (format "***** collection %s , subscriptions-found-in-db: %s" coll-concept-id subscriptions-found-in-db))
;; if subscriptions found in db, create new cache value and add it to the cache (this may overwrite past cache value)
(if (seq subscriptions-found-in-db)
(let [mode-to-endpoints-map (create-mode-to-endpoints-map subscriptions-found-in-db)
_ (println "mode-to-endpoints-map = " mode-to-endpoints-map)]
(subscription-cache/set-value context coll-concept-id mode-to-endpoints-map))
(let [mode-to-endpoints-map (create-mode-to-endpoints-map subscriptions-found-in-db)]
;(println "mode-to-endpoints-map = " mode-to-endpoints-map)
(subscription-cache/set-value context coll-concept-id {"Mode" mode-to-endpoints-map}))
;; remove the entire subscription cache record if no active subscriptions for this collection was found in the db
(subscription-cache/remove-value context coll-concept-id)
)))
(subscription-cache/remove-value context coll-concept-id))))

;;
;; The functions below are for subscribing and unsubscribing and endpoint to the topic.
Expand Down Expand Up @@ -185,8 +184,8 @@
"If valid ingest subscription, will attach the subscription concept's sqs arn to external SNS topic
and will add the sqs arn used as an extra field to the concept"
[context concept]
(let [concept-edn (convert-concept-to-edn concept)
_ (println "concept-edn is " concept-edn)]
(let [concept-edn (convert-concept-to-edn concept)]
;(println "concept-edn is " concept-edn)
(if (ingest-subscription-concept? concept-edn)
(let [topic (get-in context [:system :sns :external])
;; subscribes the given endpoint sqs arn in the concept to the external SNS topic
Expand All @@ -206,8 +205,7 @@
endpoint (get-in concept-edn [:metadata :EndPoint])]
(if (or (is-valid-sqs-arn endpoint) (is-local-test-queue endpoint))
(attach-subscription-to-topic context concept)
concept
))
concept))
;; we return concept no matter what because not every concept that enters this func will be a subscription,
;; and we don't consider that an error
concept))
Expand All @@ -233,24 +231,103 @@
;; The functions below are for refreshing the subscription cache if needed.
;;

;;TODO update this with new structure
;(defn create-subscription-cache-contents-for-refresh
; "Go through all the subscriptions and find the ones that are
; ingest subscriptions. Create the mode values for each collection-concept-id
; and put those into a map. The end result looks like:
; {Collection concept id 1: [\"New\" \"Update\"]
; Collection concept id 2: [\"New\" \"Update\" \"Delete\"]
; ...}"
; [result sub]
; (let [concept-edn (convert-concept-to-edn sub)
; metadata-edn (:metadata concept-edn)]
; (if (ingest-subscription-concept? concept-edn)
; (let [coll-concept-id (:CollectionConceptId metadata-edn)
; concept-map (result coll-concept-id)
; mode (:Mode metadata-edn)]
; (if concept-map
; (update result coll-concept-id #(add-to-existing-mode % mode))
; (assoc result coll-concept-id (add-to-existing-mode nil mode))))
; result)))

;(defn create-subscription-cache-contents-for-refresh
; "Go through all the subscriptions and find the ones that are
; ingest subscriptions. Create the mode values for each collection-concept-id
; and put those into a map. The end result looks like:
; {Collection concept id 1: [\"New\" \"Update\"]
; Collection concept id 2: [\"New\" \"Update\" \"Delete\"]
; ...}"
; [subscriptions]
; (let [cache-map (atom {})]
; (doseq [sub subscriptions]
; (let [final-sub-map (atom {})
; concept-edn (convert-concept-to-edn sub)
; metadata-edn (:metadata concept-edn)]
; (if (ingest-subscription-concept? concept-edn)
; (let [coll-concept-id (:CollectionConceptId metadata-edn)
; modes (:Mode metadata-edn)
; endpoint (:EndPoint metadata-edn)]
; (doseq [curr-mode modes]
; ;(let [curr-coll-sub-map (get-in @cache-map coll-concept-id)]
; (if final-sub-map
; ;;add to the existing cache-map
; (let [endpoint-set (get-in @final-sub-map curr-mode)]
; (if endpoint-set
; ;; create your own mode-to-endpoints-map and combine with the existing one
; ;(swap! final-sub-map (fn [n] (merge-with union mode-set {modes #{endpoint}})))
; ;(swap! cache-map assoc coll-concept-id {"Mode" final-sub-map})
;
; ;; create current map
; (conj endpoint-set endpoint)
;
; )
;
; )
; ;; else create new cache-map
; (swap! final-sub-map assoc curr-mode #{endpoint})
; ;)
; )
;
; )
; )
; )
; )
; )
; @cache-map
; )
; )

(defn create-subscription-cache-contents-for-refresh
"Go through all the subscriptions and find the ones that are
ingest subscriptions. Create the mode values for each collection-concept-id
and put those into a map. The end result looks like:
{Collection concept id 1: [\"New\" \"Update\"]
Collection concept id 2: [\"New\" \"Update\" \"Delete\"]
...}"
[result sub]
(let [concept-edn (convert-concept-to-edn sub)
metadata-edn (:metadata concept-edn)]
(if (ingest-subscription-concept? concept-edn)
(let [coll-concept-id (:CollectionConceptId metadata-edn)
concept-map (result coll-concept-id)
mode (:Mode metadata-edn)]
(if concept-map
(update result coll-concept-id #(add-to-existing-mode % mode))
(assoc result coll-concept-id (add-to-existing-mode nil mode))))
result)))
"Go through all the subscriptions and find the ones that are
ingest subscriptions. Create the mode values for each collection-concept-id
and put those into a map. The end result looks like:"
[subscriptions]

;(println "**** INSIDE create-subscription-cache-contents-for-refresh")
;(println "subscriptions given = " subscriptions)
;; {coll1 : {subscription-concept-1, subscription-concept-2}}
(let [cache-map (atom {})
coll-to-subscription-concept-map (atom {})]
;; order the subscriptions by collection and create a map collection to subscriptions
(doseq [sub subscriptions]
;(println "curr sub = " sub)
(let [metadata-edn (:metadata sub)]
(let [coll-concept-id (:CollectionConceptId metadata-edn)
sub-list (get @coll-to-subscription-concept-map coll-concept-id)]
;(println "sub-list = " sub-list)
(swap! coll-to-subscription-concept-map conj {coll-concept-id (conj sub-list sub)})
;(println "curr coll-to-subscription-concept-map = " @coll-to-subscription-concept-map)
)))
;(println "coll-to-subscription-concept-map = " @coll-to-subscription-concept-map)
;;for every subscription list by the collection create a mode-to-endpoints-map and add it to the final cache map
(doseq [[coll-id subscription-list] @coll-to-subscription-concept-map]
(let [mode-to-endpoints-map (create-mode-to-endpoints-map subscription-list)]
;(println (format "mode-to-endpoints-map %s for collection %s" mode-to-endpoints-map coll-id))
;(println "curr cache-map = " @cache-map)
(swap! cache-map assoc coll-id {"Mode" mode-to-endpoints-map})))
@cache-map))


;;TODO fix this. Refreshing cache requires for it to be the new structure.
(defn refresh-subscription-cache
Expand All @@ -260,8 +337,9 @@
[context]
(when ingest-subscriptions-enabled?
(info "Starting refreshing the ingest subscription cache.")
(let [subs (get-subscriptions-from-db context)
new-contents (reduce create-subscription-cache-contents-for-refresh {} subs)
(let [subs (convert-and-filter-subscriptions (get-subscriptions-from-db context))
new-contents (create-subscription-cache-contents-for-refresh subs)
;_ (println "new-contents = " new-contents)
cache-content-keys (subscription-cache/get-keys context)]
;; update the cache with the new values contained in the new-contents map.
(doall (map #(subscription-cache/set-value context % (new-contents %)) (keys new-contents)))
Expand Down Expand Up @@ -367,17 +445,16 @@
; (debug (format "Work potential subscription publish took %d ms." duration))
; result)))))))


(defn- get-gran-concept-mode
"Gets the granule concept's ingestion mode (i.e. Update, Delete, New, etc)"
[concept]
(cond
;; Mode = Delete
(:deleted concept) "Delete"
;; Mode = New
(and (not (:deleted concept)) (= 1 (:revision-id concept))) "New"
;; Mode = Update
(and (not (:deleted concept)) (pos? (compare (:revision-id concept) 1))) "Update"
))
(and (not (:deleted concept)) (pos? (compare (:revision-id concept) 1))) "Update"))

(defn- create-message-attributes-map
"Create message attribute map that SQS uses to filter out messages from the SNS topic."
Expand All @@ -395,26 +472,33 @@
"Publish a notification to the topic if the passed-in concept is a granule
and a subscription is interested in being informed of the granule's actions."
[context concept]
(println "***** INSIDE publish-subscription-notification-if-applicable")
;(println "***** INSIDE publish-subscription-notification-if-applicable")
(when (granule-concept? (:concept-type concept))
(let [start (System/currentTimeMillis)
coll-concept-id (:parent-collection-id (:extra-fields concept))
sub-cache-map (subscription-cache/get-value context coll-concept-id)]
;(println "sub-cache-map = " sub-cache-map)
;; if this granule's collection is found in subscription cache that means it has a subscription attached to it
(when sub-cache-map
(let [gran-concept-mode (get-gran-concept-mode concept)
endpoint-list (get sub-cache-map gran-concept-mode)
endpoint-list (get-in sub-cache-map ["Mode" gran-concept-mode])
result-array (atom [])]
;(println "endpoint-list = " endpoint-list)
;; for every endpoint in the list create an attributes/subject map and send it along its way
(doseq [endpoint endpoint-list]
;(println "current endpoint = " endpoint)
(let [topic (get-in context [:system :sns :internal])
coll-concept-id (:parent-collection-id (:extra-fields concept))
message (create-notification-message-body concept)
message-attributes-map (create-message-attributes-map endpoint gran-concept-mode coll-concept-id)
subject (create-message-subject gran-concept-mode)]
;(println "message is " message)
;(println "message-attributes-map = " message-attributes-map)
;(println "subject = " subject)
(when (and message-attributes-map subject)
(let [result (topic-protocol/publish topic message message-attributes-map subject)
duration (- (System/currentTimeMillis) start)]
;(println "result is " result)
(debug (format "Subscription publish for endpoint %s took %d ms." endpoint duration))
(swap! result-array (fn [n] (conj @result-array result)))))))
@result-array)))))
Expand Down
Loading

0 comments on commit 85eec19

Please sign in to comment.