11package de.gesellix.docker.client
22
33import de.gesellix.docker.client.container.ArchiveUtil
4- import de.gesellix.docker.engine.AttachConfig
54import de.gesellix.docker.remote.api.ChangeType
65import de.gesellix.docker.remote.api.ContainerCreateRequest
76import de.gesellix.docker.remote.api.ContainerUpdateRequest
@@ -23,6 +22,7 @@ import groovy.json.JsonOutput
2322import groovy.util.logging.Slf4j
2423import okhttp3.Response
2524import okhttp3.WebSocket
25+ import okio.BufferedSink
2626import okio.ByteString
2727import okio.Okio
2828import okio.Sink
@@ -657,36 +657,80 @@ class DockerContainerIntegrationSpec extends Specification {
657657
658658 String input = " exec ${ UUID.randomUUID()} "
659659 String expectedOutput = " #$input #"
660- def outputStream = new ByteArrayOutputStream ()
661660
662- def onSinkClosed = new CountDownLatch (1 )
663- def onSourceConsumed = new CountDownLatch (1 )
661+ when :
662+ Cancellable job = null
663+ List<Frame > frames = []
664+ def execStartConfig = new ExecStartConfig (false , true , null )
665+ def callback = new StreamCallback<Frame > () {
664666
665- def attachConfig = new AttachConfig ()
666- attachConfig. streams. stdin = new ByteArrayInputStream (" $input \n " . bytes)
667- attachConfig. streams. stdout = outputStream
668- attachConfig. onFailure = { Exception e ->
669- log. error(" exec failed" , e)
670- }
671- attachConfig. onResponse = { Response response ->
672- log. trace(" onResponse (${ response} )" )
667+ @Override
668+ void onStarting (Cancellable cancellable ) {
669+ job = cancellable
670+ }
671+
672+ @Override
673+ void attachInput (Sink sink ) {
674+ System . out. println (" attachInput, sending data..." )
675+ new Thread (() -> {
676+ BufferedSink buffer = Okio . buffer(sink)
677+ try {
678+ buffer. writeUtf8(" $input \n " )
679+ buffer. flush()
680+ System . out. println (" ... data sent" )
681+ } catch (IOException e) {
682+ e. printStackTrace()
683+ System . err. println (" Failed to write to stdin: " + e. getMessage())
684+ } finally {
685+ try {
686+ Thread . sleep(100 )
687+ sink. close()
688+ } catch (Exception ignored) {
689+ // ignore
690+ }
691+ }
692+ }). start()
693+ }
694+
695+ @Override
696+ void onNext (Frame element ) {
697+ log. info(element?. toString())
698+ frames. add(element)
699+ }
700+
701+ @Override
702+ void onFailed (Exception e ) {
703+ log. error(" Exec Attach failed" , e)
704+ }
705+
706+ @Override
707+ void onFinished () {
708+ log. info(" Exec Attach finished" )
709+ }
673710 }
674- attachConfig. onSinkClosed = { Response response ->
675- log. trace(" onSinkClosed (${ response} )" )
676- onSinkClosed. countDown()
711+
712+ new Thread (() -> {
713+ dockerClient. startExec(execId, execStartConfig, callback, Duration . ofSeconds(10 ))
714+ }, " exec-client" ). start()
715+
716+ CountDownLatch wait = new CountDownLatch (1 )
717+ new Timer (). schedule(new TimerTask () {
718+ @Override
719+ void run () {
720+ if (job != null ) {
721+ job. cancel()
722+ }
723+ wait. countDown()
724+ }
725+ }, 5000 )
726+
727+ try {
728+ wait. await()
677729 }
678- attachConfig. onSourceConsumed = {
679- log. trace(" onSourceConsumed" )
680- onSourceConsumed. countDown()
730+ catch (InterruptedException e) {
731+ e. printStackTrace()
681732 }
682733
683- when :
684- def execStartConfig = new ExecStartConfig (false , true , null )
685- dockerClient. startExec(execId, execStartConfig, attachConfig)
686- // dockerClient.startExec(execId, execStartConfig, callback, Duration.of(1, ChronoUnit.MINUTES))
687- onSinkClosed. await(5 , SECONDS )
688- onSourceConsumed. await(5 , SECONDS )
689-
690734 def containerIsolation = dockerClient. inspectContainer(containerId). content. hostConfig?. isolation
691735 def actualIsolation = containerIsolation ? SystemInfo.Isolation . values(). find { it. value == containerIsolation. value } : LocalDocker . getDaemonIsolation()
692736 if (actualIsolation == SystemInfo.Isolation.Hyperv ) {
0 commit comments