-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommunityservice.py
134 lines (116 loc) · 6.25 KB
/
communityservice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from dspacerequest import DspaceCommunityRequest
from config import ImporterConfig
from dataobjects import DSO, DSOTypes, AuthData
from requests import HTTPError
class CommunityException(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class CommunityService:
_self = None
_top_communities = []
_communities_collections = {}
def __new__(cls, *args, **kwargs):
if cls._self is None:
cls._self = super().__new__(cls)
return cls._self
def __init__(self, config: ImporterConfig, auth_data: AuthData) -> None:
self.__community_request = DspaceCommunityRequest(config.dspace_rest_url(), auth_data)
def __add_community_and_collections(self, community: DSO):
self._communities_collections[community.id] = community
def __add_top_community(self, community_id):
self._top_communities.append(community_id)
# getters
@property
def communities_and_collections(self):
return self._communities_collections
@property
def top_communities(self):
return self._top_communities
def get_top_communities(self):
try:
comms = self.__community_request.top_communities()
if comms["page"]["totalElements"] > 0:
for community in comms["_embedded"]["communities"]:
dsObject = DSO(community["uuid"], community["name"], None, DSOTypes.COMMUNITY)
self.__add_community_and_collections(dsObject)
self.__add_top_community(dsObject.id)
except HTTPError as err:
print(f"Exception when getting top communities. Error code {err.response.status_code}, reason {err.response.reason}")
raise CommunityException(f"Error when getting top communities. Error code {err.response.status_code}, reason {err.response.reason}")
except Exception as err1:
print(f"Exception when getting top communities. {str(err1)}")
raise CommunityException(f"Error when getting top communities. {str(err1)}")
def get_subcommunities(self, community: DSO):
result = []
if community is None:
# return top communities
#print("return top communities")
for c_id in self._top_communities:
result.append(self._communities_collections[c_id])
else:
# may have sub communities cached
comm_dso = self._communities_collections[community.id]
if comm_dso.itemsLoaded:
#print("return cached sub communities")
# have sub communities to return
for c_id in comm_dso.children:
result.append(self._communities_collections[c_id])
else:
# request these from server
#print("get sub communities from server")
try:
curr_page = 0
total_pages = 1
while curr_page < total_pages:
comms = self.__community_request.sub_communities(comm_dso.uuid, curr_page)
total_pages = comms["page"]["totalPages"]
for sub_comm in comms["_embedded"]["subcommunities"]:
#print(f"page {curr_page}")
dsObject = DSO(sub_comm["uuid"], sub_comm["name"], comm_dso.id, DSOTypes.COMMUNITY)
self.__add_community_and_collections(dsObject)
comm_dso.addChild(dsObject.id)
result.append(dsObject)
curr_page = curr_page + 1
comm_dso.itemsLoaded = True
except HTTPError as err:
print(f"Exception getting sub communities for {community.name}. Error code {err.response.status_code}, reason {err.response.reason}")
raise CommunityException(f"Error getting sub communities for {community.name}. Error code {err.response.status_code}, reason {err.response.reason}")
except Exception as err1:
print(f"Exception getting sub communities for {community.name}. Error: {str(err1)}")
raise CommunityException(f"Error getting sub communities for {community.name}. Error: {str(err1)}")
return result
def get_community_dso(self, community_id):
return self._communities_collections[community_id]
def get_collections(self, community: DSO):
result = []
comm_dso = self._communities_collections[community.id]
if comm_dso.collectionsLoaded:
# return cached collections
#print("return cached collections")
for c_id in comm_dso.collections:
result.append(self._communities_collections[c_id])
else:
# get the collections from the server
#print("get collections from server")
try:
curr_page = 0
total_pages = 1
while curr_page < total_pages:
colls = self.__community_request.sub_collections(comm_dso.uuid, curr_page)
total_pages = colls["page"]["totalPages"]
for coll in colls["_embedded"]["collections"]:
dsObject = DSO(coll["uuid"], coll["name"], comm_dso.id, DSOTypes.COLLECTION)
self.__add_community_and_collections(dsObject)
comm_dso.addCollection(dsObject.id)
result.append(dsObject)
curr_page = curr_page + 1
comm_dso.collectionsLoaded = True
except HTTPError as err:
print(f"Exception getting collections for {community.name}. Error code {err.response.status_code}, reason {err.response.reason}")
raise CommunityException(f"Error getting collections for {community.name}. Error code {err.response.status_code}, reason {err.response.reason}")
except Exception as err1:
print(f"Exception getting collections for {community.name}. Error: {str(err1)}")
raise CommunityException(f"Error getting collections for {community.name}. Error: {str(err1)}")
return result