Skip to content

Commit 32b080f

Browse files
committed
Now using a pool for SSH ControlMaster processes, instead of associating them
with Upload or Verification models.
1 parent 9977c5f commit 32b080f

File tree

1 file changed

+163
-135
lines changed

1 file changed

+163
-135
lines changed

mydata/utils/openssh.py

Lines changed: 163 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@
2424
from datetime import datetime
2525
import errno
2626
import getpass
27+
import threading
28+
import time
2729

2830
from mydata.logs import logger
2931
from mydata.utils.exceptions import SshException
3032
from mydata.utils.exceptions import ScpException
3133
from mydata.utils.exceptions import StagingHostRefusedSshConnection
3234
from mydata.utils.exceptions import StagingHostSshPermissionDenied
3335
from mydata.utils.exceptions import PrivateKeyDoesNotExist
34-
from mydata.models.upload import HumanReadableSizeString
36+
from mydata.utils import PidIsRunning
37+
from mydata.utils import HumanReadableSizeString
3538

3639

3740
defaultStartupInfo = None
@@ -81,7 +84,6 @@ def __init__(self):
8184
self.sshKeyGen = f("bin", "ssh-keygen.exe")
8285
self.cipher = "arcfour"
8386
self.sh = f("bin", "sh.exe")
84-
self.pwd = f("bin", "pwd.exe")
8587
self.dd = f("bin", "dd.exe")
8688
self.preferToUseShellInSubprocess = False
8789

@@ -118,6 +120,27 @@ def __init__(self):
118120
# subprocess to quote the command lists correctly.
119121
self.preferToUseShellInSubprocess = True
120122

123+
def GetSshControlMasterPool(self, username=None, privateKeyFilePath=None,
                            hostname=None, createIfMissing=True):
    """
    Return the shared SshControlMasterPool, creating it on first use.

    -oControlMaster is only available in POSIX implementations of ssh.

    :param username: SSH username passed to a newly created pool.
    :param privateKeyFilePath: private key path passed to a newly
        created pool.
    :param hostname: SSH host passed to a newly created pool.
    :param createIfMissing: when False, return None instead of creating
        a pool that does not exist yet.
    :return: the pool stored on this instance, or None if it is missing
        and createIfMissing is False.
    :raises NotImplementedError: on Windows.

    The username, privateKeyFilePath and hostname arguments are only
    used when the pool is created; later calls return the existing
    pool whatever arguments are given.
    """
    if sys.platform.startswith("win"):
        raise NotImplementedError("-oControlMaster is not implemented "
                                  "in MinGW or Cygwin builds of OpenSSH.")
    # Fast path: the pool already exists, so no locking is required.
    if hasattr(self, "sshControlMasterPool"):
        return self.sshControlMasterPool
    if not createIfMissing:
        return None
    # NOTE(review): the lock itself is still created lazily, which is not
    # atomic.  Creating it in __init__ would close that remaining window.
    if not hasattr(self, "createSshControlMasterPoolThreadingLock"):
        self.createSshControlMasterPoolThreadingLock = threading.Lock()
    # "with" guarantees the lock is released even if the pool's
    # constructor raises.
    with self.createSshControlMasterPoolThreadingLock:
        # Re-check now that we hold the lock: another thread may have
        # created the pool while we were waiting.
        if not hasattr(self, "sshControlMasterPool"):
            self.sshControlMasterPool = \
                SshControlMasterPool(username, privateKeyFilePath,
                                     hostname)
    return self.sshControlMasterPool
143+
121144

122145
class KeyPair():
123146

@@ -350,120 +373,6 @@ def NewKeyPair(keyName=None,
350373
raise SshException(stdout)
351374

352375

353-
def GetSshMasterProcessAndControlPath(uploadOrVerificationModel, username,
354-
privateKeyFilePath, hostname):
355-
"""
356-
Unfortunately re-using an SSH connection with -oControlPath=...
357-
only works on POSIX systems, not on Windows.
358-
359-
To try to achieve a similar effect on Windows, we have the following
360-
options:
361-
362-
1. Use an SSH agent (ssh-agent.exe and ssh-add.exe are already bundled).
363-
Subsequent remote commands over SSH channels would still have some
364-
connection overhead (unlike the -oControlPath method), but reading the
365-
key from the agent should be faster than reading it from disk every time.
366-
367-
2. We can use a trick to slightly speed up SSH connection time:
368-
mkdir openssh-msys...\home\<username>\.ssh\
369-
where <username> can be determined from getpass.getuser()
370-
the Msys build of ssh looks here for known_hosts and repeatedly complains
371-
if this directory doesn't exist, so we can easily create it to keep Msys's
372-
SSH happy and hopefully speed up connections slightly.
373-
374-
3. Use bigger chunk sizes on Windows to ensure that we create new SSH
375-
connections less often. Maybe return to having two different upload
376-
methods - one for large files and one for small files.
377-
378-
4. Re-try the piping method (at least for small files). Basically,
379-
we repeatedly write chunks to a subprocess.stdin PIPE (one for each
380-
upload thread) and keep the "ssh staging_host cat >>" process open.
381-
This seemed to work OK on Windows previously, but it failed so dismally
382-
on Mac OS X (incomplete file transfers probably due to buffering), that
383-
I removed it (but we can resurrect it).
384-
385-
Whilst it will look messy to do different things for different OS's,
386-
the path of least resistance might be just to resurrect what was working
387-
on Windows before we did the Mac testing, and be careful to check which
388-
OS we're on before deciding which upload method (and submethod) to use.
389-
390-
So I think I'm voting in favour of 4 (piping for small files on Windows
391-
only) and 3 (the part about returning to different upload methods for
392-
different file sizes, at least on Windows). 2. is easy, so we can
393-
definitely do that, and might get some benefit.
394-
"""
395-
396-
if sys.platform.startswith("win"):
397-
raise NotImplementedError("SSH connection caching is not implemented "
398-
"in MinGW or Cygwin builds of OpenSSH.")
399-
if uploadOrVerificationModel.GetSshMasterProcess():
400-
sshMasterProcess = uploadOrVerificationModel.GetSshMasterProcess()
401-
sshControlPath = uploadOrVerificationModel.GetSshControlPath()
402-
else:
403-
tempFile = tempfile.NamedTemporaryFile(delete=True)
404-
tempFile.close()
405-
if sys.platform.startswith("win"):
406-
sshControlPath = GetMsysPath(tempFile.name)
407-
else:
408-
sshControlPath = tempFile.name
409-
uploadOrVerificationModel.SetSshControlPath(sshControlPath)
410-
sshMasterProcessCommandString = \
411-
"%s -N -i %s -c %s " \
412-
"-oControlMaster=yes -oControlPath=%s " \
413-
"-oIdentitiesOnly=yes -oPasswordAuthentication=no " \
414-
"-oStrictHostKeyChecking=no " \
415-
"%s@%s" \
416-
% (openSSH.DoubleQuote(openSSH.ssh), privateKeyFilePath,
417-
openSSH.cipher,
418-
openSSH.DoubleQuote(sshControlPath),
419-
username, hostname)
420-
logger.debug(sshMasterProcessCommandString)
421-
proc = subprocess.Popen(
422-
sshMasterProcessCommandString,
423-
shell=openSSH.preferToUseShellInSubprocess,
424-
startupinfo=defaultStartupInfo,
425-
creationflags=defaultCreationFlags)
426-
427-
class SshMasterProcess():
428-
def __init__(self, proc, openSSH, sshControlPath,
429-
username, hostname,
430-
defaultStartupInfo, defaultCreationFlags):
431-
self.proc = proc
432-
self.openSSH = openSSH
433-
self.sshControlPath = sshControlPath
434-
self.username = username
435-
self.hostname = hostname
436-
self.defaultStartupInfo = defaultStartupInfo
437-
self.defaultCreationFlags = defaultCreationFlags
438-
self.pid = proc.pid
439-
440-
def terminate(self):
441-
logger.debug("Terminating SSH ControlMaster subprocess...")
442-
exitSshMasterProcessCommandString = \
443-
"%s -oControlPath=%s -O exit " \
444-
"%s@%s" \
445-
% (self.openSSH.DoubleQuote(self.openSSH.ssh),
446-
self.openSSH.DoubleQuote(self.sshControlPath),
447-
self.username, self.hostname)
448-
logger.debug(exitSshMasterProcessCommandString)
449-
proc = subprocess.Popen(
450-
exitSshMasterProcessCommandString,
451-
stdout=subprocess.PIPE,
452-
stderr=subprocess.STDOUT,
453-
shell=self.openSSH.preferToUseShellInSubprocess,
454-
startupinfo=self.defaultStartupInfo,
455-
creationflags=self.defaultCreationFlags)
456-
proc.communicate()
457-
logger.debug("Terminated SSH ControlMaster subprocess.")
458-
459-
sshMasterProcess = SshMasterProcess(proc, openSSH, sshControlPath,
460-
username, hostname,
461-
defaultStartupInfo, defaultCreationFlags)
462-
uploadOrVerificationModel.SetSshMasterProcess(sshMasterProcess)
463-
464-
return (sshMasterProcess, sshControlPath)
465-
466-
467376
def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath,
468377
hostname, uploadOrVerificationModel):
469378
if sys.platform.startswith("win"):
@@ -482,10 +391,12 @@ def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath,
482391
hostname,
483392
openSSH.DoubleQuote("wc -c %s" % quotedRemoteFilePath)]
484393
else:
485-
sshMasterProcess, sshControlPath = \
486-
GetSshMasterProcessAndControlPath(uploadOrVerificationModel,
487-
username,
488-
privateKeyFilePath, hostname)
394+
sshControlMasterPool = \
395+
openSSH.GetSshControlMasterPool(username, privateKeyFilePath,
396+
hostname)
397+
sshControlMasterProcess = \
398+
sshControlMasterPool.GetSshControlMasterProcess()
399+
sshControlPath = sshControlMasterProcess.GetControlPath()
489400

490401
# The authentication options below (-i privateKeyFilePath etc.)
491402
# shouldn't be necessary if the socket created by the SSH master
@@ -616,10 +527,6 @@ def UploadFile(filePath, fileSize, username, privateKeyFilePath,
616527
if foldersController.IsShuttingDown() or uploadModel.Canceled():
617528
logger.debug("UploadFile 1: Aborting upload for "
618529
"%s" % filePath)
619-
if sys.platform.startswith("darwin"):
620-
sshMasterProcess = uploadModel.GetSshMasterProcess()
621-
if sshMasterProcess:
622-
sshMasterProcess.terminate()
623530
return
624531
if bytesUploaded == fileSize:
625532
logger.debug("UploadFile returning because file \"%s\" has already "
@@ -677,9 +584,12 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
677584

678585
# logger.warning("Assuming that the remote shell is Bash.")
679586

680-
sshMasterProcess, sshControlPath = \
681-
GetSshMasterProcessAndControlPath(uploadModel, username,
682-
privateKeyFilePath, hostname)
587+
sshControlMasterPool = \
588+
openSSH.GetSshControlMasterPool(username, privateKeyFilePath,
589+
hostname)
590+
sshControlMasterProcess = \
591+
sshControlMasterPool.GetSshControlMasterProcess()
592+
sshControlPath = sshControlMasterProcess.GetControlPath()
683593

684594
remoteDir = os.path.dirname(remoteFilePath)
685595
quotedRemoteDir = openSSH.DoubleQuote(remoteDir)
@@ -758,7 +668,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
758668
if foldersController.IsShuttingDown() or uploadModel.Canceled():
759669
logger.debug("UploadFileFromPosixSystem 1: Aborting upload for "
760670
"%s" % filePath)
761-
sshMasterProcess.terminate()
762671
return
763672

764673
# Write chunk to temporary file:
@@ -870,7 +779,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
870779
if foldersController.IsShuttingDown() or uploadModel.Canceled():
871780
logger.debug("UploadFileFromPosixSystem 2: Aborting upload for "
872781
"%s" % filePath)
873-
sshMasterProcess.terminate()
874782
return
875783

876784
remoteRemoveChunkCommand = \
@@ -896,11 +804,8 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
896804
creationflags=defaultCreationFlags)
897805
stdout, _ = removeRemoteChunkProcess.communicate()
898806
if removeRemoteChunkProcess.returncode != 0:
899-
sshMasterProcess.terminate()
900807
raise SshException(stdout, removeRemoteChunkProcess.returncode)
901808

902-
sshMasterProcess.terminate()
903-
904809

905810
def UploadSmallFileFromWindows(filePath, fileSize, username,
906811
privateKeyFilePath, hostname, remoteFilePath,
@@ -1221,10 +1126,133 @@ def GetMsysPath(path):
12211126
raise Exception("OpenSSH.GetMsysPath: %s doesn't look like "
12221127
"a valid path." % path)
12231128

1129+
# Singleton instance of OpenSSH class:
12241130
openSSH = OpenSSH()
12251131
ssh = openSSH.ssh
12261132
scp = openSSH.scp
12271133
sshKeyGen = openSSH.sshKeyGen
1228-
if sys.platform.startswith("win"):
1229-
sh = openSSH.sh
1230-
pwd = openSSH.pwd
1134+
1135+
1136+
class SshControlMasterProcess():
    """
    Wraps one "ssh -oControlMaster=yes" subprocess and its control socket.

    See "ControlMaster" in "man ssh_config"
    Only available on POSIX systems.
    """
    def __init__(self, username, privateKeyFilePath, hostname):
        """
        Launch an SSH ControlMaster subprocess for username@hostname.

        :param username: SSH username.
        :param privateKeyFilePath: path of the private key given to
            ssh with -i.
        :param hostname: SSH host to connect to.
        """
        self.username = username
        self.privateKeyFilePath = privateKeyFilePath
        self.hostname = hostname

        # The temporary file is created and closed (hence deleted) purely
        # to obtain a unique path name to use as the control socket path.
        tempFile = tempfile.NamedTemporaryFile(delete=True)
        tempFile.close()
        if sys.platform.startswith("win"):
            self.sshControlPath = GetMsysPath(tempFile.name)
        else:
            self.sshControlPath = tempFile.name
        sshControlMasterProcessCommandString = \
            "%s -N -i %s -c %s " \
            "-oControlMaster=yes -oControlPath=%s " \
            "-oIdentitiesOnly=yes -oPasswordAuthentication=no " \
            "-oStrictHostKeyChecking=no " \
            "%s@%s" \
            % (openSSH.DoubleQuote(openSSH.ssh), privateKeyFilePath,
               openSSH.cipher,
               openSSH.DoubleQuote(self.sshControlPath),
               username, hostname)
        logger.debug(sshControlMasterProcessCommandString)
        # The master is left running in the background; it is not
        # waited on here.
        self.proc = subprocess.Popen(
            sshControlMasterProcessCommandString,
            shell=openSSH.preferToUseShellInSubprocess,
            startupinfo=defaultStartupInfo,
            creationflags=defaultCreationFlags)
        self.pid = self.proc.pid

    def _RunControlCommand(self, controlCommand):
        """
        Run "ssh -O <controlCommand>" against this master's control
        socket, wait for it to finish and return its exit code.
        """
        controlCommandString = \
            "%s -oControlPath=%s -O %s " \
            "%s@%s" \
            % (openSSH.DoubleQuote(openSSH.ssh),
               openSSH.DoubleQuote(self.sshControlPath),
               controlCommand,
               self.username, self.hostname)
        logger.debug(controlCommandString)
        proc = subprocess.Popen(
            controlCommandString,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=openSSH.preferToUseShellInSubprocess,
            startupinfo=defaultStartupInfo,
            creationflags=defaultCreationFlags)
        # Output is captured only so that it is not echoed; it is
        # discarded.
        proc.communicate()
        return proc.returncode

    def Check(self):
        """
        Return True if "ssh -O check" on the control socket exits with 0.
        """
        return self._RunControlCommand("check") == 0

    def Exit(self):
        """
        Ask the master process to exit by running "ssh -O exit" on the
        control socket.  The command's exit code is ignored.
        """
        self._RunControlCommand("exit")

    def GetControlPath(self):
        """
        Return the control socket path passed to ssh as -oControlPath.
        """
        return self.sshControlPath

    def GetPid(self):
        """
        Return the PID of the subprocess started in __init__.
        """
        return self.pid
1210+
1211+
1212+
class SshControlMasterPool():
    """
    A bounded pool of SshControlMasterProcess objects for one
    username / private key / hostname combination.

    Re-using an SSH connection with -oControlPath=...
    only works on POSIX systems, not on Windows.

    To avoid having too many frequent SSH connections on Windows, we can
    use larger chunk sizes (see UploadLargeFileFromWindows).
    """

    def __init__(self, username, privateKeyFilePath, hostname):
        """
        :param username: SSH username used for every master in the pool.
        :param privateKeyFilePath: private key path used for every master.
        :param hostname: SSH host used for every master.
        :raises NotImplementedError: on Windows.
        """
        if sys.platform.startswith("win"):
            raise NotImplementedError("-oControlMaster is not implemented "
                                      "in MinGW or Cygwin builds of OpenSSH.")
        self.username = username
        self.privateKeyFilePath = privateKeyFilePath
        self.hostname = hostname
        # self.maxConnections should be less than
        # MaxSessions in staging server's sshd_config
        self.maxConnections = 5
        self.sshControlMasterProcesses = []
        # Upper bound (in seconds of accumulated sleep) on how long
        # GetSshControlMasterProcess waits for a master once the pool
        # is full.
        self.timeout = 1
        # Guards changes to self.sshControlMasterProcesses, so that
        # concurrent callers cannot push the pool past maxConnections.
        self.poolThreadingLock = threading.Lock()

    def _FindResponsiveProcess(self):
        """
        Return the first pooled master whose Check() succeeds, or None.
        """
        # Iterate over a snapshot, as another thread may modify the list.
        for sshControlMasterProcess in list(self.sshControlMasterProcesses):
            if sshControlMasterProcess.Check():
                return sshControlMasterProcess
        return None

    def _DiscardExitedProcesses(self):
        """
        Drop masters whose subprocess has exited, so that dead masters
        do not permanently occupy slots in the pool.

        Masters which are still running but not yet answering Check()
        (e.g. still connecting) are kept.
        """
        stillRunning = []
        for sshControlMasterProcess in self.sshControlMasterProcesses:
            proc = getattr(sshControlMasterProcess, "proc", None)
            if proc is None or proc.poll() is None:
                stillRunning.append(sshControlMasterProcess)
        self.sshControlMasterProcesses = stillRunning

    def GetSshControlMasterProcess(self):
        """
        Return a usable master process.

        An existing master which passes Check() is preferred.  Otherwise
        a new one is started if the pool has room.  If the pool is full,
        the existing masters are re-checked every 0.1 seconds until
        self.timeout is used up.

        :raises Exception: if no master became available in time.
        """
        responsive = self._FindResponsiveProcess()
        if responsive is not None:
            return responsive
        with self.poolThreadingLock:
            self._DiscardExitedProcesses()
            if len(self.sshControlMasterProcesses) < self.maxConnections:
                newSshControlMasterProcess = \
                    SshControlMasterProcess(self.username,
                                            self.privateKeyFilePath,
                                            self.hostname)
                self.sshControlMasterProcesses.append(
                    newSshControlMasterProcess)
                return newSshControlMasterProcess
        # The pool is full: poll the existing masters for a short while.
        wait = 0
        while wait < self.timeout:
            time.sleep(0.1)
            wait += 0.1
            responsive = self._FindResponsiveProcess()
            if responsive is not None:
                return responsive
        raise Exception("Exceeded max connections in SshControlMasterPool")

    def ShutDown(self):
        """
        Ask every pooled master which is still running to exit, and
        empty the pool.
        """
        # Empty the pool first, so that it is left in a consistent state
        # even if one of the Exit() calls below raises.
        with self.poolThreadingLock:
            sshControlMasterProcesses = self.sshControlMasterProcesses
            self.sshControlMasterProcesses = []
        for sshControlMasterProcess in sshControlMasterProcesses:
            if PidIsRunning(sshControlMasterProcess.GetPid()):
                sshControlMasterProcess.Exit()

0 commit comments

Comments
 (0)