Skip to content

Commit

Permalink
GH-2048 Simplify implementation with ReentrantReadWriteLock & cached map
Browse files Browse the repository at this point in the history
  • Loading branch information
dzikoysk committed Feb 20, 2024
1 parent d0042d5 commit b4d8314
Showing 1 changed file with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.reposilite.storage.filesystem

import com.google.common.cache.CacheBuilder
import com.reposilite.journalist.Journalist
import com.reposilite.shared.ErrorResponse
import com.reposilite.shared.badRequest
Expand Down Expand Up @@ -48,7 +49,8 @@ import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.attribute.FileTime
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit.MINUTES
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.streams.asSequence
import panda.std.Result
import panda.std.asSuccess
Expand All @@ -61,36 +63,42 @@ abstract class FileSystemStorageProvider protected constructor(
val rootDirectory: Path
) : StorageProvider {

data class LockedLocation(
val location: Location,
val lock: ReentrantLock = ReentrantLock(),
val updates: Int = 1
)

// tech-debt(GH-2048): StorageProvider API uses blocking operations
// FileSystemStorageProvider has to respond with open input-streams, so resource lock has to be maintained until the stream is closed
// Locks have to be released by the same thread that acquired them, so we have to use a single-thread executor that proxies lock requests
// This is a quick hot-fix, but it's not a proper solution - we should cover IO operations with CompletableFutures, so we can avoid it in the first place
// ~ https://github.com/dzikoysk/reposilite/issues/2048
private val lockManager = Executors.newSingleThreadExecutor()
private val lockedLocations = HashMap<Location, LockedLocation>(16)

private val lockedLocations =
CacheBuilder.newBuilder()
.expireAfterAccess(3, MINUTES)
.concurrencyLevel(1)
.build<Location, ReentrantReadWriteLock>()

private enum class LockMode {
READ,
WRITE
}

override fun shutdown() {
lockManager.shutdown()
}

private fun acquireFileAccessLock(location: Location): Closeable =
private fun acquireFileAccessLock(location: Location, lockMode: LockMode): Closeable =
lockManager.submit<Closeable> {
val lockedLocation = lockedLocations.compute(location) { _, currentLock ->
currentLock?.copy(updates = currentLock.updates + 1) ?: LockedLocation(location)
}!!
val lock = lockedLocations.get(location) { ReentrantReadWriteLock() }

lockedLocation.lock.lock()
when (lockMode) {
LockMode.READ -> lock.readLock().lock()
LockMode.WRITE -> lock.writeLock().lock()
}

Closeable {
lockManager.execute {
lockedLocation.lock.unlock()

lockedLocations.compute(location) { _, currentLock ->
when (currentLock!!.updates) {
1 -> null
else -> currentLock.copy(updates = currentLock.updates - 1)
}
when (lockMode) {
LockMode.READ -> lock.readLock().unlock()
LockMode.WRITE -> lock.writeLock().unlock()
}
}
}
Expand All @@ -117,7 +125,7 @@ abstract class FileSystemStorageProvider protected constructor(
data.copyTo(destination)
}

acquireFileAccessLock(location).use {
acquireFileAccessLock(location, LockMode.WRITE).use {
Files.move(temporaryFile.toPath(), file, StandardCopyOption.REPLACE_EXISTING)
Unit
}
Expand All @@ -128,7 +136,7 @@ abstract class FileSystemStorageProvider protected constructor(
location.resolveWithRootDirectory()
.exists()
.flatMap {
val lock = acquireFileAccessLock(location)
val lock = acquireFileAccessLock(location, LockMode.READ)

it.inputStream()
.onError { lock.close() }
Expand Down

0 comments on commit b4d8314

Please sign in to comment.