Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

WIP: Add FtpConnector #105

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
28 changes: 28 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,34 @@ lazy val s3Connector = project
Test / fork := true
)

lazy val ftpConnector = project
.in(file("connectors/ftp-connector"))
.settings(stdSettings("zio-connect-ftp"))
.settings(
libraryDependencies ++= Seq(
FtpDependencies.zioFtp,
FtpDependencies.testContainersScala,
`zio`,
`zio-streams`,
`zio-prelude`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move zio-prelude to FtpDependencies please. While it's an RC version I would like to avoid putting it in the core dependencies.

`zio-test`,
`zio-test-sbt`,
)
)
.settings(
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n <= 12 => Seq(`scala-compact-collection`)
case _ => Seq.empty
}
}
)
.settings(
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
Test / fork := true
)
.enablePlugins(BuildInfoPlugin)

lazy val docs = project
.in(file("zio-connect-docs"))
.settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package zio.connect.ftp

import zio._
import zio.connect.ftp.FtpConnector._
import zio.ftp.FtpResource
import zio.prelude.Subtype
import zio.stream.{ZSink, ZStream}

import java.io.IOException

trait FtpConnector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be ordered alphabetically


def stat(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Option[FtpResource]]

def rm(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit]

def rmDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit]

def mkDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit]

def ls(path: => PathName)(implicit trace: Trace): ZStream[Any, IOException, FtpResource]

def lsDescendant(path: => PathName): ZStream[Any, IOException, FtpResource]

def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte]
def readFile(path: => PathName, chunkSize: => Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte]


def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope, IOException, Byte, Nothing, Unit]

}

object FtpConnector {

object PathName extends Subtype[String]
type PathName = PathName.Type

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package zio.connect.ftp

import zio._
import zio.connect.ftp.FtpConnector.PathName
import zio.ftp.{Ftp, FtpResource}
import zio.stream.{ZSink, ZStream}

import java.io.IOException

case class LiveFtpConnector(ftp: Ftp) extends FtpConnector {

override def stat(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Option[FtpResource]] =
ZSink
.take[PathName](1)
.map(_.headOption)
.mapZIO {
case Some(path) => ftp.stat(path)
case None => ZIO.succeed(None)
}

override def rm(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = {
ZSink
.foreach[Any, IOException, PathName] { path =>
ftp.rm(path)
}
}

override def rmDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = {
ZSink
.foreach[Any, IOException, PathName] { path =>
ftp.rmdir(path)
}
}

override def mkDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = {
ZSink
.foreach[Any, IOException, PathName] { path =>
ftp.mkdir(path)
}
}

override def ls(path: => PathName)(implicit trace: Trace): ZStream[Any, IOException, FtpResource] = {
ftp.ls(path)
}

override def lsDescendant(path: => PathName): ZStream[Any, IOException, FtpResource] = {
ftp.lsDescendant(path)
}

override def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte] = {
ftp.readFile(path, chunkSize)
}

override def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope, IOException, Byte, Nothing, Unit] = {
ZSink
.foreachChunk[R & Scope, IOException, Byte] { content =>
ftp.upload(
path = pathName,
source = ZStream.fromChunk(content)
)
}
}
}

object LiveFtpConnector {

val layer: ZLayer[Ftp, Nothing, LiveFtpConnector] =
ZLayer.fromZIO(ZIO.service[Ftp].map(LiveFtpConnector(_)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package zio.connect

import zio._
import zio.connect.ftp.FtpConnector.PathName
import zio.ftp.{Ftp, FtpResource}
import zio.stream.{ZSink, ZStream}

import java.io.IOException

package object ftp {

def stat(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Option[FtpResource]] =
ZSink.serviceWithSink(_.stat)

def rm(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] =
ZSink.serviceWithSink(_.rm)

def rmDir(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] =
ZSink.serviceWithSink(_.rmDir)

def mkDir(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] =
ZSink.serviceWithSink(_.mkDir)

def ls(path: => PathName)(implicit trace: Trace): ZStream[FtpConnector, IOException, FtpResource] =
ZStream.serviceWithStream(_.ls(path))

def lsDescendant(path: => PathName): ZStream[FtpConnector, IOException, FtpResource] =
ZStream.serviceWithStream(_.lsDescendant(path))

def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[FtpConnector, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize))

def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope & FtpConnector, IOException, Byte, Nothing, Unit] =
ZSink.serviceWithSink[FtpConnector](_.upload(pathName))

val ftpConnectorLiveLayer: ZLayer[Ftp, Nothing, LiveFtpConnector] = LiveFtpConnector.layer

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package zio.connect.ftp

import zio._
import zio.connect.ftp.FtpConnector.PathName
import zio.stream.ZPipeline.utf8Decode

import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._

import java.io.IOException

trait FtpConnectorSpec extends ZIOSpecDefault {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suites should be ordered alphabetically in implementation and in the composed result


val ftpConnectorSpec: Spec[Scope & FtpConnector, IOException] =
statSuite + rmSuite + rmDirSuite + mkDirSuite + lsSuite + lsDescendantSuite + readFileSuite + uploadSuite

private lazy val statSuite: Spec[Scope & FtpConnector, IOException] =
suite("stat")(
test("returns None when path doesn't exist") {
val path = PathName("/fail-stat.txt")
for {
content <- ZStream.succeed(path) >>> stat
} yield assertTrue(content.isEmpty)
},
test("succeeds when file does exist") {
val path = PathName("/succeed-stat.txt")
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes))
for {
_ <- data >>> upload(path)
resource <- ZStream.succeed(path) >>> stat
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true)
resourceHasSamePath = resource.get.path == path
resourceIsFile = !resource.get.isDirectory.get
} yield assertTrue(resourceHasSamePath) &&
assertTrue(resourceIsFile) &&
assertTrue(fileDeleted)
},
test("succeeds when directory does exist") {
val path = PathName("/test")
for {
dirCreated <- (ZStream.succeed(path) >>> mkDir).as(true)
resource <- ZStream.succeed(path) >>> stat
dirDeleted <- (ZStream.succeed(path) >>> rmDir).as(true)
resourceHasSamePath = resource.get.path == path
resourceIsDirectory = resource.get.isDirectory.get
} yield assertTrue(dirCreated) && assertTrue(dirDeleted) &&
assertTrue(resourceHasSamePath) && assertTrue(resourceIsDirectory)
}
)

private lazy val rmSuite: Spec[Scope & FtpConnector, IOException] =
suite("rm")(
test("fails when path is invalid") {
val path = PathName("/fail-rm.txt")
for {
invalid <- (ZStream.succeed(path) >>> rm)
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "")
} yield assertTrue(invalid == s"Path is invalid. Cannot delete file : $path")
},
test("succeeds") {
val path = PathName("/succeed-rm.txt")
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes))
for {
_ <- data >>> upload(path)
_ <- ZStream.succeed(path) >>> rm
stat <- ZStream.succeed(path) >>> stat
} yield assertTrue(stat.isEmpty)
}
)

private lazy val rmDirSuite: Spec[FtpConnector, IOException] =
suite("rmDir")(
test("fails when directory doesn't exist") {
val path = PathName("/invalid")
for {
invalid <- (ZStream.succeed(path) >>> rmDir)
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "")
} yield assertTrue(invalid == s"Path is invalid. Cannot delete directory : $path")
},
test("succeeds") {
val path = PathName("/succeed")
for {
_ <- ZStream.succeed(path) >>> mkDir
_ <- ZStream.succeed(path) >>> rmDir
resource <- ZStream.succeed(path) >>> stat
} yield assertTrue(resource.isEmpty)
}
)

private lazy val mkDirSuite: Spec[Scope & FtpConnector, IOException] =
suite("mkDir")(
test("fails when path is invalid") {
val path = PathName("fail-mkdir.txt")
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes))
for {
_ <- data >>> upload(path)
invalid <- (ZStream.succeed(path) >>> mkDir).foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "")
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true)
} yield assertTrue(fileDeleted) &&
assertTrue(invalid == s"Path is invalid. Cannot create directory : $path")
},
test("succeeds") {
val path = PathName("succeed")
for {
_ <- ZStream.succeed(path) >>> mkDir
resource <- ZStream.succeed(path) >>> stat
dirDeleted <- (ZStream.succeed(path) >>> rmDir).as(true)
} yield
assertTrue(dirDeleted) &&
assertTrue(resource.get.isDirectory.get)
}
)

private lazy val lsSuite: Spec[Scope & FtpConnector, IOException] =
suite("ls")(
test("fails with invalid directory") {
val path = PathName("/invalid-path")
for {
files <- ls(path).runCollect
} yield assert(files)(hasSameElements(Nil))
},
test("succeeds") {
val dataPath = PathName("/hello.txt")
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes))

for {
_ <- data >>> upload(dataPath)
files <- ls(PathName("/")).runFold(List.empty[String])((l, resource) => l :+ resource.path)
fileDeleted <- (ZStream(dataPath) >>> rm).as(true)
} yield assertTrue(fileDeleted) &&
assert(files)(hasSameElements(List(dataPath)))
}
)

private lazy val lsDescendantSuite: Spec[Scope & FtpConnector, IOException] =
suite("lsDescendant")(
test("succeeds") {
val dirPath = PathName("/dir1")
val filePath = PathName("/hello.txt")
val fileInDirPath = PathName(dirPath + filePath)
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes))

for {
_ <- data >>> upload(filePath)
_ <- ZStream.succeed(dirPath) >>> mkDir
_ <- data >>> upload(fileInDirPath)
files <- lsDescendant(PathName("/")).runFold(List.empty[String])((s, f) => f.path +: s)
fileDeleted <- (ZStream.succeed(filePath) >>> rm).as(true)
fileInDirDeleted <- (ZStream.succeed(fileInDirPath) >>> rm).as(true)
dirDeleted <- (ZStream.succeed(dirPath) >>> rmDir).as(true)
} yield assertTrue(fileDeleted) &&
assertTrue(fileInDirDeleted) &&
assertTrue(dirDeleted) &&
assert(files.reverse)(hasSameElements(List(fileInDirPath, filePath)))
},
test("fails with invalid directory") {
val path = PathName("/invalid-path-ls-descendant")
for {
files <- lsDescendant(path).runCollect
} yield assertTrue(files == Chunk.empty)
},
)

private lazy val readFileSuite: Spec[Scope & FtpConnector, IOException] =
suite("readFile")(
test("fails when file doesn't exist") {
val path = PathName("/fail.txt")
for {
invalid <- readFile(path)
.via(utf8Decode)
.runCollect
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "")
} yield assertTrue(invalid == s"File does not exist $path")
},
test("succeeds") {
val path = PathName("/succeed.txt")
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes))
for {
_ <- data >>> upload(path)
content <- readFile(path).via(utf8Decode).runCollect
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true)
} yield assertTrue(fileDeleted) &&
assert(content.mkString)(equalTo("hello world"))
}
)

private lazy val uploadSuite: Spec[Scope & FtpConnector, IOException] =
suite("upload")(
test("fails when path is invalid") {
val path = PathName("/dir/fail.txt")
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes))
for {
invalid <- (data >>> upload(path)).foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "")
} yield assertTrue(invalid == s"Path is invalid. Cannot upload data to : $path")
},
test("succeeds") {
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes))
val path = PathName("/succeed.txt")
(
for {
_ <- data >>> upload(path)
content <- readFile(path).via(utf8Decode).runCollect
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true)
} yield assertTrue(fileDeleted) &&
assertTrue(content.mkString == "hello world")
)
}
)
}
Loading