# ASyncAuth.py
# Repository forked from CVL-dev/cvl-ssh-utils.
import wx
from HTMLParser import HTMLParser
from logger.Logger import logger
import json
class ASyncAuth():
    """Prompt for Australian Synchrotron credentials and fetch an access token.

    The object is configured with an HTTP session, the token URL and the
    OAuth client credentials (via ``extraParams``).  ``gettoken`` asks the
    user for a username/password through a wx dialog and posts them to the
    token URL until the server answers with HTTP 200.
    """

    class reset_exception(Exception):
        """Exception type declared by this class; no code in this class raises it."""

        def __init__(self, *args, **kwargs):
            super(ASyncAuth.reset_exception, self).__init__(*args, **kwargs)

    class UserPassDialog(wx.Dialog):
        """Modal dialog with a username field, a password field and Cancel/OK buttons."""

        def __init__(self, user=None, *args, **kwargs):
            """Build the dialog.

            user -- optional initial value for the username field.
            Remaining positional/keyword arguments are passed to wx.Dialog.
            """
            super(ASyncAuth.UserPassDialog, self).__init__(*args, **kwargs)
            self.SetSizer(wx.BoxSizer(wx.VERTICAL))

            # Upper panel: 2x2 grid of (label, text control) pairs.
            p = wx.Panel(self)
            p.SetSizer(wx.FlexGridSizer(cols=2, rows=2, hgap=15, vgap=15))
            t = wx.StaticText(p, wx.ID_ANY, label='Please enter your Australian Synchrotron Username (email address)')
            p.GetSizer().Add(t)
            # The control names are what getUser/getPasswd look up later.
            tc = wx.TextCtrl(p, wx.ID_ANY, name='username_field')
            tc.SetMinSize((300, -1))
            if user is not None:
                tc.SetValue(user)
            p.GetSizer().Add(tc, proportion=1, flag=wx.EXPAND)
            t = wx.StaticText(p, wx.ID_ANY, label='Please enter your Australian Synchrotron password')
            p.GetSizer().Add(t)
            pc = wx.TextCtrl(p, wx.ID_ANY, name='passwd_field', style=wx.TE_PASSWORD)
            p.GetSizer().Add(pc, proportion=1, flag=wx.EXPAND)
            self.GetSizer().Add(p, proportion=1, flag=wx.EXPAND | wx.ALL, border=15)

            # Lower panel: a stretch spacer followed by the Cancel and OK buttons.
            p = wx.Panel(self)
            p.SetSizer(wx.BoxSizer(wx.HORIZONTAL))
            p.GetSizer().Add((1, -1), proportion=1, flag=wx.EXPAND)
            b = wx.Button(p, wx.ID_CANCEL, "Cancel")
            b.Bind(wx.EVT_BUTTON, self.onClose)
            p.GetSizer().Add(b, flag=wx.ALIGN_RIGHT | wx.ALL, border=15)
            b = wx.Button(p, wx.ID_OK, "OK")
            b.SetDefault()
            b.Bind(wx.EVT_BUTTON, self.onClose)
            p.GetSizer().Add(b, flag=wx.ALIGN_RIGHT | wx.ALL, border=15)
            self.GetSizer().Add(p, flag=wx.EXPAND | wx.BOTTOM, border=10)
            self.Fit()

        def onClose(self, event):
            """End the modal loop, returning the id of the button that was pressed."""
            rv = event.GetEventObject().GetId()
            self.EndModal(rv)

        def getUser(self):
            """Return the current text of the username field."""
            return self.FindWindowByName('username_field').GetValue()

        def getPasswd(self):
            """Return the current text of the password field."""
            return self.FindWindowByName('passwd_field').GetValue()

    def queryUserPass(self, queue, username=None):
        """Show the credentials dialog and post the answer on ``queue``.

        queue    -- object with a ``put`` method; receives a
                    ``(username, password)`` tuple when the user presses OK,
                    otherwise ``None``.
        username -- optional value used to pre-fill the username field.

        ``gettoken`` schedules this through ``wx.CallAfter`` and then blocks
        on ``queue.get()``, so exactly one item is always put on the queue,
        even if building or showing the dialog fails; otherwise the waiting
        caller would never wake up.
        """
        credentials = None
        try:
            dlg = ASyncAuth.UserPassDialog(parent=self.parent, id=wx.ID_ANY, user=username)
            try:
                try:
                    wx.EndBusyCursor()
                except Exception:
                    # Best effort, as in the original code: presumably this
                    # can fail when no busy cursor is active.
                    pass
                # progressDialog is optional (__init__ defaults it to None).
                if self.progressDialog is not None:
                    self.progressDialog.Hide()
                if dlg.ShowModal() == wx.ID_OK:
                    credentials = (dlg.getUser(), dlg.getPasswd())
            finally:
                dlg.Destroy()
        finally:
            queue.put(credentials)
        if self.progressDialog is not None:
            self.progressDialog.Show()
        wx.BeginBusyCursor()

    def getUpdateDict(self):
        """Return ``{'aaf_username': <current username>}``."""
        return {'aaf_username': self.username}

    def __init__(self, s, authURL, parent, postFirst=None, extraParams=None, *args, **kwargs):
        """Store the configuration needed to request a token.

        s           -- session object; ``gettoken`` calls ``s.post(url,
                       auth=..., data=..., verify=...)`` on it (looks like a
                       requests session -- confirm against the caller).
        authURL     -- URL the credentials are posted to.
        parent      -- parent window for the credentials dialog.
        postFirst   -- stored on the instance; not used by this class.
        extraParams -- mapping with the keys 'oauthclient' and
                       'oauthclientpasswd'; required.

        Recognised keyword arguments: 'aaf_username' (initial username) and
        'progressDialog' (hidden while the credentials dialog is shown).
        Both default to None.

        Raises Exception when ``extraParams`` is None and KeyError when one
        of its two required keys is missing.
        """
        self.parent = parent
        self.username = kwargs.get('aaf_username')
        self.passwd = None
        self.destURL = authURL
        self.postFirst = postFirst
        self.progressDialog = kwargs.get('progressDialog')
        self.session = s
        if extraParams is None:
            # NOTE(review): this message is raised for a missing client
            # configuration, not for an actual communication failure.
            raise Exception("There was an unknown error communicating with the Australian Synchtrotron Identity System. Please submit a debug report so we can work out what went wrong")
        self.clientusername = extraParams['oauthclient']
        self.clientpasswd = extraParams['oauthclientpasswd']

    def gettoken(self):
        """Prompt for credentials until the server accepts them; return the token.

        Each round schedules ``queryUserPass`` with ``wx.CallAfter`` and then
        blocks until it answers, so this must not run on the thread that
        processes wx events.  The username/password are posted to
        ``self.destURL`` as a 'password' grant, authenticated with the client
        credentials.  Any status other than 200 is logged and the user is
        asked again.

        Returns ``response['data']['access_token']`` from the JSON body.
        Raises Exception("Login cancelled") when the dialog is dismissed
        without OK.
        """
        try:
            import Queue as queue_module  # Python 2 module name
        except ImportError:
            import queue as queue_module  # Python 3 module name

        while True:
            reply = queue_module.Queue()
            wx.CallAfter(self.queryUserPass, reply, self.username)
            res = reply.get()
            if res is None:
                raise Exception("Login cancelled")
            (self.username, self.passwd) = res
            logger.debug('queryIdPUserPass set values for user: %s.'%(self.username))
            # NOTE(review): verify=False is kept from the original code; it
            # disables TLS certificate verification while a password is
            # being sent.  Confirm whether the endpoint really needs this.
            r = self.session.post(self.destURL, auth=(self.clientusername, self.clientpasswd), data={'grant_type': 'password', 'username': self.username, 'password': self.passwd}, verify=False)
            if r.status_code == 200:
                return json.loads(r.text)['data']['access_token']
            logger.debug('AS portal reported %s %s'%(r.status_code,r.text))