2
2
# See full license in LICENSE file.
3
3
4
4
import json
5
- import urllib2
5
+ import requests
6
+ from requests .auth import HTTPDigestAuth , HTTPBasicAuth
6
7
7
8
from errors import P2ESError
8
9
10
+ def http (CONFIG , url , method = "GET" , data = None ):
11
+ auth = None
12
+ if CONFIG ['ES_AuthType' ] != 'none' :
13
+ if CONFIG ['ES_AuthType' ] == 'basic' :
14
+ auth = HTTPBasicAuth (CONFIG ['ES_UserName' ], CONFIG ['ES_Password' ])
15
+ elif CONFIG ['ES_AuthType' ] == 'digest' :
16
+ auth = HTTPDigestAuth (CONFIG ['ES_UserName' ], CONFIG ['ES_Password' ])
17
+ else :
18
+ raise P2ESError (
19
+ 'Unexpected authentication type: {}' .format (CONFIG ['ES_AuthType' ])
20
+ )
21
+
22
+ headers = {'Content-Type' : 'application/x-ndjson' }
23
+
24
+ if method == "GET" :
25
+ return requests .get (url , auth = auth , headers = headers )
26
+ elif method == "POST" :
27
+ return requests .post (url , auth = auth , data = data , headers = headers )
28
+ elif method == "PUT" :
29
+ return requests .put (url , auth = auth , data = data , headers = headers )
30
+ elif method == "HEAD" :
31
+ return requests .head (url , auth = auth , headers = headers )
32
+ else :
33
+ raise Exception ("Method unknown: {}" .format (method ))
34
+
9
35
# Sends data to ES.
10
36
# Raises exceptions: yes.
11
37
def send_to_es (CONFIG , index_name , data ):
@@ -18,7 +44,7 @@ def send_to_es(CONFIG, index_name, data):
18
44
)
19
45
20
46
try :
21
- http_res = urllib2 . urlopen ( url , data )
47
+ http_res = http ( CONFIG , url , method = "POST" , data = data )
22
48
except Exception as e :
23
49
raise P2ESError (
24
50
'Error while executing HTTP bulk insert on {} - {}' .format (
@@ -27,27 +53,24 @@ def send_to_es(CONFIG, index_name, data):
27
53
)
28
54
29
55
# Interpreting HTTP bulk insert response
30
-
31
- http_plaintext = http_res .read ()
32
-
33
- if (http_res .getcode () != 200 ):
56
+ if http_res .status_code != 200 :
34
57
raise P2ESError (
35
58
'Bulk insert on {} failed - '
36
59
'HTTP status code = {} - '
37
60
'Response {}' .format (
38
- index_name , http_res .getcode (), http_plaintext
61
+ index_name , http_res .status_code , http_res . text
39
62
)
40
63
)
41
64
42
65
try :
43
- json_res = json . loads ( http_plaintext )
66
+ json_res = http_res . json ( )
44
67
except Exception as e :
45
68
raise P2ESError (
46
69
'Error while decoding JSON HTTP response - '
47
70
'{} - '
48
71
'first 100 characters: {}' .format (
49
72
str (e ),
50
- http_plaintext [:100 ],
73
+ http_res . text [:100 ],
51
74
)
52
75
)
53
76
@@ -64,27 +87,19 @@ def does_index_exist(index_name, CONFIG):
64
87
url = '{}/{}' .format (CONFIG ['ES_URL' ], index_name )
65
88
66
89
try :
67
- head_req = urllib2 .Request (url )
68
- head_req .get_method = lambda : 'HEAD'
69
- http_res = urllib2 .urlopen (head_req )
70
- return http_res .getcode () == 200
71
- except urllib2 .HTTPError as err :
72
- if err .code == 404 :
90
+ status_code = http (CONFIG , url , method = "HEAD" ).status_code
91
+ if status_code == 200 :
92
+ return True
93
+ if status_code == 404 :
73
94
return False
74
- else :
75
- raise P2ESError (
76
- 'Error while checking if {} index exists: {}' .format (
77
- index_name , str (err )
78
- )
79
- )
95
+ raise Exception ("Unexpected status code: {}" .format (status_code ))
80
96
except Exception as err :
81
97
raise P2ESError (
82
98
'Error while checking if {} index exists: {}' .format (
83
99
index_name , str (err )
84
100
)
85
101
)
86
102
87
-
88
103
# Creates index 'index_name' using template given in config.
89
104
# Raises exceptions: yes.
90
105
def create_index (index_name , CONFIG ):
@@ -110,11 +125,18 @@ def create_index(index_name, CONFIG):
110
125
111
126
last_err = None
112
127
try :
113
- http_res = urllib2 .urlopen (url , tpl )
114
- except Exception as e :
115
- # something went wrong: does index exist anyway?
116
- last_err = str (e )
117
- pass
128
+ # using PUT
129
+ http_res = http (CONFIG , url , method = "PUT" , data = tpl )
130
+ except Exception as e1 :
131
+ last_err = "Error using PUT method: {}" .format (str (e1 ))
132
+ # trying the old way
133
+ try :
134
+ http_res = http (CONFIG , url , method = "POST" , data = tpl )
135
+ except Exception as e2 :
136
+ # something went wrong: does index exist anyway?
137
+ last_err += " - "
138
+ last_err += "Error using old way: {}" .format (str (e2 ))
139
+ pass
118
140
119
141
try :
120
142
if does_index_exist (index_name , CONFIG ):
@@ -128,25 +150,3 @@ def create_index(index_name, CONFIG):
128
150
else :
129
151
err += "error unknown"
130
152
raise P2ESError (err .format (index_name , tpl_path ))
131
-
132
- def prepare_for_http_auth (CONFIG ):
133
- if CONFIG ['ES_AuthType' ] != 'none' :
134
- pwdman = urllib2 .HTTPPasswordMgrWithDefaultRealm ()
135
- pwdman .add_password (
136
- None ,
137
- CONFIG ['ES_URL' ],
138
- CONFIG ['ES_UserName' ],
139
- CONFIG ['ES_Password' ]
140
- )
141
-
142
- if CONFIG ['ES_AuthType' ] == 'basic' :
143
- auth_handler = urllib2 .HTTPBasicAuthHandler (pwdman )
144
- elif CONFIG ['ES_AuthType' ] == 'digest' :
145
- auth_handler = urllib2 .HTTPDigestAuthHandler (pwdman )
146
- else :
147
- raise P2ESError (
148
- 'Unexpected authentication type: {}' .format (CONFIG ['ES_AuthType' ])
149
- )
150
-
151
- opener = urllib2 .build_opener (auth_handler )
152
- urllib2 .install_opener (opener )
0 commit comments