e237555222
Biometric: - Handle onAuthenticationError with auto-retry (except user cancel) - Show lock screen with proper UI and an Unlock button as fallback - Add subtitle clarifying fingerprint/PIN options Sync engine: - Fix data race: async coroutines now return FileOutcome instead of mutating shared vars/list concurrently (was causing file states to not be saved, so every sync re-transferred all files) - Fix remoteChanged: use || instead of && so either etag or modifiedAt change is enough to detect a remote modification Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
266 lines
13 KiB
Kotlin
266 lines
13 KiB
Kotlin
package com.syncflow.domain.sync
|
|
|
|
import android.content.Context
|
|
import android.net.Uri
|
|
import com.syncflow.data.db.SyncConflictDao
|
|
import com.syncflow.data.db.SyncEventDao
|
|
import com.syncflow.data.db.SyncFileStateDao
|
|
import com.syncflow.data.db.SyncPairDao
|
|
import com.syncflow.data.db.entities.SyncConflictEntity
|
|
import com.syncflow.data.db.entities.SyncEventEntity
|
|
import com.syncflow.data.providers.CloudProvider
|
|
import com.syncflow.domain.model.ConflictStrategy
|
|
import com.syncflow.domain.model.DeleteBehavior
|
|
import com.syncflow.domain.model.RemoteFile
|
|
import com.syncflow.domain.model.SyncDirection
|
|
import com.syncflow.domain.model.SyncEventType
|
|
import com.syncflow.domain.model.SyncPair
|
|
import com.syncflow.domain.model.SyncStatus
|
|
import dagger.hilt.android.qualifiers.ApplicationContext
|
|
import kotlinx.coroutines.*
|
|
import kotlinx.coroutines.sync.Semaphore
|
|
import kotlinx.coroutines.sync.withPermit
|
|
import timber.log.Timber
|
|
import java.io.File
|
|
import java.time.Instant
|
|
import javax.inject.Inject
|
|
|
|
class SyncEngine @Inject constructor(
|
|
private val syncPairDao: SyncPairDao,
|
|
private val fileStateDao: SyncFileStateDao,
|
|
private val conflictDao: SyncConflictDao,
|
|
private val eventDao: SyncEventDao,
|
|
@ApplicationContext private val context: Context,
|
|
) {
|
|
suspend fun sync(pair: SyncPair, provider: CloudProvider): SyncResult {
|
|
syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
|
|
logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)
|
|
|
|
return try {
|
|
val result = performSync(pair, provider)
|
|
val finalStatus = when {
|
|
result.failedFiles > 0 && result.conflicts > 0 -> SyncStatus.CONFLICT
|
|
result.failedFiles > 0 -> SyncStatus.PARTIAL
|
|
result.conflicts > 0 -> SyncStatus.CONFLICT
|
|
else -> SyncStatus.SUCCESS
|
|
}
|
|
syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts)
|
|
logEvent(pair.id, SyncEventType.SYNC_COMPLETED, null, "↑${result.uploaded} ↓${result.downloaded} ✗${result.failedFiles}", result.bytesTransferred)
|
|
result
|
|
} catch (e: Exception) {
|
|
Timber.e(e, "Sync failed for pair ${pair.id}")
|
|
syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
|
|
logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
|
|
SyncResult(failedFiles = 1, error = e)
|
|
}
|
|
}
|
|
|
|
private fun makeAccessor(localPath: String): LocalAccessor =
|
|
if (localPath.startsWith("content://"))
|
|
LocalAccessor.Saf(Uri.parse(localPath), context.contentResolver)
|
|
else
|
|
LocalAccessor.JavaFile(File(localPath))
|
|
|
|
private suspend fun performSync(pair: SyncPair, provider: CloudProvider): SyncResult {
|
|
val accessor = makeAccessor(pair.localPath)
|
|
val knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
|
|
val remoteFiles = provider.listFiles(pair.remotePath).getOrThrow()
|
|
.associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }
|
|
val localFiles = accessor.walkFiles(pair)
|
|
|
|
val allPaths = (localFiles.keys + remoteFiles.keys + knownStates.keys).toSet()
|
|
val semaphore = Semaphore(4)
|
|
|
|
// Each async block returns its outcome; no shared mutable state across coroutines.
|
|
data class FileOutcome(
|
|
val uploaded: Int = 0, val downloaded: Int = 0, val deleted: Int = 0,
|
|
val skipped: Int = 0, val failed: Int = 0, val conflicts: Int = 0,
|
|
val bytesTransferred: Long = 0L,
|
|
val newState: com.syncflow.data.db.entities.SyncFileStateEntity? = null,
|
|
)
|
|
|
|
val outcomes: List<FileOutcome> = coroutineScope {
|
|
allPaths.map { rel ->
|
|
async {
|
|
semaphore.withPermit {
|
|
val local = localFiles[rel]
|
|
val remote = remoteFiles[rel]
|
|
val known = knownStates[rel]
|
|
val decision = decide(pair.syncDirection, pair.conflictStrategy, pair.deleteBehavior, local, remote, known)
|
|
|
|
when (decision) {
|
|
SyncDecision.UPLOAD -> {
|
|
val bytes = runCatching {
|
|
accessor.openInputStream(rel)?.use { stream ->
|
|
provider.uploadFile(stream, "${pair.remotePath}/$rel", local!!.sizeBytes) { }
|
|
}
|
|
local!!.sizeBytes
|
|
}.getOrElse { e ->
|
|
Timber.e(e, "Upload failed: $rel")
|
|
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
|
return@withPermit FileOutcome(failed = 1)
|
|
}
|
|
logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, bytes)
|
|
FileOutcome(uploaded = 1, bytesTransferred = bytes, newState = buildState(pair.id, rel, local!!, remote))
|
|
}
|
|
SyncDecision.DOWNLOAD -> {
|
|
val bytes = runCatching {
|
|
accessor.createOutputStream(rel)?.use { stream ->
|
|
provider.downloadFile("${pair.remotePath}/$rel", stream) { }
|
|
}
|
|
remote!!.sizeBytes
|
|
}.getOrElse { e ->
|
|
Timber.e(e, "Download failed: $rel")
|
|
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
|
return@withPermit FileOutcome(failed = 1)
|
|
}
|
|
logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, bytes)
|
|
FileOutcome(downloaded = 1, bytesTransferred = bytes, newState = buildState(pair.id, rel, null, remote))
|
|
}
|
|
SyncDecision.DELETE_LOCAL -> {
|
|
accessor.delete(rel)
|
|
fileStateDao.delete(pair.id, rel)
|
|
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
|
|
FileOutcome(deleted = 1)
|
|
}
|
|
SyncDecision.DELETE_REMOTE -> {
|
|
provider.deleteFile("${pair.remotePath}/$rel")
|
|
fileStateDao.delete(pair.id, rel)
|
|
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
|
|
FileOutcome(deleted = 1)
|
|
}
|
|
SyncDecision.CONFLICT -> {
|
|
conflictDao.insert(SyncConflictEntity(
|
|
syncPairId = pair.id,
|
|
relativePath = rel,
|
|
localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
|
|
localSizeBytes = local?.sizeBytes ?: 0L,
|
|
remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
|
|
remoteSizeBytes = remote?.sizeBytes ?: 0L,
|
|
resolution = null,
|
|
detectedAt = Instant.now(),
|
|
))
|
|
logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
|
|
FileOutcome(conflicts = 1)
|
|
}
|
|
SyncDecision.SKIP -> FileOutcome(skipped = 1)
|
|
}
|
|
}
|
|
}
|
|
}.awaitAll()
|
|
}
|
|
|
|
fileStateDao.upsertAll(outcomes.mapNotNull { it.newState })
|
|
return SyncResult(
|
|
uploaded = outcomes.sumOf { it.uploaded },
|
|
downloaded = outcomes.sumOf { it.downloaded },
|
|
deleted = outcomes.sumOf { it.deleted },
|
|
skipped = outcomes.sumOf { it.skipped },
|
|
failedFiles = outcomes.sumOf { it.failed },
|
|
conflicts = outcomes.sumOf { it.conflicts },
|
|
bytesTransferred = outcomes.sumOf { it.bytesTransferred },
|
|
)
|
|
}
|
|
|
|
private fun decide(
|
|
direction: SyncDirection,
|
|
conflictStrategy: ConflictStrategy,
|
|
deleteBehavior: DeleteBehavior,
|
|
local: LocalFileInfo?,
|
|
remote: RemoteFile?,
|
|
known: com.syncflow.data.db.entities.SyncFileStateEntity?,
|
|
): SyncDecision {
|
|
val localExists = local != null
|
|
val remoteExists = remote != null
|
|
|
|
val localChanged = known == null || (localExists && local!!.lastModifiedMs != known.localModifiedAt?.toEpochMilli())
|
|
val remoteChanged = known == null || (remoteExists && (remote!!.etag != known.remoteEtag || remote.modifiedAt != known.remoteModifiedAt))
|
|
|
|
return when {
|
|
!localExists && !remoteExists -> SyncDecision.SKIP
|
|
|
|
localExists && !remoteExists -> when {
|
|
known == null -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
else -> when {
|
|
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
|
direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_LOCAL
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
!localExists && remoteExists -> when {
|
|
known == null -> when (direction) {
|
|
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
else -> when {
|
|
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
|
direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_REMOTE
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
localChanged && remoteChanged -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
|
|
SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
|
|
SyncDirection.TWO_WAY -> when (conflictStrategy) {
|
|
ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
|
|
ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_NEWEST -> if ((local?.lastModifiedMs ?: 0L) >= (remote?.modifiedAt?.toEpochMilli() ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_LARGEST -> if ((local?.sizeBytes ?: 0L) >= (remote?.sizeBytes ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_BOTH -> SyncDecision.CONFLICT
|
|
ConflictStrategy.ASK -> SyncDecision.CONFLICT
|
|
}
|
|
}
|
|
|
|
localChanged -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
remoteChanged -> when (direction) {
|
|
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
private fun buildState(
|
|
pairId: Long,
|
|
rel: String,
|
|
local: LocalFileInfo?,
|
|
remote: RemoteFile?,
|
|
) = com.syncflow.data.db.entities.SyncFileStateEntity(
|
|
syncPairId = pairId,
|
|
relativePath = rel,
|
|
localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) },
|
|
localSizeBytes = local?.sizeBytes ?: 0L,
|
|
localHash = null,
|
|
remoteModifiedAt = remote?.modifiedAt,
|
|
remoteSizeBytes = remote?.sizeBytes ?: 0L,
|
|
remoteEtag = remote?.etag,
|
|
lastSyncedAt = Instant.now(),
|
|
syncedHash = null,
|
|
)
|
|
|
|
private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
|
|
eventDao.insert(SyncEventEntity(syncPairId = pairId, timestamp = Instant.now(), eventType = type, filePath = file, message = msg, bytesTransferred = bytes))
|
|
}
|
|
}
|
|
|
|
enum class SyncDecision { UPLOAD, DOWNLOAD, DELETE_LOCAL, DELETE_REMOTE, CONFLICT, SKIP }
|
|
|
|
data class SyncResult(
|
|
val uploaded: Int = 0,
|
|
val downloaded: Int = 0,
|
|
val deleted: Int = 0,
|
|
val skipped: Int = 0,
|
|
val failedFiles: Int = 0,
|
|
val conflicts: Int = 0,
|
|
val bytesTransferred: Long = 0L,
|
|
val error: Exception? = null,
|
|
)
|