Skip to content

Commit

Permalink
GH-2048 Replace local retry mechanism with shared lock
Browse files Browse the repository at this point in the history
  • Loading branch information
dzikoysk committed Feb 5, 2024
1 parent 4f58525 commit 4c5844a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@ import com.reposilite.shared.toErrorResponse
import com.reposilite.storage.api.FileType
import com.reposilite.storage.api.FileType.DIRECTORY
import com.reposilite.storage.api.FileType.FILE
import io.javalin.http.HttpStatus.INTERNAL_SERVER_ERROR
import io.javalin.http.HttpStatus.NO_CONTENT
import panda.std.Result
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.isDirectory
import panda.std.Result

fun Path.type(): FileType =
if (this.isDirectory()) DIRECTORY else FILE

fun Path.inputStream(): Result<InputStream, ErrorResponse> =
Result.`when`(Files.exists(this), this, notFound(""))
.filter({ it.isDirectory().not() }, { NO_CONTENT.toErrorResponse("Requested file is a directory") })
.map { Files.newInputStream(it) }
.flatMap {
Result.supplyThrowing { Files.newInputStream(it) }
.onError { it.printStackTrace() }
.mapErr { INTERNAL_SERVER_ERROR.toErrorResponse("Cannot read file") }
}

internal fun Path.getExtension(): String =
getSimpleName().getExtension()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ import com.reposilite.storage.type
import io.javalin.http.ContentType
import io.javalin.http.ContentType.APPLICATION_OCTET_STREAM
import io.javalin.http.HttpStatus.INSUFFICIENT_STORAGE
import java.io.Closeable
import java.io.File
import java.io.FilterInputStream
import java.io.IOException
import java.io.InputStream
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.StandardCopyOption
import java.nio.file.attribute.FileTime
import kotlin.io.path.absolutePathString
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import kotlin.streams.asSequence
import panda.std.Result
import panda.std.asSuccess
Expand All @@ -60,7 +61,40 @@ abstract class FileSystemStorageProvider protected constructor(
val rootDirectory: Path
) : StorageProvider {

override fun shutdown() {}
data class LockedLocation(
val location: Location,
val lock: ReentrantLock = ReentrantLock(),
val updates: Int = 1
)

private val lockManager = Executors.newSingleThreadExecutor()
private val lockedLocations = HashMap<Location, LockedLocation>(16)

override fun shutdown() {
lockManager.shutdown()
}

private fun acquireFileAccessLock(location: Location): Closeable =
lockManager.submit<Closeable> {
val lockedLocation = lockedLocations.compute(location) { _, currentLock ->
currentLock?.copy(updates = currentLock.updates + 1) ?: LockedLocation(location)
}!!

lockedLocation.lock.lock()

Closeable {
lockManager.execute {
lockedLocation.lock.unlock()

lockedLocations.compute(location) { _, currentLock ->
when (currentLock!!.updates) {
1 -> null
else -> currentLock.copy(updates = currentLock.updates - 1)
}
}
}
}
}.get()

override fun putFile(location: Location, inputStream: InputStream): Result<Unit, ErrorResponse> =
inputStream.use { data ->
Expand All @@ -83,27 +117,31 @@ abstract class FileSystemStorageProvider protected constructor(
data.copyTo(destination)
}

do {
try {
Files.move(temporaryFile.toPath(), file, REPLACE_EXISTING)
} catch (e: NoSuchFileException) {
// Concurrent Files.move calls may throw
// ~ https://github.com/dzikoysk/reposilite/issues/1975
journalist.logger.debug("[FS][1] Cannot move file ${temporaryFile.absolutePath} to ${file.absolutePathString()}, retrying...")
Thread.sleep(1000) // probably good enough for now
} catch (e: FileAlreadyExistsException) {
// ~ https://github.com/dzikoysk/reposilite/issues/2027
journalist.logger.debug("[FS][2] Cannot move file ${temporaryFile.absolutePath} to ${file.absolutePathString()}, retrying...")
Thread.sleep(1000) // probably good enough for now
}
} while (Files.exists(temporaryFile.toPath()))
acquireFileAccessLock(location).use {
Files.move(temporaryFile.toPath(), file, StandardCopyOption.REPLACE_EXISTING)
Unit
}
}
}

override fun getFile(location: Location): Result<InputStream, ErrorResponse> =
location.resolveWithRootDirectory()
.exists()
.flatMap { it.inputStream() }
.flatMap {
val lock = acquireFileAccessLock(location)

it.inputStream()
.onError { lock.close() }
.map { inputStream ->
object : FilterInputStream(inputStream) {
override fun close() {
lock.use {
inputStream.close()
}
}
}
}
}

override fun getFileDetails(location: Location): Result<out FileDetails, ErrorResponse> =
location.resolveWithRootDirectory()
Expand Down

0 comments on commit 4c5844a

Please sign in to comment.