diff --git a/changelog.markdown b/changelog.markdown index afac2b54c..ff781fc05 100644 --- a/changelog.markdown +++ b/changelog.markdown @@ -5,12 +5,13 @@ ## 0.14.0 -This is a major release containing four new features: +This is a major release containing two big features: - A new dependencies development mode. - Support for lock files. - Download tarballs when downloading packages from GitHub. -- A setup command. +- Parallel downloads of the locked dependencies. +- Setup command. ## 0.13.0 diff --git a/src/nimble.nim b/src/nimble.nim index 93393f0f5..bd16c713f 100644 --- a/src/nimble.nim +++ b/src/nimble.nim @@ -4,7 +4,7 @@ import system except TResult import os, tables, strtabs, json, algorithm, sets, uri, sugar, sequtils, osproc, - strformat + strformat, asyncdispatch import std/options as std_opt @@ -27,6 +27,9 @@ const gitIgnoreFileName = ".gitignore" hgIgnoreFileName = ".hgignore" +type + DownloadInfo = (DownloadMethod, string, Table[string, string]) + proc refresh(options: Options) = ## Downloads the package list from the specified URL. ## @@ -61,10 +64,66 @@ proc initPkgList(pkgInfo: PackageInfo, options: Options): seq[PackageInfo] = {.warning[ProveInit]: on.} proc install(packages: seq[PkgTuple], options: Options, - doPrompt, first, fromLockFile: bool): PackageDependenciesInfo + doPrompt, first, fromLockFile: bool): + Future[PackageDependenciesInfo] {.async.} + +proc getDownloadInfo(pv: PkgTuple, options: Options, doPrompt: bool): + DownloadInfo + +proc checkForAlreadyInstalledPkg(dep: PkgTuple, pkgList: seq[PackageInfo], + resolvedDep: var PkgTuple, pkg: var PackageInfo, options: Options): bool = + resolvedDep = dep.resolveAlias(options) + display("Checking", "for $1" % $resolvedDep, priority = MediumPriority) + result = findPkg(pkgList, resolvedDep, pkg) + # Check if the original name exists. + if not result and resolvedDep.name != dep.name: + display("Checking", "for $1" % $dep, priority = MediumPriority) + result = findPkg(pkgList, dep, pkg) + if result: + displayWarning(&"Installed package {dep.name} should be renamed to " & + resolvedDep.name) proc processFreeDependencies(pkgInfo: PackageInfo, options: Options): - HashSet[PackageInfo] = + Future[HashSet[PackageInfo]] {.async.} + +proc processFreeDependenciesOfAlreadyInstalledPkg( + resultSet: ptr HashSet[PackageInfo], + reverseDependencies: ptr seq[PackageBasicInfo], + dep: PkgTuple, pkg: PackageInfo, options: Options): Future[void] {.async.} = + displayInfo(pkgDepsAlreadySatisfiedMsg(dep)) + resultSet[].incl pkg + # Process the dependencies of this dependency. + resultSet[].incl await processFreeDependencies( + pkg.toFullInfo(options), options) + if not pkg.isLink: + reverseDependencies[].add(pkg.basicInfo) + +proc addDepsToResultSet(deps: HashSet[PackageInfo], + resultSet: ptr HashSet[PackageInfo]) = + for dep in deps: + if resultSet[].contains dep: + # If the package already exists in the result set we had to merge its + # special versions set into the special versions set of the existing + # one. + resultSet[][dep].metaData.specialVersions.incl( + dep.metaData.specialVersions) + else: + resultSet[].incl dep + +proc awaitInstallFutures(futures: seq[Future[PackageDependenciesInfo]], + pkgList: ptr seq[PackageInfo], + reverseDependencies: ptr seq[PackageBasicInfo], + resultSet: ptr HashSet[PackageInfo]): + Future[void] {.async.} = + var installResults = if futures.len > 0: await all(futures) else: @[] + for (deps, pkg) in mitems(installResults): + addDepsToResultSet(deps, resultSet) + fillMetaData(pkg, pkg.getRealDir(), false) + pkgList[].add pkg + reverseDependencies[].add(pkg.basicInfo) + +proc processFreeDependencies(pkgInfo: PackageInfo, options: Options): + Future[HashSet[PackageInfo]] {.async.} = ## Verifies and installs dependencies. ## ## Returns set of PackageInfo (for paths) to pass to the compiler @@ -80,7 +139,13 @@ proc processFreeDependencies(pkgInfo: PackageInfo, options: Options): [pkgInfo.basicInfo.name, $pkgInfo.basicInfo.version], priority = HighPriority) - var reverseDependencies: seq[PackageBasicInfo] = @[] + var + reverseDependencies: seq[PackageBasicInfo] = @[] + installFutures {.global.}: + Table[string, seq[Future[PackageDependenciesInfo]]] + currentlyWaitingFutures: seq[Future[PackageDependenciesInfo]] + dependenciesToCheckAgain: seq[PkgTuple] + for dep in pkgInfo.requires: if dep.name == "nimrod" or dep.name == "nim": let nimVer = getNimrodVersion(options) @@ -88,46 +153,52 @@ proc processFreeDependencies(pkgInfo: PackageInfo, options: Options): let msg = "Unsatisfied dependency: " & dep.name & " (" & $dep.ver & ")" raise nimbleError(msg) else: - let resolvedDep = dep.resolveAlias(options) - display("Checking", "for $1" % $resolvedDep, priority = MediumPriority) - var pkg = initPackageInfo() - var found = findPkg(pkgList, resolvedDep, pkg) - # Check if the original name exists. - if not found and resolvedDep.name != dep.name: - display("Checking", "for $1" % $dep, priority = MediumPriority) - found = findPkg(pkgList, dep, pkg) - if found: - displayWarning(&"Installed package {dep.name} should be renamed to " & - resolvedDep.name) - + var + pkg = initPackageInfo() + resolvedDep: PkgTuple + let found = checkForAlreadyInstalledPkg( + dep, pkgList, resolvedDep, pkg, options) if not found: - display("Installing", $resolvedDep, priority = HighPriority) - let toInstall = @[(resolvedDep.name, resolvedDep.ver)] - let (packages, installedPkg) = install(toInstall, options, - doPrompt = false, first = false, fromLockFile = false) - - for pkg in packages: - if result.contains pkg: - # If the result already contains the newly tried to install package - # we had to merge its special versions set into the set of the old - # one. - result[pkg].metaData.specialVersions.incl( - pkg.metaData.specialVersions) - else: - result.incl pkg - - pkg = installedPkg # For addRevDep - fillMetaData(pkg, pkg.getRealDir(), false) - - # This package has been installed so we add it to our pkgList. - pkgList.add pkg + let(_, resolvedDepUrl, _) = getDownloadInfo(resolvedDep, options, true) + let url = resolvedDepUrl.removeTrailingGitString + if installFutures.hasKey url: + currentlyWaitingFutures.add installFutures[url] + dependenciesToCheckAgain.add resolvedDep + else: + display("Installing", $resolvedDep, priority = HighPriority) + let future = install(@[resolvedDep], options, + doPrompt = false, first = false, fromLockFile = false) + installFutures[url] = @[future] + currentlyWaitingFutures.add future else: - displayInfo(pkgDepsAlreadySatisfiedMsg(dep)) - result.incl pkg - # Process the dependencies of this dependency. - result.incl processFreeDependencies(pkg.toFullInfo(options), options) - if not pkg.isLink: - reverseDependencies.add(pkg.basicInfo) + await processFreeDependenciesOfAlreadyInstalledPkg( + result.addr, reverseDependencies.addr, dep, pkg, options) + + await awaitInstallFutures(currentlyWaitingFutures, + pkgList.addr, reverseDependencies.addr, result.addr) + + currentlyWaitingFutures.setLen(0) + + for dep in dependenciesToCheckAgain: + var + pkg = initPackageInfo() + resolvedDep: PkgTuple + let found = checkForAlreadyInstalledPkg( + dep, pkgList, resolvedDep, pkg, options) + if not found: + let(_, resolvedDepUrl, _) = getDownloadInfo(resolvedDep, options, true) + let url = resolvedDepUrl.removeTrailingGitString + display("Installing", $resolvedDep, priority = HighPriority) + let future = install(@[resolvedDep], options, + doPrompt = false, first = false, fromLockFile = false) + installFutures[url].add future + currentlyWaitingFutures.add future + else: + await processFreeDependenciesOfAlreadyInstalledPkg( + result.addr, reverseDependencies.addr, dep, pkg, options) + + await awaitInstallFutures(currentlyWaitingFutures, + pkgList.addr, reverseDependencies.addr, result.addr) # Check if two packages of the same name (but different version) are listed # in the path. @@ -295,12 +366,12 @@ proc processAllDependencies(pkgInfo: PackageInfo, options: Options): if pkgInfo.lockedDeps.len > 0: pkgInfo.processLockedDependencies(options) else: - pkgInfo.processFreeDependencies(options) + waitFor pkgInfo.processFreeDependencies(options) proc installFromDir(dir: string, requestedVer: VersionRange, options: Options, url: string, first: bool, fromLockFile: bool, vcsRevision = notSetSha1Hash): - PackageDependenciesInfo = + Future[PackageDependenciesInfo] {.async.} = ## Returns where package has been installed to, together with paths ## to the packages this package depends on. ## @@ -343,7 +414,7 @@ proc installFromDir(dir: string, requestedVer: VersionRange, options: Options, if first and pkgInfo.lockedDeps.len > 0: result.deps = pkgInfo.processLockedDependencies(depsOptions) elif not fromLockFile: - result.deps = pkgInfo.processFreeDependencies(depsOptions) + result.deps = await pkgInfo.processFreeDependencies(depsOptions) if options.depsOnly: result.pkg = pkgInfo @@ -478,14 +549,21 @@ proc getDependency(name: string, dep: LockFileDep, options: Options): getInstalledPackageMin(depDirName, nimbleFilePath).toFullInfo(options) type - DownloadInfo = ref object + LockedDepDownloadInfo = ref object ## Information for a downloaded dependency needed for installation. name: string dependency: LockFileDep url: string version: VersionRange downloadDir: string - vcsRevision: Sha1Hash + vcsRevision: Sha1HashRef + + DownloadQueue = ref seq[tuple[name: string, dep: LockFileDep]] + ## A queue of dependencies from the lock file which to be downloaded. + + DownloadResults = ref seq[LockedDepDownloadInfo] + ## A list of `LockedDepDownloadInfo` objects used for installing the + ## downloaded dependencies. proc developWithDependencies(options: Options): bool = ## Determines whether the current executed action is a develop sub-command @@ -514,8 +592,8 @@ proc raiseCannotCloneInExistingDirException(downloadDir: string) = raise nimbleError(msg, hint) proc downloadDependency(name: string, dep: LockFileDep, options: Options): - DownloadInfo = - ## Downloads a dependency from the lock file. + Future[LockedDepDownloadInfo] {.async.} = + ## Asynchronously downloads a dependency from the lock file. if not options.developWithDependencies: let depDirName = getDependencyDir(name, dep, options) @@ -533,18 +611,18 @@ proc downloadDependency(name: string, dep: LockFileDep, options: Options): if options.developWithDependencies: displayWarning(skipDownloadingInAlreadyExistingDirectoryMsg( downloadPath, name)) - result = DownloadInfo( + result = LockedDepDownloadInfo( name: name, dependency: dep, url: url, version: version, downloadDir: downloadPath, - vcsRevision: dep.vcsRevision) + vcsRevision: dep.vcsRevision.newClone) return else: raiseCannotCloneInExistingDirException(downloadPath) - let (downloadDir, _, vcsRevision) = downloadPkg( + let (downloadDir, _, vcsRevision) = await downloadPkg( url, version, dep.downloadMethod, subdir, options, downloadPath, dep.vcsRevision) @@ -553,7 +631,7 @@ proc downloadDependency(name: string, dep: LockFileDep, options: Options): raise checksumError(name, dep.version, dep.vcsRevision, downloadedPackageChecksum, dep.checksums.sha1) - result = DownloadInfo( + result = LockedDepDownloadInfo( name: name, dependency: dep, url: url, @@ -561,17 +639,17 @@ proc downloadDependency(name: string, dep: LockFileDep, options: Options): downloadDir: downloadDir, vcsRevision: vcsRevision) -proc installDependency(pkgInfo: PackageInfo, downloadInfo: DownloadInfo, +proc installDependency(pkgInfo: PackageInfo, downloadInfo: LockedDepDownloadInfo, options: Options): PackageInfo = ## Installs an already downloaded dependency of the package `pkgInfo`. - let (_, newlyInstalledPkgInfo) = installFromDir( + let (_, newlyInstalledPkgInfo) = waitFor installFromDir( downloadInfo.downloadDir, downloadInfo.version, options, downloadInfo.url, first = false, fromLockFile = true, - downloadInfo.vcsRevision) + downloadInfo.vcsRevision[]) downloadInfo.downloadDir.removeDir @@ -583,6 +661,31 @@ proc installDependency(pkgInfo: PackageInfo, downloadInfo: DownloadInfo, return newlyInstalledPkgInfo +proc startDownloadWorker(queue: DownloadQueue, options: Options, + downloadResults: DownloadResults) {.async.} = + ## Starts a new download worker. + while queue[].len > 0: + let download = queue[].pop + let index = queue[].len + downloadResults[index] = await downloadDependency( + download.name, download.dep, options) + +proc lockedDepsDownload(dependenciesToDownload: DownloadQueue, + options: Options): DownloadResults = + ## By given queue with dependencies to download performs the downloads and + ## returns the result objects. + + result.new + result[].setLen(dependenciesToDownload[].len) + + var downloadWorkers: seq[Future[void]] + let workersCount = min( + options.maxParallelDownloads, dependenciesToDownload[].len) + for i in 0 ..< workersCount: + downloadWorkers.add startDownloadWorker( + dependenciesToDownload, options, result) + waitFor all(downloadWorkers) + proc processLockedDependencies(pkgInfo: PackageInfo, options: Options): HashSet[PackageInfo] = # Returns a hash set with `PackageInfo` of all packages from the lock file of @@ -593,18 +696,23 @@ proc processLockedDependencies(pkgInfo: PackageInfo, options: Options): let developModeDeps = getDevelopDependencies(pkgInfo, options) + var dependenciesToDownload: DownloadQueue + dependenciesToDownload.new + for name, dep in pkgInfo.lockedDeps: if developModeDeps.hasKey(name): result.incl developModeDeps[name][] elif isInstalled(name, dep, options): result.incl getDependency(name, dep, options) else: - let downloadResult = downloadDependency(name, dep, options) - result.incl installDependency(pkgInfo, downloadResult, options) + dependenciesToDownload[].add (name, dep) + + let downloadResults = lockedDepsDownload(dependenciesToDownload, options) + for downloadResult in downloadResults[]: + result.incl installDependency(pkgInfo, downloadResult, options) -proc getDownloadInfo*(pv: PkgTuple, options: Options, - doPrompt: bool): (DownloadMethod, string, - Table[string, string]) = +proc getDownloadInfo(pv: PkgTuple, options: Options, doPrompt: bool): + DownloadInfo = if pv.name.isURL: let (url, metadata) = getUrlData(pv.name) return (checkUrlType(url), url, metadata) @@ -629,7 +737,8 @@ proc getDownloadInfo*(pv: PkgTuple, options: Options, raise nimbleError(pkgNotFoundMsg(pv)) proc install(packages: seq[PkgTuple], options: Options, - doPrompt, first, fromLockFile: bool): PackageDependenciesInfo = + doPrompt, first, fromLockFile: bool): + Future[PackageDependenciesInfo] {.async.} = ## ``first`` ## True if this is the first level of the indirect recursion. ## ``fromLockFile`` @@ -641,19 +750,19 @@ proc install(packages: seq[PkgTuple], options: Options, displayWarning( "Installing a package which currently has develop mode dependencies." & "\nThey will be ignored and installed as normal packages.") - result = installFromDir(currentDir, newVRAny(), options, "", first, - fromLockFile) + result = await installFromDir(currentDir, newVRAny(), options, "", first, + fromLockFile) else: # Install each package. for pv in packages: let (meth, url, metadata) = getDownloadInfo(pv, options, doPrompt) let subdir = metadata.getOrDefault("subdir") let (downloadDir, downloadVersion, vcsRevision) = - downloadPkg(url, pv.ver, meth, subdir, options, - downloadPath = "", vcsRevision = notSetSha1Hash) + await downloadPkg(url, pv.ver, meth, subdir, options, + downloadPath = "", vcsRevision = notSetSha1Hash) try: - result = installFromDir(downloadDir, pv.ver, options, url, - first, fromLockFile, vcsRevision) + result = await installFromDir(downloadDir, pv.ver, options, url, + first, fromLockFile, vcsRevision[]) except BuildFailed as error: # The package failed to build. # Check if we tried building a tagged version of the package. @@ -669,8 +778,8 @@ proc install(packages: seq[PkgTuple], options: Options, [pv.name, $downloadVersion]) if promptResult: let toInstall = @[(pv.name, headVer.toVersionRange())] - result = install(toInstall, options, doPrompt, first, - fromLockFile = false) + result = await install(toInstall, options, doPrompt, first, + fromLockFile = false) else: raise buildFailed( "Aborting installation due to build failure.", details = error) @@ -1205,8 +1314,8 @@ proc installDevelopPackage(pkgTup: PkgTuple, options: var Options): else: pkgTup.ver - discard downloadPkg(url, ver, meth, subdir, options, downloadDir, - vcsRevision = notSetSha1Hash) + discard waitFor downloadPkg(url, ver, meth, subdir, options, downloadDir, + vcsRevision = notSetSha1Hash) let pkgDir = downloadDir / subdir var pkgInfo = getPkgInfo(pkgDir, options) @@ -1220,12 +1329,18 @@ proc developLockedDependencies(pkgInfo: PackageInfo, alreadyDownloaded: var HashSet[string], options: var Options) = ## Downloads for develop the dependencies from the lock file. + var dependenciesToDownload: DownloadQueue + dependenciesToDownload.new + for name, dep in pkgInfo.lockedDeps: if dep.url.removeTrailingGitString notin alreadyDownloaded: - let downloadResult = downloadDependency(name, dep, options) - alreadyDownloaded.incl downloadResult.url.removeTrailingGitString - options.action.devActions.add( - (datAdd, downloadResult.downloadDir.normalizedPath)) + dependenciesToDownload[].add (name, dep) + + let downloadResults = lockedDepsDownload(dependenciesToDownload, options) + for downloadResult in downloadResults[]: + alreadyDownloaded.incl downloadResult.url.removeTrailingGitString + options.action.devActions.add( + (datAdd, downloadResult.downloadDir.normalizedPath)) proc check(alreadyDownloaded: HashSet[string], dep: PkgTuple, options: Options): bool = @@ -1545,7 +1660,7 @@ proc lock(options: Options) = let doesLockFileExist = displayLockOperationStart(currentDir) validateDevModeDepsWorkingCopiesBeforeLock(pkgInfo, options) - let dependencies = pkgInfo.processFreeDependencies(options).map( + let dependencies = (waitFor pkgInfo.processFreeDependencies(options)).map( pkg => pkg.toFullInfo(options)).toSeq pkgInfo.validateDevelopDependenciesVersionRanges(dependencies, options) var dependencyGraph = buildDependencyGraph(dependencies, options) @@ -1833,10 +1948,10 @@ proc doAction(options: var Options) = of actionRefresh: refresh(options) of actionInstall: - let (_, pkgInfo) = install(options.action.packages, options, - doPrompt = true, - first = true, - fromLockFile = false) + let (_, pkgInfo) = waitFor install(options.action.packages, options, + doPrompt = true, + first = true, + fromLockFile = false) if options.action.packages.len == 0: nimScriptHint(pkgInfo) if pkgInfo.foreignDeps.len > 0: @@ -1921,7 +2036,6 @@ when isMainModule: except CatchableError as error: exitCode = QuitFailure displayTip() - echo error.getStackTrace() displayError(error) finally: try: diff --git a/src/nimblepkg/asynctools/asyncpipe.nim b/src/nimblepkg/asynctools/asyncpipe.nim new file mode 100644 index 000000000..ac2e8b9b9 --- /dev/null +++ b/src/nimblepkg/asynctools/asyncpipe.nim @@ -0,0 +1,536 @@ +# +# +# Asynchronous tools for Nim Language +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "LICENSE", included in this +# distribution, for details about the copyright. +# + +## This module implements cross-platform asynchronous pipes communication. +## +## Module uses named pipes for Windows, and anonymous pipes for +## Linux/BSD/MacOS. +## +## .. code-block:: nim +## var inBuffer = newString(64) +## var outBuffer = "TEST STRING BUFFER" +## +## # Create new pipe +## var o = createPipe() +## +## # Write string to pipe +## waitFor write(o, cast[pointer](addr outBuffer[0]), outBuffer.len) +## +## # Read data from pipe +## var c = waitFor readInto(o, cast[pointer](addr inBuffer[0]), inBuffer.len) +## +## inBuffer.setLen(c) +## doAssert(inBuffer == outBuffer) +## +## # Close pipe +## close(o) + +import asyncdispatch, os + +when defined(nimdoc): + type + AsyncPipe* = ref object ## Object represents ``AsyncPipe``. + + proc createPipe*(register = true): AsyncPipe = + ## Create descriptor pair for interprocess communication. + ## + ## Returns ``AsyncPipe`` object, which represents OS specific pipe. + ## + ## If ``register`` is `false`, both pipes will not be registered with + ## current dispatcher. + + proc closeRead*(pipe: AsyncPipe, unregister = true) = + ## Closes read side of pipe ``pipe``. + ## + ## If ``unregister`` is `false`, pipe will not be unregistered from + ## current dispatcher. + + proc closeWrite*(pipe: AsyncPipe, unregister = true) = + ## Closes write side of pipe ``pipe``. + ## + ## If ``unregister`` is `false`, pipe will not be unregistered from + ## current dispatcher. + + proc getReadHandle*(pipe: AsyncPipe): int = + ## Returns OS specific handle for read side of pipe ``pipe``. + + proc getWriteHandle*(pipe: AsyncPipe): int = + ## Returns OS specific handle for write side of pipe ``pipe``. + + proc getHandles*(pipe: AsyncPipe): array[2, Handle] = + ## Returns OS specific handles of ``pipe``. + + proc getHandles*(pipe: AsyncPipe): array[2, cint] = + ## Returns OS specific handles of ``pipe``. + + proc close*(pipe: AsyncPipe, unregister = true) = + ## Closes both ends of pipe ``pipe``. + ## + ## If ``unregister`` is `false`, pipe will not be unregistered from + ## current dispatcher. + + proc write*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + ## This procedure writes an untyped ``data`` of ``size`` size to the + ## pipe ``pipe``. + ## + ## The returned future will complete once ``all`` data has been sent or + ## part of the data has been sent. + + proc readInto*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + ## This procedure reads up to ``size`` bytes from pipe ``pipe`` + ## into ``data``, which must at least be of that size. + ## + ## Returned future will complete once all the data requested is read or + ## part of the data has been read. + + proc asyncWrap*(readHandle: Handle|cint = 0, + writeHandle: Handle|cint = 0): AsyncPipe = + ## Wraps existing OS specific pipe handles to ``AsyncPipe`` and register + ## it with current dispatcher. + ## + ## ``readHandle`` - read side of pipe (optional value). + ## ``writeHandle`` - write side of pipe (optional value). + ## **Note**: At least one handle must be specified. + ## + ## Returns ``AsyncPipe`` object. + ## + ## Windows handles must be named pipes created with ``CreateNamedPipe`` and + ## ``FILE_FLAG_OVERLAPPED`` in flags. You can use ``ReopenFile()`` function + ## to convert existing handle to overlapped variant. + ## + ## Posix handle will be modified with ``O_NONBLOCK``. + + proc asyncUnwrap*(pipe: AsyncPipe) = + ## Unregisters ``pipe`` handle from current async dispatcher. + + proc `$`*(pipe: AsyncPipe) = + ## Returns string representation of ``AsyncPipe`` object. + +else: + + when defined(windows): + import winlean + else: + import posix + + type + AsyncPipe* = ref object of RootRef + when defined(windows): + readPipe: Handle + writePipe: Handle + else: + readPipe: cint + writePipe: cint + + when defined(windows): + + proc QueryPerformanceCounter(res: var int64) + {.importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".} + proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL + {.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".} + + when not declared(PCustomOverlapped): + type + PCustomOverlapped = CustomRef + + const + pipeHeaderName = r"\\.\pipe\asyncpipe_" + + const + DEFAULT_PIPE_SIZE = 65536'i32 + FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000'i32 + PIPE_WAIT = 0x00000000'i32 + PIPE_TYPE_BYTE = 0x00000000'i32 + PIPE_READMODE_BYTE = 0x00000000'i32 + ERROR_PIPE_CONNECTED = 535 + ERROR_PIPE_BUSY = 231 + ERROR_BROKEN_PIPE = 109 + ERROR_PIPE_NOT_CONNECTED = 233 + + proc `$`*(pipe: AsyncPipe): string = + result = "AsyncPipe [read = " & $(cast[uint](pipe.readPipe)) & + ", write = " & $(cast[int](pipe.writePipe)) & "]" + + proc createPipe*(register = true): AsyncPipe = + + var number = 0'i64 + var pipeName: WideCString + var pipeIn: Handle + var pipeOut: Handle + var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint, + lpSecurityDescriptor: nil, bInheritHandle: 1) + while true: + QueryPerformanceCounter(number) + let p = pipeHeaderName & $number + pipeName = newWideCString(p) + var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or + PIPE_ACCESS_INBOUND + var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT + pipeIn = createNamedPipe(pipeName, openMode, pipeMode, 1'i32, + DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE, + 1'i32, addr sa) + if pipeIn == INVALID_HANDLE_VALUE: + let err = osLastError() + if err.int32 != ERROR_PIPE_BUSY: + raiseOsError(err) + else: + break + + var openMode = (FILE_WRITE_DATA or SYNCHRONIZE) + pipeOut = createFileW(pipeName, openMode, 0, addr(sa), OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, 0) + if pipeOut == INVALID_HANDLE_VALUE: + let err = osLastError() + discard closeHandle(pipeIn) + raiseOsError(err) + + result = AsyncPipe(readPipe: pipeIn, writePipe: pipeOut) + + var ovl = PCustomOverlapped() + let res = connectNamedPipe(pipeIn, cast[pointer](ovl)) + if res == 0: + let err = osLastError() + if err.int32 == ERROR_PIPE_CONNECTED: + discard + elif err.int32 == ERROR_IO_PENDING: + var bytesRead = 0.Dword + if getOverlappedResult(pipeIn, cast[POVERLAPPED](ovl), bytesRead, 1) == 0: + let oerr = osLastError() + discard closeHandle(pipeIn) + discard closeHandle(pipeOut) + raiseOsError(oerr) + else: + discard closeHandle(pipeIn) + discard closeHandle(pipeOut) + raiseOsError(err) + + if register: + register(AsyncFD(pipeIn)) + register(AsyncFD(pipeOut)) + + proc asyncWrap*(readHandle = Handle(0), + writeHandle = Handle(0)): AsyncPipe = + doAssert(readHandle != 0 or writeHandle != 0) + + result = AsyncPipe(readPipe: readHandle, writePipe: writeHandle) + if result.readPipe != 0: + register(AsyncFD(result.readPipe)) + if result.writePipe != 0: + register(AsyncFD(result.writePipe)) + + proc asyncUnwrap*(pipe: AsyncPipe) = + if pipe.readPipe != 0: + unregister(AsyncFD(pipe.readPipe)) + if pipe.writePipe != 0: + unregister(AsyncFD(pipe.writePipe)) + + proc getReadHandle*(pipe: AsyncPipe): Handle {.inline.} = + result = pipe.readPipe + + proc getWriteHandle*(pipe: AsyncPipe): Handle {.inline.} = + result = pipe.writePipe + + proc getHandles*(pipe: AsyncPipe): array[2, Handle] {.inline.} = + result = [pipe.readPipe, pipe.writePipe] + + proc closeRead*(pipe: AsyncPipe, unregister = true) = + if pipe.readPipe != 0: + if unregister: + unregister(AsyncFD(pipe.readPipe)) + if closeHandle(pipe.readPipe) == 0: + raiseOsError(osLastError()) + pipe.readPipe = 0 + + proc closeWrite*(pipe: AsyncPipe, unregister = true) = + if pipe.writePipe != 0: + if unregister: + unregister(AsyncFD(pipe.writePipe)) + if closeHandle(pipe.writePipe) == 0: + raiseOsError(osLastError()) + pipe.writePipe = 0 + + proc close*(pipe: AsyncPipe, unregister = true) = + closeRead(pipe, unregister) + closeWrite(pipe, unregister) + + proc write*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + var retFuture = newFuture[int]("asyncpipe.write") + var ol = PCustomOverlapped() + + if pipe.writePipe == 0: + retFuture.fail(newException(ValueError, + "Write side of pipe closed or not available")) + else: + GC_ref(ol) + ol.data = CompletionData(fd: AsyncFD(pipe.writePipe), cb: + proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete(bytesCount) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + let res = writeFile(pipe.writePipe, data, nbytes.int32, nil, + cast[POVERLAPPED](ol)).bool + if not res: + let errcode = osLastError() + if errcode.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + return retFuture + + proc readInto*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + var retFuture = newFuture[int]("asyncpipe.readInto") + var ol = PCustomOverlapped() + + if pipe.readPipe == 0: + retFuture.fail(newException(ValueError, + "Read side of pipe closed or not available")) + else: + GC_ref(ol) + ol.data = CompletionData(fd: AsyncFD(pipe.readPipe), cb: + proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert(bytesCount > 0 and bytesCount <= nbytes.int32) + retFuture.complete(bytesCount) + else: + if errcode.int32 in {ERROR_BROKEN_PIPE, + ERROR_PIPE_NOT_CONNECTED}: + retFuture.complete(bytesCount) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + let res = readFile(pipe.readPipe, data, nbytes.int32, nil, + cast[POVERLAPPED](ol)).bool + if not res: + let err = osLastError() + if err.int32 in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}: + GC_unref(ol) + retFuture.complete(0) + elif err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + return retFuture + else: + + proc setNonBlocking(fd: cint) {.inline.} = + var x = fcntl(fd, F_GETFL, 0) + if x == -1: + raiseOSError(osLastError()) + else: + var mode = x or O_NONBLOCK + if fcntl(fd, F_SETFL, mode) == -1: + raiseOSError(osLastError()) + + proc `$`*(pipe: AsyncPipe): string = + result = "AsyncPipe [read = " & $(cast[uint](pipe.readPipe)) & + ", write = " & $(cast[uint](pipe.writePipe)) & "]" + + proc createPipe*(size = 65536, register = true): AsyncPipe = + var fds: array[2, cint] + if posix.pipe(fds) == -1: + raiseOSError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + + result = AsyncPipe(readPipe: fds[0], writePipe: fds[1]) + + if register: + register(AsyncFD(fds[0])) + register(AsyncFD(fds[1])) + + proc asyncWrap*(readHandle = cint(0), writeHandle = cint(0)): AsyncPipe = + doAssert((readHandle != 0) or (writeHandle != 0)) + result = AsyncPipe(readPipe: readHandle, writePipe: writeHandle) + if result.readPipe != 0: + setNonBlocking(result.readPipe) + register(AsyncFD(result.readPipe)) + if result.writePipe != 0: + setNonBlocking(result.writePipe) + register(AsyncFD(result.writePipe)) + + proc asyncUnwrap*(pipe: AsyncPipe) = + if pipe.readPipe != 0: + unregister(AsyncFD(pipe.readPipe)) + if pipe.writePipe != 0: + unregister(AsyncFD(pipe.writePipe)) + + proc getReadHandle*(pipe: AsyncPipe): cint {.inline.} = + result = pipe.readPipe + + proc getWriteHandle*(pipe: AsyncPipe): cint {.inline.} = + result = pipe.writePipe + + proc getHandles*(pipe: AsyncPipe): array[2, cint] {.inline.} = + result = [pipe.readPipe, pipe.writePipe] + + proc closeRead*(pipe: AsyncPipe, unregister = true) = + if pipe.readPipe != 0: + if unregister: + unregister(AsyncFD(pipe.readPipe)) + if posix.close(cint(pipe.readPipe)) != 0: + raiseOSError(osLastError()) + pipe.readPipe = 0 + + proc closeWrite*(pipe: AsyncPipe, unregister = true) = + if pipe.writePipe != 0: + if unregister: + unregister(AsyncFD(pipe.writePipe)) + if posix.close(cint(pipe.writePipe)) != 0: + raiseOSError(osLastError()) + pipe.writePipe = 0 + + proc close*(pipe: AsyncPipe, unregister = true) = + closeRead(pipe, unregister) + closeWrite(pipe, unregister) + + proc write*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + var retFuture = newFuture[int]("asyncpipe.write") + var bytesWrote = 0 + + proc cb(fd: AsyncFD): bool = + result = true + let reminder = nbytes - bytesWrote + let pdata = cast[pointer](cast[uint](data) + bytesWrote.uint) + let res = posix.write(pipe.writePipe, pdata, cint(reminder)) + if res < 0: + let err = osLastError() + if err.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + result = false # We still want this callback to be called. + elif res == 0: + retFuture.complete(bytesWrote) + else: + bytesWrote.inc(res) + if res != reminder: + result = false + else: + retFuture.complete(bytesWrote) + + if pipe.writePipe == 0: + retFuture.fail(newException(ValueError, + "Write side of pipe closed or not available")) + else: + if not cb(AsyncFD(pipe.writePipe)): + addWrite(AsyncFD(pipe.writePipe), cb) + return retFuture + + proc readInto*(pipe: AsyncPipe, data: pointer, nbytes: int): Future[int] = + var retFuture = newFuture[int]("asyncpipe.readInto") + proc cb(fd: AsyncFD): bool = + result = true + let res = posix.read(pipe.readPipe, data, cint(nbytes)) + if res < 0: + let err = osLastError() + if err.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + result = false # We still want this callback to be called. + elif res == 0: + retFuture.complete(0) + else: + retFuture.complete(res) + + if pipe.readPipe == 0: + retFuture.fail(newException(ValueError, + "Read side of pipe closed or not available")) + else: + if not cb(AsyncFD(pipe.readPipe)): + addRead(AsyncFD(pipe.readPipe), cb) + return retFuture + +when isMainModule: + + when not defined(windows): + const + SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0) + SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1) + else: + const + ERROR_NO_DATA = 232 + + var outBuffer = "TEST STRING BUFFER" + + block test1: + # simple read/write test + var inBuffer = newString(64) + var o = createPipe() + var sc = waitFor write(o, cast[pointer](addr outBuffer[0]), + outBuffer.len) + doAssert(sc == len(outBuffer)) + var rc = waitFor readInto(o, cast[pointer](addr inBuffer[0]), + inBuffer.len) + inBuffer.setLen(rc) + doAssert(inBuffer == outBuffer) + close(o) + + block test2: + # read from pipe closed write side + var inBuffer = newString(64) + var o = createPipe() + o.closeWrite() + var rc = waitFor readInto(o, cast[pointer](addr inBuffer[0]), + inBuffer.len) + doAssert(rc == 0) + + block test3: + # write to closed read side + var sc: int = -1 + var o = createPipe() + o.closeRead() + when not defined(windows): + posix.signal(SIGPIPE, SIG_IGN) + + try: + sc = waitFor write(o, cast[pointer](addr outBuffer[0]), + outBuffer.len) + except: + discard + doAssert(sc == -1) + + when not defined(windows): + doAssert(osLastError().int32 == EPIPE) + else: + doAssert(osLastError().int32 == ERROR_NO_DATA) + + when not defined(windows): + posix.signal(SIGPIPE, SIG_DFL) + + block test4: + # bulk test of sending/receiving data + const + testsCount = 5000 + + proc sender(o: AsyncPipe) {.async.} = + var data = 1'i32 + for i in 1..testsCount: + data = int32(i) + let res = await write(o, addr data, sizeof(int32)) + doAssert(res == sizeof(int32)) + closeWrite(o) + + proc receiver(o: AsyncPipe): Future[tuple[count: int, sum: int]] {.async.} = + var data = 0'i32 + result = (count: 0, sum: 0) + while true: + let res = await readInto(o, addr data, sizeof(int32)) + if res == 0: + break + doAssert(res == sizeof(int32)) + inc(result.sum, data) + inc(result.count) + + var o = createPipe() + asyncCheck sender(o) + let res = waitFor(receiver(o)) + doAssert(res.count == testsCount) + doAssert(res.sum == testsCount * (1 + testsCount) div 2) + diff --git a/src/nimblepkg/asynctools/asyncproc.nim b/src/nimblepkg/asynctools/asyncproc.nim new file mode 100644 index 000000000..489d68ff9 --- /dev/null +++ b/src/nimblepkg/asynctools/asyncproc.nim @@ -0,0 +1,923 @@ +# +# +# Asynchronous tools for Nim Language +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "LICENSE", included in this +# distribution, for details about the copyright. +# + +## This module implements an advanced facility for executing OS processes +## and process communication in asynchronous way. +## +## Most code for this module is borrowed from original ``osproc.nim`` by +## Andreas Rumpf, with some extensions, improvements and fixes. +## +## API is near compatible with stdlib's ``osproc.nim``. + +import strutils, os, strtabs +import asyncdispatch, asyncpipe + +when defined(windows): + import winlean +else: + const STILL_ACTIVE = 259 + import posix + +type + ProcessOption* = enum ## options that can be passed `startProcess` + poEchoCmd, ## echo the command before execution + poUsePath, ## Asks system to search for executable using PATH + ## environment variable. + ## On Windows, this is the default. + poEvalCommand, ## Pass `command` directly to the shell, without + ## quoting. + ## Use it only if `command` comes from trusted source. + poStdErrToStdOut, ## merge stdout and stderr to the stdout stream + poParentStreams, ## use the parent's streams + poInteractive, ## optimize the buffer handling for responsiveness for + ## UI applications. Currently this only affects + ## Windows: Named pipes are used so that you can peek + ## at the process' output streams. + poDemon ## Windows: The program creates no Window. + + AsyncProcessObj = object of RootObj + inPipe: AsyncPipe + outPipe: AsyncPipe + errPipe: AsyncPipe + + when defined(windows): + fProcessHandle: Handle + fThreadHandle: Handle + procId: int32 + threadId: int32 + isWow64: bool + else: + procId: Pid + isExit: bool + exitCode: cint + options: set[ProcessOption] + + AsyncProcess* = ref AsyncProcessObj ## represents an operating system process + +proc quoteShellWindows*(s: string): string = + ## Quote s, so it can be safely passed to Windows API. + ## + ## Based on Python's subprocess.list2cmdline + ## + ## See http://msdn.microsoft.com/en-us/library/17w5ykft.aspx + let needQuote = {' ', '\t'} in s or s.len == 0 + + result = "" + var backslashBuff = "" + if needQuote: + result.add("\"") + + for c in s: + if c == '\\': + backslashBuff.add(c) + elif c == '\"': + result.add(backslashBuff) + result.add(backslashBuff) + backslashBuff.setLen(0) + result.add("\\\"") + else: + if backslashBuff.len != 0: + result.add(backslashBuff) + backslashBuff.setLen(0) + result.add(c) + + if needQuote: + result.add("\"") + +proc quoteShellPosix*(s: string): string = + ## Quote ``s``, so it can be safely passed to POSIX shell. + ## + ## Based on Python's pipes.quote + const safeUnixChars = {'%', '+', '-', '.', '/', '_', ':', '=', '@', + '0'..'9', 'A'..'Z', 'a'..'z'} + if s.len == 0: + return "''" + + let safe = s.allCharsInSet(safeUnixChars) + + if safe: + return s + else: + return "'" & s.replace("'", "'\"'\"'") & "'" + +proc quoteShell*(s: string): string = + ## Quote ``s``, so it can be safely passed to shell. + when defined(Windows): + return quoteShellWindows(s) + elif defined(posix): + return quoteShellPosix(s) + else: + {.error:"quoteShell is not supported on your system".} + + +proc execProcess*(command: string, args: seq[string] = @[], + env: StringTableRef = nil, + options: set[ProcessOption] = {poStdErrToStdOut, poUsePath, + poEvalCommand} + ): Future[tuple[exitcode: int, output: string]] {.async.} + ## A convenience asynchronous procedure that executes ``command`` + ## with ``startProcess`` and returns its exit code and output as a tuple. + ## + ## **WARNING**: this function uses poEvalCommand by default for backward + ## compatibility. Make sure to pass options explicitly. + ## + ## .. code-block:: Nim + ## + ## let outp = await execProcess("nim c -r mytestfile.nim") + ## echo "process exited with code = " & $outp.exitcode + ## echo "process output = " & outp.output + +proc startProcess*(command: string, workingDir: string = "", + args: openArray[string] = [], + env: StringTableRef = nil, + options: set[ProcessOption] = {poStdErrToStdOut}, + pipeStdin: AsyncPipe = nil, + pipeStdout: AsyncPipe = nil, + pipeStderr: AsyncPipe = nil): AsyncProcess + ## Starts a process. + ## + ## ``command`` is the executable file path + ## + ## ``workingDir`` is the process's working directory. If ``workingDir == ""`` + ## the current directory is used. + ## + ## ``args`` are the command line arguments that are passed to the + ## process. On many operating systems, the first command line argument is the + ## name of the executable. ``args`` should not contain this argument! + ## + ## ``env`` is the environment that will be passed to the process. + ## If ``env == nil`` the environment is inherited of + ## the parent process. + ## + ## ``options`` are additional flags that may be passed + ## to `startProcess`. See the documentation of ``ProcessOption`` for the + ## meaning of these flags. + ## + ## ``pipeStdin``, ``pipeStdout``, ``pipeStderr`` is ``AsyncPipe`` handles + ## which will be used as ``STDIN``, ``STDOUT`` and ``STDERR`` of started + ## process respectively. This handles are optional, unspecified handles + ## will be created automatically. + ## + ## Note that you can't pass any ``args`` if you use the option + ## ``poEvalCommand``, which invokes the system shell to run the specified + ## ``command``. In this situation you have to concatenate manually the + ## contents of ``args`` to ``command`` carefully escaping/quoting any special + ## characters, since it will be passed *as is* to the system shell. + ## Each system/shell may feature different escaping rules, so try to avoid + ## this kind of shell invocation if possible as it leads to non portable + ## software. + ## + ## Return value: The newly created process object. Nil is never returned, + ## but ``EOS`` is raised in case of an error. + +proc suspend*(p: AsyncProcess) + ## Suspends the process ``p``. + ## + ## On Posix OSes the procedure sends ``SIGSTOP`` signal to the process. + ## + ## On Windows procedure suspends main thread execution of process via + ## ``SuspendThread()``. WOW64 processes is also supported. + +proc resume*(p: AsyncProcess) + ## Resumes the process ``p``. + ## + ## On Posix OSes the procedure sends ``SIGCONT`` signal to the process. + ## + ## On Windows procedure resumes execution of main thread via + ## ``ResumeThread()``. WOW64 processes is also supported. + +proc terminate*(p: AsyncProcess) + ## Stop the process ``p``. On Posix OSes the procedure sends ``SIGTERM`` + ## to the process. On Windows the Win32 API function ``TerminateProcess()`` + ## is called to stop the process. + +proc kill*(p: AsyncProcess) + ## Kill the process ``p``. On Posix OSes the procedure sends ``SIGKILL`` to + ## the process. On Windows ``kill()`` is simply an alias for ``terminate()``. + +proc running*(p: AsyncProcess): bool + ## Returns `true` if the process ``p`` is still running. Returns immediately. + +proc peekExitCode*(p: AsyncProcess): int + ## Returns `STILL_ACTIVE` if the process is still running. + ## Otherwise the process' exit code. + +proc processID*(p: AsyncProcess): int = + ## Returns process ``p`` id. + return p.procId + +proc inputHandle*(p: AsyncProcess): AsyncPipe {.inline.} = + ## Returns ``AsyncPipe`` handle to ``STDIN`` pipe of process ``p``. + result = p.inPipe + +proc outputHandle*(p: AsyncProcess): AsyncPipe {.inline.} = + ## Returns ``AsyncPipe`` handle to ``STDOUT`` pipe of process ``p``. + result = p.outPipe + +proc errorHandle*(p: AsyncProcess): AsyncPipe {.inline.} = + ## Returns ``AsyncPipe`` handle to ``STDERR`` pipe of process ``p``. + result = p.errPipe + +proc waitForExit*(p: AsyncProcess): Future[int] + ## Waits for the process to finish in asynchronous way and returns + ## exit code. + +when defined(windows): + + const + STILL_ACTIVE = 0x00000103'i32 + HANDLE_FLAG_INHERIT = 0x00000001'i32 + + proc isWow64Process(hProcess: Handle, wow64Process: var WinBool): WinBool + {.importc: "IsWow64Process", stdcall, dynlib: "kernel32".} + proc wow64SuspendThread(hThread: Handle): Dword + {.importc: "Wow64SuspendThread", stdcall, dynlib: "kernel32".} + proc setHandleInformation(hObject: Handle, dwMask: Dword, + dwFlags: Dword): WinBool + {.importc: "SetHandleInformation", stdcall, dynlib: "kernel32".} + + proc buildCommandLine(a: string, args: openArray[string]): cstring = + var res = quoteShell(a) + for i in 0..high(args): + res.add(' ') + res.add(quoteShell(args[i])) + result = cast[cstring](alloc0(res.len+1)) + copyMem(result, cstring(res), res.len) + + proc buildEnv(env: StringTableRef): tuple[str: cstring, len: int] = + var L = 0 + for key, val in pairs(env): inc(L, key.len + val.len + 2) + var str = cast[cstring](alloc0(L+2)) + L = 0 + for key, val in pairs(env): + var x = key & "=" & val + copyMem(addr(str[L]), cstring(x), x.len+1) # copy \0 + inc(L, x.len+1) + (str, L) + + proc close(p: AsyncProcess) = + if p.inPipe != nil: close(p.inPipe) + if p.outPipe != nil: close(p.outPipe) + if p.errPipe != nil: close(p.errPipe) + + proc startProcess(command: string, workingDir: string = "", + args: openArray[string] = [], + env: StringTableRef = nil, + options: set[ProcessOption] = {poStdErrToStdOut}, + pipeStdin: AsyncPipe = nil, + pipeStdout: AsyncPipe = nil, + pipeStderr: AsyncPipe = nil): AsyncProcess = + var + si: STARTUPINFO + procInfo: PROCESS_INFORMATION + + result = AsyncProcess(options: options, isExit: true) + si.cb = sizeof(STARTUPINFO).cint + + if not isNil(pipeStdin): + si.hStdInput = pipeStdin.getReadHandle() + + # Mark other side of pipe as non inheritable. + let oh = pipeStdin.getWriteHandle() + if oh != 0: + if setHandleInformation(oh, HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + else: + if poParentStreams in options: + si.hStdInput = getStdHandle(STD_INPUT_HANDLE) + else: + let pipe = createPipe() + if poInteractive in options: + result.inPipe = pipe + si.hStdInput = pipe.getReadHandle() + else: + result.inPipe = pipe + si.hStdInput = pipe.getReadHandle() + + if setHandleInformation(pipe.getWriteHandle(), + HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + + if not isNil(pipeStdout): + si.hStdOutput = pipeStdout.getWriteHandle() + + # Mark other side of pipe as non inheritable. + let oh = pipeStdout.getReadHandle() + if oh != 0: + if setHandleInformation(oh, HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + else: + if poParentStreams in options: + si.hStdOutput = getStdHandle(STD_OUTPUT_HANDLE) + else: + let pipe = createPipe() + if poInteractive in options: + result.outPipe = pipe + si.hStdOutput = pipe.getWriteHandle() + else: + result.outPipe = pipe + si.hStdOutput = pipe.getWriteHandle() + if setHandleInformation(pipe.getReadHandle(), + HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + + if not isNil(pipeStderr): + si.hStdError = pipeStderr.getWriteHandle() + + # Mark other side of pipe as non inheritable. + let oh = pipeStderr.getReadHandle() + if oh != 0: + if setHandleInformation(oh, HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + else: + if poParentStreams in options: + si.hStdError = getStdHandle(STD_ERROR_HANDLE) + else: + if poInteractive in options: + let pipe = createPipe() + result.errPipe = pipe + si.hStdError = pipe.getWriteHandle() + if setHandleInformation(pipe.getReadHandle(), + HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + else: + if poStdErrToStdOut in options: + result.errPipe = result.outPipe + si.hStdError = si.hStdOutput + else: + let pipe = createPipe() + result.errPipe = pipe + si.hStdError = pipe.getWriteHandle() + if setHandleInformation(pipe.getReadHandle(), + HANDLE_FLAG_INHERIT, 0) == 0: + raiseOSError(osLastError()) + + if si.hStdInput != 0 or si.hStdOutput != 0 or si.hStdError != 0: + si.dwFlags = STARTF_USESTDHANDLES + + # building command line + var cmdl: cstring + if poEvalCommand in options: + cmdl = buildCommandLine("cmd.exe", ["/c", command]) + assert args.len == 0 + else: + cmdl = buildCommandLine(command, args) + # building environment + var e = (str: nil.cstring, len: -1) + if env != nil: e = buildEnv(env) + # building working directory + var wd: cstring = nil + if len(workingDir) > 0: wd = workingDir + # processing echo command line + if poEchoCmd in options: echo($cmdl) + # building security attributes for process and main thread + var psa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint, + lpSecurityDescriptor: nil, bInheritHandle: 1) + var tsa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint, + lpSecurityDescriptor: nil, bInheritHandle: 1) + + var tmp = newWideCString(cmdl) + var ee = + if e.str.isNil: newWideCString(cstring(nil)) + else: newWideCString(e.str, e.len) + var wwd = newWideCString(wd) + var flags = NORMAL_PRIORITY_CLASS or CREATE_UNICODE_ENVIRONMENT + if poDemon in options: flags = flags or CREATE_NO_WINDOW + let res = winlean.createProcessW(nil, tmp, addr psa, addr tsa, 1, flags, + ee, wwd, si, procInfo) + if e.str != nil: dealloc(e.str) + if res == 0: + close(result) + raiseOsError(osLastError()) + else: + result.fProcessHandle = procInfo.hProcess + result.procId = procInfo.dwProcessId + result.fThreadHandle = procInfo.hThread + result.threadId = procInfo.dwThreadId + when sizeof(int) == 8: + # If sizeof(int) == 8, then our process is 64bit, and we need to check + # architecture of just spawned process. + var iswow64 = WinBool(0) + if isWow64Process(procInfo.hProcess, iswow64) == 0: + raiseOsError(osLastError()) + result.isWow64 = (iswow64 != 0) + else: + result.isWow64 = false + + result.isExit = false + + if poParentStreams notin options: + closeRead(result.inPipe) + closeWrite(result.outPipe) + closeWrite(result.errPipe) + + proc suspend(p: AsyncProcess) = + var res = 0'i32 + if p.isWow64: + res = wow64SuspendThread(p.fThreadHandle) + else: + res = suspendThread(p.fThreadHandle) + if res < 0: + raiseOsError(osLastError()) + + proc resume(p: AsyncProcess) = + let res = resumeThread(p.fThreadHandle) + if res < 0: + raiseOsError(osLastError()) + + proc running(p: AsyncProcess): bool = + var value = 0'i32 + let res = getExitCodeProcess(p.fProcessHandle, value) + if res == 0: + raiseOsError(osLastError()) + else: + if value == STILL_ACTIVE: + result = true + else: + p.isExit = true + p.exitCode = value + + proc terminate(p: AsyncProcess) = + if running(p): + discard terminateProcess(p.fProcessHandle, 0) + + proc kill(p: AsyncProcess) = + terminate(p) + + proc peekExitCode(p: AsyncProcess): int = + if p.isExit: + result = p.exitCode + else: + var value = 0'i32 + let res = getExitCodeProcess(p.fProcessHandle, value) + if res == 0: + raiseOsError(osLastError()) + else: + result = value + if value != STILL_ACTIVE: + p.isExit = true + p.exitCode = value + + when declared(addProcess): + proc waitForExit(p: AsyncProcess): Future[int] = + var retFuture = newFuture[int]("asyncproc.waitForExit") + + proc cb(fd: AsyncFD): bool = + var value = 0'i32 + let res = getExitCodeProcess(p.fProcessHandle, value) + if res == 0: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + p.isExit = true + p.exitCode = value + retFuture.complete(p.exitCode) + + if p.isExit: + retFuture.complete(p.exitCode) + else: + addProcess(p.procId, cb) + return retFuture + +else: + const + readIdx = 0 + writeIdx = 1 + + template statusToExitCode(status): int32 = + (status and 0xFF00) shr 8 + + proc envToCStringArray(t: StringTableRef): cstringArray = + result = cast[cstringArray](alloc0((t.len + 1) * sizeof(cstring))) + var i = 0 + for key, val in pairs(t): + var x = key & "=" & val + result[i] = cast[cstring](alloc(x.len+1)) + copyMem(result[i], addr(x[0]), x.len+1) + inc(i) + + proc envToCStringArray(): cstringArray = + var counter = 0 + for key, val in envPairs(): inc counter + result = cast[cstringArray](alloc0((counter + 1) * sizeof(cstring))) + var i = 0 + for key, val in envPairs(): + var x = key & "=" & val + result[i] = cast[cstring](alloc(x.len+1)) + copyMem(result[i], addr(x[0]), x.len+1) + inc(i) + + type StartProcessData = object + sysCommand: cstring + sysArgs: cstringArray + sysEnv: cstringArray + workingDir: cstring + pStdin, pStdout, pStderr, pErrorPipe: array[0..1, cint] + options: set[ProcessOption] + + const useProcessAuxSpawn = declared(posix_spawn) and not defined(useFork) and + not defined(useClone) and not defined(linux) + when useProcessAuxSpawn: + proc startProcessAuxSpawn(data: StartProcessData): Pid {. + tags: [ExecIOEffect, ReadEnvEffect], gcsafe.} + else: + proc startProcessAuxFork(data: StartProcessData): Pid {. + tags: [ExecIOEffect, ReadEnvEffect], gcsafe.} + + {.push stacktrace: off, profiler: off.} + proc startProcessAfterFork(data: ptr StartProcessData) {. + tags: [ExecIOEffect, ReadEnvEffect], cdecl, gcsafe.} + {.pop.} + + proc startProcess(command: string, workingDir: string = "", + args: openArray[string] = [], + env: StringTableRef = nil, + options: set[ProcessOption] = {poStdErrToStdOut}, + pipeStdin: AsyncPipe = nil, + pipeStdout: AsyncPipe = nil, + pipeStderr: AsyncPipe = nil): AsyncProcess = + var sd = StartProcessData() + + result = AsyncProcess(options: options, isExit: true) + + if not isNil(pipeStdin): + sd.pStdin = pipeStdin.getHandles() + else: + if poParentStreams notin options: + let pipe = createPipe() + sd.pStdin = pipe.getHandles() + result.inPipe = pipe + + if not isNil(pipeStdout): + sd.pStdout = pipeStdout.getHandles() + else: + if poParentStreams notin options: + let pipe = createPipe() + sd.pStdout = pipe.getHandles() + result.outPipe = pipe + + if not isNil(pipeStderr): + sd.pStderr = pipeStderr.getHandles() + else: + if poParentStreams notin options: + if poStdErrToStdOut in options: + sd.pStderr = sd.pStdout + result.errPipe = result.outPipe + else: + let pipe = createPipe() + sd.pStderr = pipe.getHandles() + result.errPipe = pipe + + var sysCommand: string + var sysArgsRaw: seq[string] + + if poEvalCommand in options: + sysCommand = "/bin/sh" + sysArgsRaw = @[sysCommand, "-c", command] + assert args.len == 0, "`args` has to be empty when using poEvalCommand." + else: + sysCommand = command + sysArgsRaw = @[command] + for arg in args.items: + sysArgsRaw.add arg + + var pid: Pid + + var sysArgs = allocCStringArray(sysArgsRaw) + defer: deallocCStringArray(sysArgs) + + var sysEnv = if env == nil: + envToCStringArray() + else: + envToCStringArray(env) + defer: deallocCStringArray(sysEnv) + + sd.sysCommand = sysCommand + sd.sysArgs = sysArgs + sd.sysEnv = sysEnv + sd.options = options + sd.workingDir = workingDir + + when useProcessAuxSpawn: + let currentDir = getCurrentDir() + pid = startProcessAuxSpawn(sd) + if workingDir.len > 0: + setCurrentDir(currentDir) + else: + pid = startProcessAuxFork(sd) + + # Parent process. Copy process information. + if poEchoCmd in options: + echo(command, " ", join(args, " ")) + result.procId = pid + + result.isExit = false + + if poParentStreams notin options: + closeRead(result.inPipe) + closeWrite(result.outPipe) + closeWrite(result.errPipe) + + when useProcessAuxSpawn: + proc startProcessAuxSpawn(data: StartProcessData): Pid = + var attr: Tposix_spawnattr + var fops: Tposix_spawn_file_actions + + template chck(e: untyped) = + if e != 0'i32: raiseOSError(osLastError()) + + chck posix_spawn_file_actions_init(fops) + chck posix_spawnattr_init(attr) + + var mask: Sigset + chck sigemptyset(mask) + chck posix_spawnattr_setsigmask(attr, mask) + + var flags = POSIX_SPAWN_USEVFORK or POSIX_SPAWN_SETSIGMASK + if poDemon in data.options: + flags = flags or POSIX_SPAWN_SETPGROUP + chck posix_spawnattr_setpgroup(attr, 0'i32) + + chck posix_spawnattr_setflags(attr, flags) + + if not (poParentStreams in data.options): + chck posix_spawn_file_actions_addclose(fops, data.pStdin[writeIdx]) + chck posix_spawn_file_actions_adddup2(fops, data.pStdin[readIdx], + readIdx) + chck posix_spawn_file_actions_addclose(fops, data.pStdout[readIdx]) + chck posix_spawn_file_actions_adddup2(fops, data.pStdout[writeIdx], + writeIdx) + if (poStdErrToStdOut in data.options): + chck posix_spawn_file_actions_adddup2(fops, data.pStdout[writeIdx], 2) + else: + chck posix_spawn_file_actions_addclose(fops, data.pStderr[readIdx]) + chck posix_spawn_file_actions_adddup2(fops, data.pStderr[writeIdx], 2) + + var res: cint + if data.workingDir.len > 0: + setCurrentDir($data.workingDir) + var pid: Pid + + if (poUsePath in data.options): + res = posix_spawnp(pid, data.sysCommand, fops, attr, data.sysArgs, + data.sysEnv) + else: + res = posix_spawn(pid, data.sysCommand, fops, attr, data.sysArgs, + data.sysEnv) + + discard posix_spawn_file_actions_destroy(fops) + discard posix_spawnattr_destroy(attr) + chck res + return pid + else: + proc startProcessAuxFork(data: StartProcessData): Pid = + if pipe(data.pErrorPipe) != 0: + raiseOSError(osLastError()) + + defer: + discard close(data.pErrorPipe[readIdx]) + + var pid: Pid + var dataCopy = data + + when defined(useClone): + const stackSize = 65536 + let stackEnd = cast[clong](alloc(stackSize)) + let stack = cast[pointer](stackEnd + stackSize) + let fn: pointer = startProcessAfterFork + pid = clone(fn, stack, + cint(CLONE_VM or CLONE_VFORK or SIGCHLD), + pointer(addr dataCopy), nil, nil, nil) + discard close(data.pErrorPipe[writeIdx]) + dealloc(stack) + else: + pid = fork() + if pid == 0: + startProcessAfterFork(addr(dataCopy)) + exitnow(1) + + discard close(data.pErrorPipe[writeIdx]) + if pid < 0: raiseOSError(osLastError()) + + var error: cint + + var res = read(data.pErrorPipe[readIdx], addr error, sizeof(error)) + if res == sizeof(error): + raiseOSError(osLastError(), + "Could not find command: '$1'. OS error: $2" % + [$data.sysCommand, $strerror(error)]) + return pid + + {.push stacktrace: off, profiler: off.} + proc startProcessFail(data: ptr StartProcessData) = + var error: cint = errno + discard write(data.pErrorPipe[writeIdx], addr error, sizeof(error)) + exitnow(1) + + when not defined(uClibc) and (not defined(linux) or defined(android)): + var environ {.importc.}: cstringArray + + proc startProcessAfterFork(data: ptr StartProcessData) = + # Warning: no GC here! + # Or anything that touches global structures - all called nim procs + # must be marked with stackTrace:off. Inspect C code after making changes. + if (poDemon in data.options): + if posix.setpgid(Pid(0), Pid(0)) != 0: + startProcessFail(data) + + if not (poParentStreams in data.options): + if posix.close(data.pStdin[writeIdx]) != 0: + startProcessFail(data) + + if dup2(data.pStdin[readIdx], readIdx) < 0: + startProcessFail(data) + + if posix.close(data.pStdout[readIdx]) != 0: + startProcessFail(data) + + if dup2(data.pStdout[writeIdx], writeIdx) < 0: + startProcessFail(data) + + if (poStdErrToStdOut in data.options): + if dup2(data.pStdout[writeIdx], 2) < 0: + startProcessFail(data) + else: + if posix.close(data.pStderr[readIdx]) != 0: + startProcessFail(data) + + if dup2(data.pStderr[writeIdx], 2) < 0: + startProcessFail(data) + + if data.workingDir.len > 0: + if chdir(data.workingDir) < 0: + startProcessFail(data) + + if posix.close(data.pErrorPipe[readIdx]) != 0: + startProcessFail(data) + + discard fcntl(data.pErrorPipe[writeIdx], F_SETFD, FD_CLOEXEC) + + if (poUsePath in data.options): + when defined(uClibc): + # uClibc environment (OpenWrt included) doesn't have the full execvpe + discard execve(data.sysCommand, data.sysArgs, data.sysEnv) + elif defined(linux) and not defined(android): + discard execvpe(data.sysCommand, data.sysArgs, data.sysEnv) + else: + # MacOSX doesn't have execvpe, so we need workaround. + # On MacOSX we can arrive here only from fork, so this is safe: + environ = data.sysEnv + discard execvp(data.sysCommand, data.sysArgs) + else: + discard execve(data.sysCommand, data.sysArgs, data.sysEnv) + + startProcessFail(data) + {.pop} + + proc close(p: AsyncProcess) = + ## We need to `wait` for process, to avoid `zombie`, so if `running()` + ## returns `false`, then process exited and `wait()` was called. + doAssert(not p.running()) + if p.inPipe != nil: close(p.inPipe) + if p.outPipe != nil: close(p.outPipe) + if p.errPipe != nil: close(p.errPipe) + + proc running(p: AsyncProcess): bool = + result = true + if p.isExit: + result = false + else: + var status = cint(0) + let res = posix.waitpid(p.procId, status, WNOHANG) + if res == 0: + result = true + elif res < 0: + raiseOsError(osLastError()) + else: + if WIFEXITED(status) or WIFSIGNALED(status): + p.isExit = true + p.exitCode = statusToExitCode(status) + result = false + + proc peekExitCode(p: AsyncProcess): int = + if p.isExit: + result = p.exitCode + else: + var status = cint(0) + let res = posix.waitpid(p.procId, status, WNOHANG) + if res < 0: + raiseOsError(osLastError()) + elif res > 0: + p.isExit = true + p.exitCode = statusToExitCode(status) + result = p.exitCode + else: + result = STILL_ACTIVE + + proc suspend(p: AsyncProcess) = + if posix.kill(p.procId, SIGSTOP) != 0'i32: + raiseOsError(osLastError()) + + proc resume(p: AsyncProcess) = + if posix.kill(p.procId, SIGCONT) != 0'i32: + raiseOsError(osLastError()) + + proc terminate(p: AsyncProcess) = + if posix.kill(p.procId, SIGTERM) != 0'i32: + raiseOsError(osLastError()) + + proc kill(p: AsyncProcess) = + if posix.kill(p.procId, SIGKILL) != 0'i32: + raiseOsError(osLastError()) + + when declared(addProcess): + proc waitForExit*(p: AsyncProcess): Future[int] = + var retFuture = newFuture[int]("asyncproc.waitForExit") + + proc cb(fd: AsyncFD): bool = + var status = cint(0) + let res = posix.waitpid(p.procId, status, WNOHANG) + if res <= 0: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + p.isExit = true + p.exitCode = statusToExitCode(status) + retFuture.complete(p.exitCode) + + if p.isExit: + retFuture.complete(p.exitCode) + else: + while true: + var status = cint(0) + let res = posix.waitpid(p.procId, status, WNOHANG) + if res < 0: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + break + elif res > 0: + p.isExit = true + p.exitCode = statusToExitCode(status) + retFuture.complete(p.exitCode) + break + else: + try: + addProcess(p.procId, cb) + break + except: + let err = osLastError() + if cint(err) == ESRCH: + continue + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + break + return retFuture + +proc execProcess(command: string, args: seq[string] = @[], + env: StringTableRef = nil, + options: set[ProcessOption] = {poStdErrToStdOut, poUsePath, + poEvalCommand} + ): Future[tuple[exitcode: int, output: string]] {.async.} = + result = (exitcode: int(STILL_ACTIVE), output: "") + let bufferSize = 1024 + var data = newString(bufferSize) + var p = startProcess(command, args = args, env = env, options = options) + + # Here the code path for Linux systems is a workaround for a bug eighter in + # the `asynctools` library or the Nim standard library which causes `Resource + # temporarily unavailable (code: 11)` error when executing multiple + # asynchronous operations. + # + # TODO: Add a proper fix that does not use a different code ordering on the + # different systems. + + when not defined(linux): + let future = p.waitForExit + + while true: + let res = await p.outputHandle.readInto(addr data[0], bufferSize) + if res > 0: + data.setLen(res) + result.output &= data + data.setLen(bufferSize) + else: + break + + result.exitcode = + when not defined(linux): + await future + else: + await p.waitForExit + + close(p) + +when isMainModule: + when defined(windows): + var data = waitFor(execProcess("cd")) + else: + var data = waitFor(execProcess("pwd")) + echo "exitCode = " & $data.exitcode + echo "output = [" & $data.output & "]" diff --git a/src/nimblepkg/download.nim b/src/nimblepkg/download.nim index c550186ca..a8957e823 100644 --- a/src/nimblepkg/download.nim +++ b/src/nimblepkg/download.nim @@ -2,7 +2,7 @@ # BSD License. Look at license.txt for more info. import parseutils, os, osproc, strutils, tables, pegs, uri, strformat, - httpclient, json, sequtils + httpclient, json, asyncdispatch, sequtils from algorithm import SortOrder, sorted @@ -13,43 +13,48 @@ type DownloadPkgResult* = tuple dir: string version: Version - vcsRevision: Sha1Hash + vcsRevision: Sha1HashRef -proc doCheckout(meth: DownloadMethod, downloadDir, branch: string) = +proc doCheckout(meth: DownloadMethod, downloadDir, branch: string): + Future[void] {.async.} = case meth of DownloadMethod.git: # Force is used here because local changes may appear straight after a clone # has happened. Like in the case of git on Windows where it messes up the # damn line endings. - discard tryDoCmdEx(&"git -C {downloadDir} checkout --force {branch}") - discard tryDoCmdEx( - &"git -C {downloadDir} submodule update --recursive --depth 1") + discard await tryDoCmdExAsync("git", + @["-C", downloadDir, "checkout", "--force", branch]) + discard await tryDoCmdExAsync("git", + @["-C", downloadDir, "submodule", "update", "--recursive", "--depth", "1"]) of DownloadMethod.hg: - discard tryDoCmdEx(&"hg --cwd {downloadDir} checkout {branch}") + discard await tryDoCmdExAsync("hg", + @["--cwd", downloadDir, "checkout", branch]) proc doClone(meth: DownloadMethod, url, downloadDir: string, branch = "", - onlyTip = true) = + onlyTip = true) {.async.} = case meth of DownloadMethod.git: let - depthArg = if onlyTip: "--depth 1" else: "" - branchArg = if branch == "": "" else: &"-b {branch}" - discard tryDoCmdEx( - &"git clone --recursive {depthArg} {branchArg} {url} {downloadDir}") + depthArgs = if onlyTip: @["--depth", "1"] else: @[] + branchArgs = if branch == "": @[] else: @["-b", branch] + discard await tryDoCmdExAsync("git", concat(@["clone", "--recursive"], + depthArgs, branchArgs, @[url, downloadDir])) of DownloadMethod.hg: let - tipArg = if onlyTip: "-r tip " else: "" - branchArg = if branch == "": "" else: &"-b {branch}" - discard tryDoCmdEx(&"hg clone {tipArg} {branchArg} {url} {downloadDir}") + tipArgs = if onlyTip: @["-r", "tip"] else: @[] + branchArgs = if branch == "": @[] else: @["-b", branch] + discard await tryDoCmdExAsync("hg", + concat(@["clone"], tipArgs, branchArgs, @[url, downloadDir])) -proc getTagsList(dir: string, meth: DownloadMethod): seq[string] = +proc getTagsList(dir: string, meth: DownloadMethod): + Future[seq[string]] {.async.} = var output: string cd dir: case meth of DownloadMethod.git: - output = tryDoCmdEx("git tag") + output = await tryDoCmdExAsync("git", @["tag"]) of DownloadMethod.hg: - output = tryDoCmdEx("hg tags") + output = await tryDoCmdExAsync("hg", @["tags"]) if output.len > 0: case meth of DownloadMethod.git: @@ -68,11 +73,13 @@ proc getTagsList(dir: string, meth: DownloadMethod): seq[string] = else: result = @[] -proc getTagsListRemote*(url: string, meth: DownloadMethod): seq[string] = +proc getTagsListRemote*(url: string, meth: DownloadMethod): + Future[seq[string]] {.async.} = result = @[] case meth of DownloadMethod.git: - var (output, exitCode) = doCmdEx(&"git ls-remote --tags {url}") + var (output, exitCode) = await doCmdExAsync("git", + @["ls-remote", "--tags", url]) if exitCode != QuitSuccess: raise nimbleError("Unable to query remote tags for " & url & ". Git returned: " & output) @@ -135,20 +142,22 @@ proc isURL*(name: string): bool = proc cloneSpecificRevision(downloadMethod: DownloadMethod, url, downloadDir: string, - vcsRevision: Sha1Hash) = + vcsRevision: Sha1Hash) {.async.} = assert vcsRevision != notSetSha1Hash display("Cloning", "revision: " & $vcsRevision, priority = MediumPriority) case downloadMethod of DownloadMethod.git: let downloadDir = downloadDir.quoteShell createDir(downloadDir) - discard tryDoCmdEx(&"git -C {downloadDir} init") - discard tryDoCmdEx(&"git -C {downloadDir} remote add origin {url}") - discard tryDoCmdEx( - &"git -C {downloadDir} fetch --depth 1 origin {vcsRevision}") - discard tryDoCmdEx(&"git -C {downloadDir} reset --hard FETCH_HEAD") + discard await tryDoCmdExAsync("git", @["-C", downloadDir, "init"]) + discard await tryDoCmdExAsync("git", + @["-C", downloadDir, "remote", "add", "origin", url]) + discard await tryDoCmdExAsync("git", + @["-C", downloadDir, "fetch", "--depth", "1", "origin", $vcsRevision]) + discard await tryDoCmdExAsync("git", + @["-C", downloadDir, "reset", "--hard", "FETCH_HEAD"]) of DownloadMethod.hg: - discard tryDoCmdEx(&"hg clone {url} -r {vcsRevision}") + discard await tryDoCmdExAsync("hg", @["clone", url, "-r", $vcsRevision]) proc getTarExePath: string = ## Returns path to `tar` executable. @@ -222,22 +231,23 @@ proc getGitHubApiUrl(url, commit: string): string = ## an URL for the GitHub REST API query for the full commit hash. &"https://api.github.com/repos/{extractOwnerAndRepo(url)}/commits/{commit}" -proc getUrlContent(url: string): string = +proc getUrlContent(url: string): Future[string] {.async.} = ## Makes a GET request to `url`. - let client = newHttpClient() - return client.getContent(url) + let client = newAsyncHttpClient() + return await client.getContent(url) {.warning[ProveInit]: off.} -proc getFullRevisionFromGitHubApi(url, version: string): Sha1Hash = +proc getFullRevisionFromGitHubApi(url, version: string): + Future[Sha1HashRef] {.async.} = ## By given a commit short hash and an URL to a GitHub repository retrieves ## the full hash of the commit by using GitHub REST API. try: let gitHubApiUrl = getGitHubApiUrl(url, version) display("Get", gitHubApiUrl); - let content = getUrlContent(gitHubApiUrl) + let content = await getUrlContent(gitHubApiUrl) let json = parseJson(content) if json.hasKey("sha"): - return json["sha"].str.initSha1Hash + return json["sha"].str.initSha1Hash.newClone else: raise nimbleError(json["message"].str) except CatchableError as error: @@ -245,7 +255,7 @@ proc getFullRevisionFromGitHubApi(url, version: string): Sha1Hash = &"of package at \"{url}\".", details = error) {.warning[ProveInit]: on.} -proc parseRevision(lsRemoteOutput: string): Sha1Hash = +proc parseRevision(lsRemoteOutput: string): Sha1HashRef = ## Parses the output from `git ls-remote` call to extract the returned sha1 ## hash value. Even when successful the first line of the command's output ## can be a redirection warning. @@ -253,42 +263,43 @@ proc parseRevision(lsRemoteOutput: string): Sha1Hash = for line in lines: if line.len >= 40: try: - return line[0..39].initSha1Hash + return line[0..39].initSha1Hash.newClone except InvalidSha1HashError: discard - return notSetSha1Hash + return notSetSha1Hash.newClone -proc getRevision(url, version: string): Sha1Hash = +proc getRevision(url, version: string): Future[Sha1HashRef] {.async.} = ## Returns the commit hash corresponding to the given `version` of the package ## in repository at `url`. - let output = tryDoCmdEx(&"git ls-remote {url} {version}") + let output = await tryDoCmdExAsync("git", @["ls-remote", url, $version]) result = parseRevision(output) - if result == notSetSha1Hash: + if result[] == notSetSha1Hash: if version.seemsLikeRevision: - result = getFullRevisionFromGitHubApi(url, version) + result = await getFullRevisionFromGitHubApi(url, version) else: raise nimbleError(&"Cannot get revision for version \"{version}\" " & &"of package at \"{url}\".") -proc getTarCmdLine(downloadDir, filePath: string): string = +proc getTarCmdLine(downloadDir, filePath: string): + tuple[cmd: string, args: seq[string]] = ## Returns an OS specific command and arguments for extracting the downloaded ## tarball. when defined(Windows): let downloadDir = downloadDir.replace('\\', '/') let filePath = filePath.replace('\\', '/') - &"{getTarExePath()} -C {downloadDir} -xf {filePath} --strip-components 1 " & - "--force-local" + (getTarExePath(), @["-C", downloadDir, "-xf", filePath, + "--strip-components", "1", "--force-local"]) else: - &"tar -C {downloadDir} -xf {filePath} --strip-components 1" + ("tar", @["-C", downloadDir, "-xf", filePath, "--strip-components", "1"]) proc doDownloadTarball(url, downloadDir, version: string, queryRevision: bool): - Sha1Hash = + Future[Sha1HashRef] {.async.} = ## Downloads package tarball from GitHub. Returns the commit hash of the ## downloaded package in the case `queryRevision` is `true`. let downloadLink = getTarballDownloadLink(url, version) display("Downloading", downloadLink) - let data = getUrlContent(downloadLink) + let data = await getUrlContent(downloadLink) display("Completed", "downloading " & downloadLink) let filePath = downloadDir / "tarball.tar.gz" @@ -298,8 +309,8 @@ proc doDownloadTarball(url, downloadDir, version: string, queryRevision: bool): display("Completed", "saving " & filePath) display("Unpacking", filePath) - let cmd = getTarCmdLine(downloadDir, filePath) - let (output, exitCode) = doCmdEx(cmd) + let (cmd, args) = getTarCmdLine(downloadDir, filePath) + let (output, exitCode) = await doCmdExAsync(cmd, args) if exitCode != QuitSuccess and not output.contains("Cannot create symlink to"): # If the command fails for reason different then unable establishing a # sym-link raise an exception. This reason for failure is common on Windows @@ -310,13 +321,14 @@ proc doDownloadTarball(url, downloadDir, version: string, queryRevision: bool): display("Completed", "unpacking " & filePath) filePath.removeFile - return if queryRevision: getRevision(url, version) else: notSetSha1Hash + return if queryRevision: await getRevision(url, version) + else: notSetSha1Hash.newClone {.warning[ProveInit]: off.} -proc doDownload(url, downloadDir: string, verRange: VersionRange, +proc doDownload(url: string, downloadDir: string, verRange: VersionRange, downMethod: DownloadMethod, options: Options, vcsRevision: Sha1Hash): - tuple[version: Version, vcsRevision: Sha1Hash] = + Future[tuple[version: Version, vcsRevision: Sha1HashRef]] {.async.} = ## Downloads the repository specified by ``url`` using the specified download ## method. ## @@ -334,37 +346,38 @@ proc doDownload(url, downloadDir: string, verRange: VersionRange, if $latest.ver != "": result.version = latest.ver - result.vcsRevision = notSetSha1Hash + result.vcsRevision = notSetSha1Hash.newClone removeDir(downloadDir) if vcsRevision != notSetSha1Hash: if downloadTarball(url, options): - discard doDownloadTarball(url, downloadDir, $vcsRevision, false) + discard await doDownloadTarball(url, downloadDir, $vcsRevision, false) else: - cloneSpecificRevision(downMethod, url, downloadDir, vcsRevision) - result.vcsRevision = vcsRevision + await cloneSpecificRevision(downMethod, url, downloadDir, vcsRevision) + result.vcsRevision = vcsRevision.newClone elif verRange.kind == verSpecial: # We want a specific commit/branch/tag here. if verRange.spe == getHeadName(downMethod): # Grab HEAD. if downloadTarball(url, options): - result.vcsRevision = doDownloadTarball(url, downloadDir, "HEAD", true) + result.vcsRevision = await doDownloadTarball( + url, downloadDir, "HEAD", true) else: - doClone(downMethod, url, downloadDir, - onlyTip = not options.forceFullClone) + await doClone(downMethod, url, downloadDir, + onlyTip = not options.forceFullClone) else: assert ($verRange.spe)[0] == '#', "The special version must start with '#'." let specialVersion = substr($verRange.spe, 1) if downloadTarball(url, options): - result.vcsRevision = doDownloadTarball( + result.vcsRevision = await doDownloadTarball( url, downloadDir, specialVersion, true) else: # Grab the full repo. - doClone(downMethod, url, downloadDir, onlyTip = false) + await doClone(downMethod, url, downloadDir, onlyTip = false) # Then perform a checkout operation to get the specified branch/commit. # `spe` starts with '#', trim it. - doCheckout(downMethod, downloadDir, specialVersion) + await doCheckout(downMethod, downloadDir, specialVersion) result.version = verRange.spe else: case downMethod @@ -372,46 +385,48 @@ proc doDownload(url, downloadDir: string, verRange: VersionRange, # For Git we have to query the repo remotely for its tags. This is # necessary as cloning with a --depth of 1 removes all tag info. result.version = getHeadName(downMethod) - let versions = getTagsListRemote(url, downMethod).getVersionList() + let versions = (await getTagsListRemote(url, downMethod)).getVersionList() if versions.len > 0: getLatestByTag: if downloadTarball(url, options): let versionToDownload = if latest.tag.len > 0: latest.tag else: "HEAD" - result.vcsRevision = doDownloadTarball( + result.vcsRevision = await doDownloadTarball( url, downloadDir, versionToDownload, true) else: display("Cloning", "latest tagged version: " & latest.tag, priority = MediumPriority) - doClone(downMethod, url, downloadDir, latest.tag, - onlyTip = not options.forceFullClone) + await doClone(downMethod, url, downloadDir, latest.tag, + onlyTip = not options.forceFullClone) else: display("Warning:", "The package has no tagged releases, downloading HEAD instead.", Warning, priority = HighPriority) if downloadTarball(url, options): - result.vcsRevision = doDownloadTarball(url, downloadDir, "HEAD", true) + result.vcsRevision = await doDownloadTarball( + url, downloadDir, "HEAD", true) else: # If no commits have been tagged on the repo we just clone HEAD. - doClone(downMethod, url, downloadDir) # Grab HEAD. + await doClone(downMethod, url, downloadDir) # Grab HEAD. of DownloadMethod.hg: - doClone(downMethod, url, downloadDir, - onlyTip = not options.forceFullClone) + await doClone(downMethod, url, downloadDir, + onlyTip = not options.forceFullClone) result.version = getHeadName(downMethod) - let versions = getTagsList(downloadDir, downMethod).getVersionList() + let versions = + (await getTagsList(downloadDir, downMethod)).getVersionList() if versions.len > 0: getLatestByTag: display("Switching", "to latest tagged version: " & latest.tag, priority = MediumPriority) - doCheckout(downMethod, downloadDir, latest.tag) + await doCheckout(downMethod, downloadDir, latest.tag) else: display("Warning:", "The package has no tagged releases, downloading HEAD instead.", Warning, priority = HighPriority) - if result.vcsRevision == notSetSha1Hash: + if result.vcsRevision[] == notSetSha1Hash: # In the case the package in not downloaded as tarball we must query its # VCS revision from its download directory. - result.vcsRevision = downloadDir.getVcsRevision + result.vcsRevision = downloadDir.getVcsRevision.newClone {.warning[ProveInit]: on.} proc downloadPkg*(url: string, verRange: VersionRange, @@ -419,7 +434,7 @@ proc downloadPkg*(url: string, verRange: VersionRange, subdir: string, options: Options, downloadPath: string, - vcsRevision: Sha1Hash): DownloadPkgResult = + vcsRevision: Sha1Hash): Future[DownloadPkgResult] {.async.} = ## Downloads the repository as specified by ``url`` and ``verRange`` using ## the download method specified. ## @@ -462,7 +477,7 @@ proc downloadPkg*(url: string, verRange: VersionRange, priority = HighPriority) result.dir = downloadDir / subdir - (result.version, result.vcsRevision) = doDownload( + (result.version, result.vcsRevision) = await doDownload( modUrl, downloadDir, verRange, downMethod, options, vcsRevision) if verRange.kind != verSpecial: @@ -480,7 +495,8 @@ proc echoPackageVersions*(pkg: Package) = case downMethod of DownloadMethod.git: try: - let versions = getTagsListRemote(pkg.url, downMethod).getVersionList() + let versions = + (waitFor getTagsListRemote(pkg.url, downMethod)).getVersionList() if versions.len > 0: let sortedVersions = toSeq(values(versions)) echo(" versions: " & join(sortedVersions, ", ")) diff --git a/src/nimblepkg/lockfile.nim b/src/nimblepkg/lockfile.nim index 7e871c908..a80b80336 100644 --- a/src/nimblepkg/lockfile.nim +++ b/src/nimblepkg/lockfile.nim @@ -14,7 +14,7 @@ const lockFileName* = "nimble.lock" lockFileVersion = 1 -proc initLockFileDep*: LockFileDep = +proc initLockFileDep(): LockFileDep = result = LockFileDep( version: notSetVersion, vcsRevision: notSetSha1Hash, diff --git a/src/nimblepkg/options.nim b/src/nimblepkg/options.nim index 8969a5589..6d9754ad8 100644 --- a/src/nimblepkg/options.nim +++ b/src/nimblepkg/options.nim @@ -42,6 +42,8 @@ type developLocaldeps*: bool # True if local deps + nimble develop pkg1 ... disableSslCertCheck*: bool enableTarballs*: bool # Enable downloading of packages as tarballs from GitHub. + maxParallelDownloads*: int # This is the maximum number of parallel + # downloads. 0 means no limit. ActionType* = enum actionNil, actionRefresh, actionInit, actionDump, actionPublish, @@ -187,6 +189,8 @@ Nimble Options: -l, --localdeps Run in project local dependency mode -t, --tarballs Enable downloading of packages as tarballs when working with GitHub repositories. + -m, --max-parallel-downloads The maximum number of parallel downloads. + The default value is 20. Use 0 for no limit. --ver Query remote server for package version information when searching or listing packages. --nimbleDir:dirname Set the Nimble directory. @@ -476,6 +480,10 @@ proc parseFlag*(flag, val: string, result: var Options, kind = cmdLongOption) = of "localdeps", "l": result.localdeps = true of "nosslcheck": result.disableSslCertCheck = true of "tarballs", "t": result.enableTarballs = true + of "max-parallel-downloads", "m": + result.maxParallelDownloads = parseInt(val) + if result.maxParallelDownloads == 0: + result.maxParallelDownloads = int.high else: isGlobalFlag = false var wasFlagHandled = true @@ -579,6 +587,7 @@ proc initOptions*(): Options = verbosity: HighPriority, noColor: not isatty(stdout), startDir: getCurrentDir(), + maxParallelDownloads: 20, ) proc handleUnknownFlags(options: var Options) = diff --git a/src/nimblepkg/sha1hashes.nim b/src/nimblepkg/sha1hashes.nim index 5c12cdb6f..497052fbe 100644 --- a/src/nimblepkg/sha1hashes.nim +++ b/src/nimblepkg/sha1hashes.nim @@ -13,6 +13,8 @@ type ## procedure which validates the input. hashValue: string + Sha1HashRef* = ref Sha1Hash + const notSetSha1Hash* = Sha1Hash(hashValue: "") diff --git a/src/nimblepkg/tools.nim b/src/nimblepkg/tools.nim index 29b7027b8..a88dbf244 100644 --- a/src/nimblepkg/tools.nim +++ b/src/nimblepkg/tools.nim @@ -3,11 +3,12 @@ # # Various miscellaneous utility functions reside here. import osproc, pegs, strutils, os, uri, sets, json, parseutils, strformat, - sequtils + sequtils, asyncdispatch from net import SslCVerifyMode, newContext, SslContext import version, cli, common, packageinfotypes, options, sha1hashes +import asynctools/asyncproc except quoteShell from compiler/nimblecmd import getPathVersionChecksum proc extractBin(cmd: string): string = @@ -52,6 +53,19 @@ proc doCmdEx*(cmd: string): ProcessOutput = raise nimbleError("'" & bin & "' not in PATH.") return execCmdEx(cmd) +proc removeQuotes(cmd: string): string = + cmd.filterIt(it != '"').join + +proc doCmdExAsync*(cmd: string, args: seq[string] = @[]): + Future[ProcessOutput] {.async.} = + displayDebug("Executing", join(concat(@[cmd], args), " ")) + let bin = extractBin(cmd) + if findExe(bin) == "": + raise nimbleError("'" & bin & "' not in PATH.") + let res = await asyncproc.execProcess(cmd.removeQuotes, args, + options = {asyncproc.poStdErrToStdOut, asyncproc.poUsePath}) + return (res.output, res.exitCode) + proc tryDoCmdExErrorMessage*(cmd, output: string, exitCode: int): string = &"Execution of '{cmd}' failed with an exit code {exitCode}.\n" & &"Details: {output}" @@ -62,6 +76,13 @@ proc tryDoCmdEx*(cmd: string): string {.discardable.} = raise nimbleError(tryDoCmdExErrorMessage(cmd, output, exitCode)) return output +proc tryDoCmdExAsync*(cmd: string, args: seq[string] = @[]): + Future[string] {.async.} = + let (output, exitCode) = await doCmdExAsync(cmd, args) + if exitCode != QuitSuccess: + raise nimbleError(tryDoCmdExErrorMessage(cmd, output, exitCode)) + return output + proc getNimBin*: string = result = "nim" if findExe("nim") != "": result = findExe("nim")