Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import com.spotify.confidence.apply.ApplyInstance
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.io.File
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write

internal const val FLAGS_FILE_NAME = "confidence_flags_cache.json"
internal const val APPLY_FILE_NAME = "confidence_apply_cache.json"
Expand All @@ -14,13 +17,16 @@ internal class FileDiskStorage internal constructor(
private val flagsFile: File,
private val applyFile: File
) : DiskStorage {
private val lock = ReentrantReadWriteLock()

override fun store(flagResolution: FlagResolution) {
write(Json.encodeToString(flagResolution))
}

override fun clear() {
flagsFile.delete()
lock.write {
flagsFile.delete()
}
}

override fun writeApplyData(applyData: Map<String, MutableMap<String, ApplyInstance>>) {
Expand All @@ -42,11 +48,11 @@ internal class FileDiskStorage internal constructor(
}
}

private fun write(data: String) {
private fun write(data: String) = lock.write {
flagsFile.writeText(data)
}

override fun read(): FlagResolution {
override fun read(): FlagResolution = lock.read {
if (!flagsFile.exists()) return FlagResolution.EMPTY
val fileText: String = flagsFile.bufferedReader().use { it.readText() }
return if (fileText.isEmpty()) {
Expand All @@ -55,6 +61,8 @@ internal class FileDiskStorage internal constructor(
try {
Json.decodeFromString(fileText)
} catch (e: Throwable) {
// Delete corrupted file - safe to do while holding read lock
// since we're the only thread accessing it
flagsFile.delete()
FlagResolution.EMPTY
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import org.junit.Test
import java.io.File
import java.nio.file.Files
import java.util.Date
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread

class FileDiskStorageTest {
private lateinit var diskStorage: FileDiskStorage
Expand Down Expand Up @@ -64,4 +67,144 @@ class FileDiskStorageTest {
val read = diskStorage.read()
Assert.assertEquals(FlagResolution.EMPTY, read)
}

@Test
fun testConcurrentReadersAndWriters() {
// This test reproduces the original crash scenario:
// Multiple threads reading while multiple threads are writing simultaneously

// Given: Initial data
val initialData = FlagResolution(
mapOf("key" to ConfidenceValue.String("initial")),
listOf(),
"token-initial"
)
diskStorage.store(initialData)

// When: Multiple reader threads AND multiple writer threads run concurrently
val readerCount = 10
val writerCount = 5
val iterations = 200
val exceptions = mutableListOf<Throwable>()
val barrier = CyclicBarrier(readerCount + writerCount)
val threads = mutableListOf<Thread>()

// Reader threads - continuously read
repeat(readerCount) {
threads.add(
thread {
try {
barrier.await() // Start all threads at the same time
repeat(iterations) {
diskStorage.read() // This used to throw FileNotFoundException
}
} catch (e: Throwable) {
synchronized(exceptions) {
exceptions.add(e)
}
}
}
)
}

// Writer threads - continuously write
repeat(writerCount) { writerId ->
threads.add(
thread {
try {
barrier.await() // Start all threads at the same time
repeat(iterations) { iteration ->
val testData = FlagResolution(
mapOf("writer" to ConfidenceValue.String("$writerId-$iteration")),
listOf(),
"token-$writerId-$iteration"
)
diskStorage.store(testData)
}
} catch (e: Throwable) {
synchronized(exceptions) {
exceptions.add(e)
}
}
}
)
}

// Then: No FileNotFoundException or other exceptions should occur
threads.forEach { it.join(30000) }
Assert.assertTrue(
"Expected no exceptions but got: ${exceptions.map { "${it.javaClass.simpleName}: ${it.message}" }}",
exceptions.isEmpty()
)

// And: File should contain valid data from one of the writers
val finalResult = diskStorage.read()
Assert.assertNotEquals(FlagResolution.EMPTY, finalResult)
}

@Test
fun testFileDeletedExternallyDuringRead() {
// Given: A file with valid data
val testData = FlagResolution(
mapOf("key1" to ConfidenceValue.String("value1")),
listOf(),
"token123"
)
diskStorage.store(testData)

// When: File is deleted externally during read attempts
val readThreadCount = 5
val latch = CountDownLatch(1)
val exceptions = mutableListOf<Throwable>()

val threads = List(readThreadCount) {
thread {
try {
latch.await()
// Repeatedly try to read, some will succeed, some will find file missing
repeat(10) {
diskStorage.read() // Should handle FileNotFoundException gracefully
Thread.sleep(1)
}
} catch (e: Throwable) {
synchronized(exceptions) {
exceptions.add(e)
}
}
}
}

// Delete file externally (simulating external process/crash)
thread {
latch.countDown()
Thread.sleep(5) // Let some reads start
flagsFile.delete()
}

// Then: No FileNotFoundException should propagate
threads.forEach { it.join(5000) }
Assert.assertTrue(
"Expected no exceptions but got: ${exceptions.map { it.javaClass.simpleName + ": " + it.message }}",
exceptions.isEmpty()
)
}

@Test
fun testReadAfterClearReturnsEmpty() {
// Given: A file with data
val testData = FlagResolution(
mapOf("key1" to ConfidenceValue.String("value1")),
listOf(),
"token123"
)
diskStorage.store(testData)
Assert.assertEquals(testData, diskStorage.read())

// When: File is cleared
diskStorage.clear()

// Then: Read returns empty
val result = diskStorage.read()
Assert.assertEquals(FlagResolution.EMPTY, result)
}
}