1
+ import json
2
+ import logging
3
+ import os
4
+ import time
1
5
from typing import Union
2
6
3
7
import requests
@@ -79,3 +83,142 @@ def get_form(self, uid: str) -> Union[KoboForm, None]:
79
83
kform = self ._create_koboform (form )
80
84
81
85
return kform
86
+
87
+ def redeploy_form (self , uid : str ) -> None :
88
+ url = f"{ self .url } /api/v{ self .api_version } /assets/{ uid } /deployment/?format=json"
89
+ requests .patch (url = url , headers = self .headers )
90
+
91
+ def upload_media_from_local (
92
+ self , uid : str , folder_path : str , file_name : str , rewrite : bool = False
93
+ ) -> None :
94
+ file_extension = os .path .splitext (file_name )[1 ]
95
+ valid_media = [".jpeg" , ".jpg" , ".png" , ".csv" , ".JPGE" , ".JPG" , ".PNG" ]
96
+
97
+ if not folder_path .endswith (("/" , "\\ " )):
98
+ folder_path += "/"
99
+
100
+ if file_extension not in valid_media :
101
+ raise ValueError (
102
+ "upload_media_from_local: file extension must be one of %r."
103
+ % valid_media
104
+ )
105
+
106
+ file_path = os .path .join (folder_path , file_name )
107
+
108
+ if not os .path .exists (file_path ):
109
+ raise FileNotFoundError (f"File not found: { file_path } " )
110
+
111
+ self ._upload_media (uid , open (file_path , "rb" ), file_name , rewrite )
112
+
113
+ def upload_media_from_server (
114
+ self , uid : str , media_data : bytes , file_name : str , rewrite : bool = False
115
+ ) -> None :
116
+ self ._upload_media (uid , media_data , file_name , rewrite )
117
+
118
+ def _upload_media (
119
+ self , uid : str , media_data : bytes , file_name : str , rewrite : bool
120
+ ) -> None :
121
+ url_media = f"{ self .url } /api/v{ self .api_version } /assets/{ uid } /files"
122
+ payload = {"filename" : file_name }
123
+ data = {
124
+ "description" : "default" ,
125
+ "metadata" : json .dumps (payload ),
126
+ "data_value" : file_name ,
127
+ "file_type" : "form_media" ,
128
+ }
129
+
130
+ res = requests .get (f"{ url_media } .json" , headers = self .headers )
131
+ res .raise_for_status ()
132
+ dict_response = res .json ()["results" ]
133
+
134
+ for each in dict_response :
135
+ if each ["metadata" ]["filename" ] == file_name :
136
+ if rewrite :
137
+ del_id = each ["uid" ]
138
+ res .status_code = 403
139
+ while res .status_code != 204 :
140
+ res = requests .delete (
141
+ f"{ url_media } /{ del_id } " , headers = self .headers
142
+ )
143
+ time .sleep (1 )
144
+ break
145
+ else :
146
+ raise ValueError (
147
+ "There is already a file with the same name! Select a new name or set 'rewrite=True'"
148
+ )
149
+
150
+ files = {"content" : (file_name , media_data )} # Pass media_data directly
151
+
152
+ res = requests .post (
153
+ url = f"{ url_media } .json" , data = data , files = files , headers = self .headers
154
+ )
155
+ res .raise_for_status ()
156
+
157
+ if res .status_code == 201 :
158
+ logging .info (f"Successfully uploaded { file_name } to { uid } form." )
159
+ else :
160
+ logging .error (f"Unsuccessful. Response code: { str (res .status_code )} " )
161
+
162
+ def share_project (self , uid : str , user : str , permission : str ):
163
+ """
164
+ Share a project with a user.
165
+
166
+ Parameters
167
+ ----------
168
+ uid : str
169
+ The project's uid.
170
+ user : str
171
+ The user's uid.
172
+ permission : str
173
+ The permission to give the user.
174
+ """
175
+
176
+ valid_permissions = [
177
+ "add_submissions" ,
178
+ "change_asset" ,
179
+ "change_submissions" ,
180
+ "delete_submissions" ,
181
+ "discover_asset" ,
182
+ "manage_asset" ,
183
+ "partial_submissions" ,
184
+ "validate_submissions" ,
185
+ "view_asset" ,
186
+ "view_submissions" ,
187
+ ]
188
+
189
+ if permission not in valid_permissions :
190
+ raise ValueError (
191
+ "Permission must be one of the following: " + str (valid_permissions )
192
+ )
193
+
194
+ data = {
195
+ "user" : f"{ self .url } /api/v{ self .api_version } /users/{ user } /" ,
196
+ "permission" : f"{ self .url } /api/v{ self .api_version } /permissions/{ permission } /" ,
197
+ }
198
+
199
+ url = f"{ self .url } /api/v{ self .api_version } /assets/{ uid } /permission-assignments.json"
200
+ res = requests .post (url = url , headers = self .headers , data = data )
201
+
202
+ if res .status_code != 201 :
203
+ raise requests .HTTPError (res .text )
204
+
205
+ def fetch_users_with_access (self , uid : str ):
206
+ """
207
+ Fetch the list of users who have access to a specific form, extracting usernames from URLs.
208
+ """
209
+ url_permissions = f"{ self .url } /api/v{ self .api_version } /assets/{ uid } /permission-assignments/"
210
+ res = requests .get (url = url_permissions , headers = self .headers )
211
+
212
+ if res .status_code != 200 :
213
+ raise requests .HTTPError (f"Failed to fetch permissions: { res .text } " )
214
+
215
+ permissions = res .json ()
216
+ users_with_access = set ()
217
+
218
+ for permission in permissions :
219
+ user_url = permission .get ('user' )
220
+ if user_url :
221
+ username = user_url .rstrip ('/' ).split ('/' )[- 1 ] # Split to get the username from the url
222
+ users_with_access .add (username )
223
+
224
+ return list (users_with_access )
0 commit comments