This repository has been archived by the owner on Jan 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
WIP: Add FtpConnector #105
Open
kevchuang
wants to merge
15
commits into
zio-archive:master
Choose a base branch
from
kevchuang:ftp-connector
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
fa3cf8b
Add FTP connector dependencies and module
kevchuang 72948fd
Add FtpConnector trait
kevchuang 1c172d4
Add FtpConnector trait
kevchuang 8916284
Add LiveFtpConnector and package object
kevchuang 1f06b6b
Adding unit tests in FtpConnectorSpec
kevchuang 61d2727
Add ftp containers
kevchuang 2ed335b
Add FTP connector dependencies and module
kevchuang ad019eb
Add FtpConnector trait
kevchuang c432e73
Add LiveFtpConnector and package object
kevchuang f0c44d1
Adding unit tests in FtpConnectorSpec
kevchuang 7d2a1c0
Add ftp containers
kevchuang ea8cbe6
Add ftp container
kevchuang 54b264b
Merge remote-tracking branch 'origin/ftp-connector' into ftp-connector
kevchuang 2f1574f
Fix Ftp connection refused
kevchuang c46c5ba
Refactoring FtpConnectorSpec and LiveFtpConnectorSpec
kevchuang File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
connectors/ftp-connector/src/main/scala/zio/connect/ftp/FtpConnector.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,36 @@ | ||||||
package zio.connect.ftp | ||||||
|
||||||
import zio._ | ||||||
import zio.connect.ftp.FtpConnector._ | ||||||
import zio.ftp.FtpResource | ||||||
import zio.prelude.Subtype | ||||||
import zio.stream.{ZSink, ZStream} | ||||||
|
||||||
import java.io.IOException | ||||||
|
||||||
trait FtpConnector { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be ordered alphabetically |
||||||
|
||||||
def stat(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Option[FtpResource]] | ||||||
|
||||||
def rm(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] | ||||||
|
||||||
def rmDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] | ||||||
|
||||||
def mkDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] | ||||||
|
||||||
def ls(path: => PathName)(implicit trace: Trace): ZStream[Any, IOException, FtpResource] | ||||||
|
||||||
def lsDescendant(path: => PathName): ZStream[Any, IOException, FtpResource] | ||||||
|
||||||
def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope, IOException, Byte, Nothing, Unit] | ||||||
|
||||||
} | ||||||
|
||||||
object FtpConnector { | ||||||
|
||||||
object PathName extends Subtype[String] | ||||||
type PathName = PathName.Type | ||||||
|
||||||
} |
70 changes: 70 additions & 0 deletions
70
connectors/ftp-connector/src/main/scala/zio/connect/ftp/LiveFtpConnector.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package zio.connect.ftp | ||
|
||
import zio._ | ||
import zio.connect.ftp.FtpConnector.PathName | ||
import zio.ftp.{Ftp, FtpResource} | ||
import zio.stream.{ZSink, ZStream} | ||
|
||
import java.io.IOException | ||
|
||
case class LiveFtpConnector(ftp: Ftp) extends FtpConnector { | ||
|
||
override def stat(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Option[FtpResource]] = | ||
ZSink | ||
.take[PathName](1) | ||
.map(_.headOption) | ||
.mapZIO { | ||
case Some(path) => ftp.stat(path) | ||
case None => ZIO.succeed(None) | ||
} | ||
|
||
override def rm(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = { | ||
ZSink | ||
.foreach[Any, IOException, PathName] { path => | ||
ftp.rm(path) | ||
} | ||
} | ||
|
||
override def rmDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = { | ||
ZSink | ||
.foreach[Any, IOException, PathName] { path => | ||
ftp.rmdir(path) | ||
} | ||
} | ||
|
||
override def mkDir(implicit trace: Trace): ZSink[Any, IOException, PathName, PathName, Unit] = { | ||
ZSink | ||
.foreach[Any, IOException, PathName] { path => | ||
ftp.mkdir(path) | ||
} | ||
} | ||
|
||
override def ls(path: => PathName)(implicit trace: Trace): ZStream[Any, IOException, FtpResource] = { | ||
ftp.ls(path) | ||
} | ||
|
||
override def lsDescendant(path: => PathName): ZStream[Any, IOException, FtpResource] = { | ||
ftp.lsDescendant(path) | ||
} | ||
|
||
override def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[Any, IOException, Byte] = { | ||
ftp.readFile(path, chunkSize) | ||
} | ||
|
||
override def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope, IOException, Byte, Nothing, Unit] = { | ||
ZSink | ||
.foreachChunk[R & Scope, IOException, Byte] { content => | ||
ftp.upload( | ||
path = pathName, | ||
source = ZStream.fromChunk(content) | ||
) | ||
} | ||
} | ||
} | ||
|
||
object LiveFtpConnector { | ||
|
||
val layer: ZLayer[Ftp, Nothing, LiveFtpConnector] = | ||
ZLayer.fromZIO(ZIO.service[Ftp].map(LiveFtpConnector(_))) | ||
|
||
} |
38 changes: 38 additions & 0 deletions
38
connectors/ftp-connector/src/main/scala/zio/connect/ftp/package.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package zio.connect | ||
|
||
import zio._ | ||
import zio.connect.ftp.FtpConnector.PathName | ||
import zio.ftp.{Ftp, FtpResource} | ||
import zio.stream.{ZSink, ZStream} | ||
|
||
import java.io.IOException | ||
|
||
package object ftp { | ||
|
||
def stat(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Option[FtpResource]] = | ||
ZSink.serviceWithSink(_.stat) | ||
|
||
def rm(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] = | ||
ZSink.serviceWithSink(_.rm) | ||
|
||
def rmDir(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] = | ||
ZSink.serviceWithSink(_.rmDir) | ||
|
||
def mkDir(implicit trace: Trace): ZSink[FtpConnector, IOException, PathName, PathName, Unit] = | ||
ZSink.serviceWithSink(_.mkDir) | ||
|
||
def ls(path: => PathName)(implicit trace: Trace): ZStream[FtpConnector, IOException, FtpResource] = | ||
ZStream.serviceWithStream(_.ls(path)) | ||
|
||
def lsDescendant(path: => PathName): ZStream[FtpConnector, IOException, FtpResource] = | ||
ZStream.serviceWithStream(_.lsDescendant(path)) | ||
|
||
def readFile(path: => PathName, chunkSize: Int = 2048)(implicit trace: Trace): ZStream[FtpConnector, IOException, Byte] = | ||
ZStream.serviceWithStream(_.readFile(path, chunkSize)) | ||
|
||
def upload[R](pathName: => PathName)(implicit trace: Trace): ZSink[R & Scope & FtpConnector, IOException, Byte, Nothing, Unit] = | ||
ZSink.serviceWithSink[FtpConnector](_.upload(pathName)) | ||
|
||
val ftpConnectorLiveLayer: ZLayer[Ftp, Nothing, LiveFtpConnector] = LiveFtpConnector.layer | ||
|
||
} |
210 changes: 210 additions & 0 deletions
210
connectors/ftp-connector/src/test/scala/zio/connect/ftp/FtpConnectorSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
package zio.connect.ftp | ||
|
||
import zio._ | ||
import zio.connect.ftp.FtpConnector.PathName | ||
import zio.stream.ZPipeline.utf8Decode | ||
|
||
import zio.stream.ZStream | ||
import zio.test.Assertion._ | ||
import zio.test._ | ||
|
||
import java.io.IOException | ||
|
||
trait FtpConnectorSpec extends ZIOSpecDefault { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suites should be ordered alphabetically in implementation and in the composed result |
||
|
||
val ftpConnectorSpec: Spec[Scope & FtpConnector, IOException] = | ||
statSuite + rmSuite + rmDirSuite + mkDirSuite + lsSuite + lsDescendantSuite + readFileSuite + uploadSuite | ||
|
||
private lazy val statSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("stat")( | ||
test("returns None when path doesn't exist") { | ||
val path = PathName("/fail-stat.txt") | ||
for { | ||
content <- ZStream.succeed(path) >>> stat | ||
} yield assertTrue(content.isEmpty) | ||
}, | ||
test("succeeds when file does exist") { | ||
val path = PathName("/succeed-stat.txt") | ||
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes)) | ||
for { | ||
_ <- data >>> upload(path) | ||
resource <- ZStream.succeed(path) >>> stat | ||
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true) | ||
resourceHasSamePath = resource.get.path == path | ||
resourceIsFile = !resource.get.isDirectory.get | ||
} yield assertTrue(resourceHasSamePath) && | ||
assertTrue(resourceIsFile) && | ||
assertTrue(fileDeleted) | ||
}, | ||
test("succeeds when directory does exist") { | ||
val path = PathName("/test") | ||
for { | ||
dirCreated <- (ZStream.succeed(path) >>> mkDir).as(true) | ||
resource <- ZStream.succeed(path) >>> stat | ||
dirDeleted <- (ZStream.succeed(path) >>> rmDir).as(true) | ||
resourceHasSamePath = resource.get.path == path | ||
resourceIsDirectory = resource.get.isDirectory.get | ||
} yield assertTrue(dirCreated) && assertTrue(dirDeleted) && | ||
assertTrue(resourceHasSamePath) && assertTrue(resourceIsDirectory) | ||
} | ||
) | ||
|
||
private lazy val rmSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("rm")( | ||
test("fails when path is invalid") { | ||
val path = PathName("/fail-rm.txt") | ||
for { | ||
invalid <- (ZStream.succeed(path) >>> rm) | ||
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "") | ||
} yield assertTrue(invalid == s"Path is invalid. Cannot delete file : $path") | ||
}, | ||
test("succeeds") { | ||
val path = PathName("/succeed-rm.txt") | ||
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes)) | ||
for { | ||
_ <- data >>> upload(path) | ||
_ <- ZStream.succeed(path) >>> rm | ||
stat <- ZStream.succeed(path) >>> stat | ||
} yield assertTrue(stat.isEmpty) | ||
} | ||
) | ||
|
||
private lazy val rmDirSuite: Spec[FtpConnector, IOException] = | ||
suite("rmDir")( | ||
test("fails when directory doesn't exist") { | ||
val path = PathName("/invalid") | ||
for { | ||
invalid <- (ZStream.succeed(path) >>> rmDir) | ||
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "") | ||
} yield assertTrue(invalid == s"Path is invalid. Cannot delete directory : $path") | ||
}, | ||
test("succeeds") { | ||
val path = PathName("/succeed") | ||
for { | ||
_ <- ZStream.succeed(path) >>> mkDir | ||
_ <- ZStream.succeed(path) >>> rmDir | ||
resource <- ZStream.succeed(path) >>> stat | ||
} yield assertTrue(resource.isEmpty) | ||
} | ||
) | ||
|
||
private lazy val mkDirSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("mkDir")( | ||
test("fails when path is invalid") { | ||
val path = PathName("fail-mkdir.txt") | ||
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes)) | ||
for { | ||
_ <- data >>> upload(path) | ||
invalid <- (ZStream.succeed(path) >>> mkDir).foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "") | ||
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true) | ||
} yield assertTrue(fileDeleted) && | ||
assertTrue(invalid == s"Path is invalid. Cannot create directory : $path") | ||
}, | ||
test("succeeds") { | ||
val path = PathName("succeed") | ||
for { | ||
_ <- ZStream.succeed(path) >>> mkDir | ||
resource <- ZStream.succeed(path) >>> stat | ||
dirDeleted <- (ZStream.succeed(path) >>> rmDir).as(true) | ||
} yield | ||
assertTrue(dirDeleted) && | ||
assertTrue(resource.get.isDirectory.get) | ||
} | ||
) | ||
|
||
private lazy val lsSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("ls")( | ||
test("fails with invalid directory") { | ||
val path = PathName("/invalid-path") | ||
for { | ||
files <- ls(path).runCollect | ||
} yield assert(files)(hasSameElements(Nil)) | ||
}, | ||
test("succeeds") { | ||
val dataPath = PathName("/hello.txt") | ||
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes)) | ||
|
||
for { | ||
_ <- data >>> upload(dataPath) | ||
files <- ls(PathName("/")).runFold(List.empty[String])((l, resource) => l :+ resource.path) | ||
fileDeleted <- (ZStream(dataPath) >>> rm).as(true) | ||
} yield assertTrue(fileDeleted) && | ||
assert(files)(hasSameElements(List(dataPath))) | ||
} | ||
) | ||
|
||
private lazy val lsDescendantSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("lsDescendant")( | ||
test("succeeds") { | ||
val dirPath = PathName("/dir1") | ||
val filePath = PathName("/hello.txt") | ||
val fileInDirPath = PathName(dirPath + filePath) | ||
val data = ZStream.fromChunks(Chunk.fromArray("hello".getBytes)) | ||
|
||
for { | ||
_ <- data >>> upload(filePath) | ||
_ <- ZStream.succeed(dirPath) >>> mkDir | ||
_ <- data >>> upload(fileInDirPath) | ||
files <- lsDescendant(PathName("/")).runFold(List.empty[String])((s, f) => f.path +: s) | ||
fileDeleted <- (ZStream.succeed(filePath) >>> rm).as(true) | ||
fileInDirDeleted <- (ZStream.succeed(fileInDirPath) >>> rm).as(true) | ||
dirDeleted <- (ZStream.succeed(dirPath) >>> rmDir).as(true) | ||
} yield assertTrue(fileDeleted) && | ||
assertTrue(fileInDirDeleted) && | ||
assertTrue(dirDeleted) && | ||
assert(files.reverse)(hasSameElements(List(fileInDirPath, filePath))) | ||
}, | ||
test("fails with invalid directory") { | ||
val path = PathName("/invalid-path-ls-descendant") | ||
for { | ||
files <- lsDescendant(path).runCollect | ||
} yield assertTrue(files == Chunk.empty) | ||
}, | ||
) | ||
|
||
private lazy val readFileSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("readFile")( | ||
test("fails when file doesn't exist") { | ||
val path = PathName("/fail.txt") | ||
for { | ||
invalid <- readFile(path) | ||
.via(utf8Decode) | ||
.runCollect | ||
.foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "") | ||
} yield assertTrue(invalid == s"File does not exist $path") | ||
}, | ||
test("succeeds") { | ||
val path = PathName("/succeed.txt") | ||
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes)) | ||
for { | ||
_ <- data >>> upload(path) | ||
content <- readFile(path).via(utf8Decode).runCollect | ||
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true) | ||
} yield assertTrue(fileDeleted) && | ||
assert(content.mkString)(equalTo("hello world")) | ||
} | ||
) | ||
|
||
private lazy val uploadSuite: Spec[Scope & FtpConnector, IOException] = | ||
suite("upload")( | ||
test("fails when path is invalid") { | ||
val path = PathName("/dir/fail.txt") | ||
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes)) | ||
for { | ||
invalid <- (data >>> upload(path)).foldCause(_.failureOption.map(_.getMessage).getOrElse(""), _ => "") | ||
} yield assertTrue(invalid == s"Path is invalid. Cannot upload data to : $path") | ||
}, | ||
test("succeeds") { | ||
val data = ZStream.fromChunk(Chunk.fromArray("hello world".getBytes)) | ||
val path = PathName("/succeed.txt") | ||
( | ||
for { | ||
_ <- data >>> upload(path) | ||
content <- readFile(path).via(utf8Decode).runCollect | ||
fileDeleted <- (ZStream.succeed(path) >>> rm).as(true) | ||
} yield assertTrue(fileDeleted) && | ||
assertTrue(content.mkString == "hello world") | ||
) | ||
} | ||
) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move zio-prelude to FtpDependencies please. While it's an RC version I would like to avoid putting it in the core dependencies.