897b685c70
Three bugs fixed: 1. catchupScan used raw dir.walk() with no filters, causing hidden/excluded files to appear as "new" every startup and trigger a catchup sync. Fixed by using LocalAccessor.walkFiles(pair) which applies the same filters and uses the same mtime source (SAF cursor) as SyncEngine. 2. catchupScan compared localModifiedAt.toEpochMilli() vs File.lastModified() (millisecond precision) while SyncEngine uses second precision. Every file appeared "modified" after a successful sync. Fixed by using epochSecond. 3. syncDecide() treated !localExists && remoteExists && known==null as "user deleted local copy → delete remote" even on files that were never synced. Fixed to treat unknown remote files as new (download them), which is safe because a genuinely-deleted file will always have a known state record from the previous sync. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
342 lines
17 KiB
Kotlin
342 lines
17 KiB
Kotlin
package com.syncflow.domain.sync
|
|
|
|
import android.content.Context
|
|
import android.net.Uri
|
|
import com.syncflow.data.db.SyncConflictDao
|
|
import com.syncflow.data.db.SyncEventDao
|
|
import com.syncflow.data.db.SyncFileStateDao
|
|
import com.syncflow.data.db.SyncPairDao
|
|
import com.syncflow.data.db.entities.SyncConflictEntity
|
|
import com.syncflow.data.db.entities.SyncEventEntity
|
|
import com.syncflow.data.db.entities.SyncFileStateEntity
|
|
import com.syncflow.data.providers.CloudProvider
|
|
import com.syncflow.domain.model.ConflictStrategy
|
|
import com.syncflow.domain.model.DeleteBehavior
|
|
import com.syncflow.domain.model.RemoteFile
|
|
import com.syncflow.domain.model.SyncDirection
|
|
import com.syncflow.domain.model.SyncEventType
|
|
import com.syncflow.domain.model.SyncPair
|
|
import com.syncflow.domain.model.SyncStatus
|
|
import dagger.hilt.android.qualifiers.ApplicationContext
|
|
import kotlinx.coroutines.*
|
|
import kotlinx.coroutines.sync.Semaphore
|
|
import kotlinx.coroutines.sync.withPermit
|
|
import timber.log.Timber
|
|
import java.io.File
|
|
import java.time.Instant
|
|
import javax.inject.Inject
|
|
|
|
/**
 * Runs a sync pass for one [SyncPair] between its local folder and a remote [CloudProvider].
 *
 * A pass lists the remote folder, walks the local folder, loads the per-file state saved by
 * the previous pass, and asks [syncDecide] what to do with every path seen on any of the three
 * sides. Per-file work runs concurrently, bounded by [MAX_PARALLEL_FILE_OPS].
 */
class SyncEngine @Inject constructor(
    private val syncPairDao: SyncPairDao,
    private val fileStateDao: SyncFileStateDao,
    private val conflictDao: SyncConflictDao,
    private val eventDao: SyncEventDao,
    @ApplicationContext private val context: Context,
) {
    /**
     * Result of handling a single relative path. Each concurrent worker returns its own
     * instance, so no mutable state is shared across coroutines; the counters are summed
     * once all workers have finished.
     *
     * @property newState state row to upsert after the pass, or `null` when nothing should be saved.
     */
    private data class FileOutcome(
        val uploaded: Int = 0,
        val downloaded: Int = 0,
        val deleted: Int = 0,
        val skipped: Int = 0,
        val failed: Int = 0,
        val conflicts: Int = 0,
        val bytesTransferred: Long = 0L,
        val newState: SyncFileStateEntity? = null,
    )

    /**
     * Syncs [pair] against [provider] and records status and events in the database.
     *
     * The pair is marked `SYNCING` first. On completion the final status is `CONFLICT` if any
     * conflict was recorded, otherwise `PARTIAL` if any file failed, otherwise `SUCCESS`.
     *
     * @return the aggregated [SyncResult]. If the pass itself throws, the pair is marked
     *   `FAILED` and a result with `failedFiles = 1` and the exception in `error` is returned.
     * @throws CancellationException if the calling coroutine is cancelled. The pair is still
     *   marked `FAILED` before the exception is rethrown so it is not left in `SYNCING`.
     */
    suspend fun sync(pair: SyncPair, provider: CloudProvider): SyncResult {
        syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
        logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)

        return try {
            val result = performSync(pair, provider)
            // Conflicts take precedence over plain failures when choosing the final status.
            val finalStatus = when {
                result.conflicts > 0 -> SyncStatus.CONFLICT
                result.failedFiles > 0 -> SyncStatus.PARTIAL
                else -> SyncStatus.SUCCESS
            }
            syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts)
            logEvent(pair.id, SyncEventType.SYNC_COMPLETED, null, "↑${result.uploaded} ↓${result.downloaded} ✕${result.deleted} ✗${result.failedFiles}", result.bytesTransferred)
            result
        } catch (e: CancellationException) {
            // Cancellation must propagate. The bookkeeping runs under NonCancellable because
            // suspending calls would otherwise be made from an already-cancelled coroutine.
            withContext(NonCancellable) {
                syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
                logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
            }
            throw e
        } catch (e: Exception) {
            Timber.e(e, "Sync failed for pair ${pair.id}")
            syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
            logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
            SyncResult(failedFiles = 1, error = e)
        }
    }

    /** Picks the SAF-backed accessor for `content://` paths and the `java.io.File` one otherwise. */
    private fun makeAccessor(localPath: String): LocalAccessor =
        if (localPath.startsWith("content://")) {
            LocalAccessor.Saf(Uri.parse(localPath), context.contentResolver)
        } else {
            LocalAccessor.JavaFile(File(localPath))
        }

    /**
     * Performs one full pass: gathers the three views (known state, remote listing, local walk),
     * handles every path concurrently, then persists the new state rows in a single upsert.
     *
     * @param isRetry `true` on the single re-run that follows a stale-state reset; prevents
     *   the reset from recursing more than once.
     * @throws Throwable whatever `provider.listFiles(..).getOrThrow()` or a DAO call throws.
     */
    private suspend fun performSync(
        pair: SyncPair,
        provider: CloudProvider,
        isRetry: Boolean = false,
    ): SyncResult {
        val accessor = makeAccessor(pair.localPath)
        val knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
        // Key remote files by their path relative to the pair's remote root.
        val remoteFiles = provider.listFiles(pair.remotePath).getOrThrow()
            .associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }
        val localFiles = accessor.walkFiles(pair)

        // Self-healing: if every known-state path is absent from the current local scan but
        // the local folder does have files, the localPath was changed without clearing state.
        // The stale records would cause every old file to look like "DELETE_REMOTE" and every
        // new file to re-upload indefinitely. Wipe and retry once as a fresh initial sync.
        val statesAreStale = !isRetry &&
            knownStates.isNotEmpty() &&
            localFiles.isNotEmpty() &&
            knownStates.keys.none { it in localFiles }
        if (statesAreStale) {
            Timber.w("SyncEngine: stale folder states detected for pair ${pair.id} — resetting")
            fileStateDao.deleteForPair(pair.id)
            return performSync(pair, provider, isRetry = true)
        }

        val allPaths = (localFiles.keys + remoteFiles.keys + knownStates.keys).toSet()
        val hasPriorSyncState = knownStates.isNotEmpty()
        val semaphore = Semaphore(MAX_PARALLEL_FILE_OPS)

        val outcomes: List<FileOutcome> = coroutineScope {
            allPaths.map { rel ->
                async {
                    semaphore.withPermit {
                        processPath(
                            pair = pair,
                            provider = provider,
                            accessor = accessor,
                            rel = rel,
                            local = localFiles[rel],
                            remote = remoteFiles[rel],
                            known = knownStates[rel],
                            hasPriorSyncState = hasPriorSyncState,
                        )
                    }
                }
            }.awaitAll()
        }

        fileStateDao.upsertAll(outcomes.mapNotNull { it.newState })
        return SyncResult(
            uploaded = outcomes.sumOf { it.uploaded },
            downloaded = outcomes.sumOf { it.downloaded },
            deleted = outcomes.sumOf { it.deleted },
            skipped = outcomes.sumOf { it.skipped },
            failedFiles = outcomes.sumOf { it.failed },
            conflicts = outcomes.sumOf { it.conflicts },
            bytesTransferred = outcomes.sumOf { it.bytesTransferred },
        )
    }

    /** Asks [syncDecide] what to do with [rel] and dispatches to the matching handler. */
    private suspend fun processPath(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
        hasPriorSyncState: Boolean,
    ): FileOutcome {
        val decision = syncDecide(pair.syncDirection, pair.conflictStrategy, pair.deleteBehavior, local, remote, known, hasPriorSyncState)
        return when (decision) {
            SyncDecision.UPLOAD -> handleUpload(
                pair, provider, accessor, rel,
                checkNotNull(local) { "UPLOAD decided without a local file: $rel" },
            )
            SyncDecision.DOWNLOAD -> handleDownload(
                pair, provider, accessor, rel,
                checkNotNull(remote) { "DOWNLOAD decided without a remote file: $rel" },
            )
            SyncDecision.DELETE_LOCAL -> handleDeleteLocal(pair, accessor, rel)
            SyncDecision.DELETE_REMOTE -> handleDeleteRemote(pair, provider, rel)
            SyncDecision.CONFLICT -> handleConflict(pair, rel, local, remote)
            SyncDecision.SKIP -> handleSkip(pair, rel, local, remote, known)
        }
    }

    /**
     * Uploads the local file at [rel], creating missing remote parent directories first.
     *
     * Any failure, including the accessor returning no input stream, is logged as
     * `FILE_SKIPPED` and reported as one failed file with no state saved. Saving state for a
     * file that was never uploaded would make the next pass see "known, remote missing" and
     * delete the local copy.
     */
    private suspend fun handleUpload(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo,
    ): FileOutcome {
        val uploadedRemoteFile: RemoteFile? = try {
            ensureRemoteDirs(provider, pair.remotePath, rel)
            val input = accessor.openInputStream(rel)
                ?: error("Cannot open local file for reading: $rel")
            input.use { stream ->
                provider.uploadFile(stream, "${pair.remotePath}/$rel", local.sizeBytes) { }.getOrThrow()
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            Timber.e(e, "Upload failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            return FileOutcome(failed = 1)
        }

        val bytes = local.sizeBytes
        logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, bytes)
        return FileOutcome(
            uploaded = 1,
            bytesTransferred = bytes,
            newState = buildState(pair.id, rel, local, remoteAfterTransfer = uploadedRemoteFile),
        )
    }

    /**
     * Downloads the remote file at [rel] into the local folder.
     *
     * Any thrown failure, including the accessor returning no output stream, is logged as
     * `FILE_SKIPPED` and reported as one failed file with no state saved. Saving state for a
     * file that was never written would make the next pass see "known, local missing" and
     * delete the remote copy.
     */
    private suspend fun handleDownload(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        remote: RemoteFile,
    ): FileOutcome {
        try {
            val output = accessor.createOutputStream(rel)
                ?: error("Cannot open local file for writing: $rel")
            output.use { stream ->
                // NOTE(review): the value returned by downloadFile is discarded. If it is a
                // Result like uploadFile's, a failed download goes unnoticed here — confirm
                // against CloudProvider and add getOrThrow() if so.
                provider.downloadFile("${pair.remotePath}/$rel", stream) { }
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            Timber.e(e, "Download failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            return FileOutcome(failed = 1)
        }

        val bytes = remote.sizeBytes
        // Read the actual local mtime written by the OS/SAF after download, falling back to
        // "now" when it cannot be read or is not positive. With storeLocalMtime = false below,
        // buildState does not persist this value.
        val localMtime = runCatching { accessor.lastModifiedMs(rel) }
            .getOrDefault(System.currentTimeMillis()).takeIf { it > 0L }
            ?: System.currentTimeMillis()
        logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, bytes)
        return FileOutcome(
            downloaded = 1,
            bytesTransferred = bytes,
            newState = buildState(
                pair.id,
                rel,
                LocalFileInfo(rel, remote.sizeBytes, localMtime),
                remoteAfterTransfer = remote,
                storeLocalMtime = false,
            ),
        )
    }

    /**
     * Deletes the local copy of [rel] and drops its state row.
     *
     * When the accessor reports that nothing was deleted, the state row is kept so the delete
     * is retried on the next pass, and the file is reported as failed. Dropping the row anyway
     * would make the surviving local file look brand new and get uploaded again.
     */
    private suspend fun handleDeleteLocal(
        pair: SyncPair,
        accessor: LocalAccessor,
        rel: String,
    ): FileOutcome {
        if (!accessor.delete(rel)) {
            Timber.w("SyncEngine: DELETE_LOCAL failed for $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, "local delete failed", 0)
            return FileOutcome(failed = 1)
        }
        fileStateDao.delete(pair.id, rel)
        logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
        return FileOutcome(deleted = 1)
    }

    /**
     * Deletes the remote copy of [rel] and drops its state row.
     *
     * When the provider call throws, the state row is kept so the delete is retried on the
     * next pass, and the file is reported as failed. Dropping the row anyway would make the
     * surviving remote file look brand new and get downloaded again.
     */
    private suspend fun handleDeleteRemote(
        pair: SyncPair,
        provider: CloudProvider,
        rel: String,
    ): FileOutcome {
        try {
            provider.deleteFile("${pair.remotePath}/$rel")
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            Timber.e(e, "SyncEngine: DELETE_REMOTE failed for $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            return FileOutcome(failed = 1)
        }
        fileStateDao.delete(pair.id, rel)
        logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
        return FileOutcome(deleted = 1)
    }

    /** Stores an unresolved conflict row for [rel]; a missing side is recorded as epoch / 0 bytes. */
    private suspend fun handleConflict(
        pair: SyncPair,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
    ): FileOutcome {
        conflictDao.insert(
            SyncConflictEntity(
                syncPairId = pair.id,
                relativePath = rel,
                localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
                localSizeBytes = local?.sizeBytes ?: 0L,
                remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
                remoteSizeBytes = remote?.sizeBytes ?: 0L,
                resolution = null,
                detectedAt = Instant.now(),
            )
        )
        logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
        return FileOutcome(conflicts = 1)
    }

    /**
     * Handles a skipped path, back-filling its state row where needed.
     *
     * State is saved whenever both sides are present and the record is absent or incomplete
     * (null local or remote timestamp). Without a baseline record, a subsequent local deletion
     * would look like an unseen remote file and be re-downloaded instead of triggering
     * DELETE_REMOTE.
     */
    private fun handleSkip(
        pair: SyncPair,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
    ): FileOutcome {
        val needsBaseline = local != null && remote != null &&
            (known == null || known.remoteModifiedAt == null || known.localModifiedAt == null)
        return if (needsBaseline) {
            FileOutcome(skipped = 1, newState = buildState(pair.id, rel, local, remoteAfterTransfer = remote))
        } else {
            FileOutcome(skipped = 1)
        }
    }

    /**
     * Creates every parent directory of [rel] beneath [remotePairPath], outermost first.
     * A failed `createDirectory` call is logged as a warning and does not stop the loop.
     */
    private suspend fun ensureRemoteDirs(provider: CloudProvider, remotePairPath: String, rel: String) {
        val parentSegments = rel.replace('\\', '/').split('/').dropLast(1)
        var currentPath = remotePairPath
        for (segment in parentSegments) {
            currentPath = "$currentPath/$segment"
            provider.createDirectory(currentPath).onFailure { e ->
                Timber.w("MKCOL $currentPath: ${e.message}")
            }
        }
    }

    /**
     * Builds the state row describing [rel] as of now.
     *
     * @param local local file info; a null value yields a null mtime and a size of 0.
     * @param remoteAfterTransfer remote metadata to record; null yields null timestamp/etag and size 0.
     * @param storeLocalMtime when `false`, `localModifiedAt` is left null so the SKIP
     *   reconciliation pass on the next sync reads it from the walkFiles cursor
     *   (avoids SAF stale-mtime loops).
     */
    private fun buildState(
        pairId: Long,
        rel: String,
        local: LocalFileInfo?,
        remoteAfterTransfer: RemoteFile?,
        storeLocalMtime: Boolean = true,
    ) = SyncFileStateEntity(
        syncPairId = pairId,
        relativePath = rel,
        localModifiedAt = if (storeLocalMtime) local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } else null,
        localSizeBytes = local?.sizeBytes ?: 0L,
        localHash = null,
        remoteModifiedAt = remoteAfterTransfer?.modifiedAt,
        remoteSizeBytes = remoteAfterTransfer?.sizeBytes ?: 0L,
        remoteEtag = remoteAfterTransfer?.etag,
        lastSyncedAt = Instant.now(),
        syncedHash = null,
    )

    /** Inserts one timestamped row into the sync event log. */
    private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
        eventDao.insert(
            SyncEventEntity(
                syncPairId = pairId,
                timestamp = Instant.now(),
                eventType = type,
                filePath = file,
                message = msg,
                bytesTransferred = bytes,
            )
        )
    }

    private companion object {
        /** Upper bound on per-file operations running at the same time within one pass. */
        const val MAX_PARALLEL_FILE_OPS = 4
    }
}
|
|
|
|
/**
 * Decides what to do with a single relative path given its local, remote and last-known state.
 *
 * Top-level so unit tests can call it directly without instantiating SyncEngine.
 *
 * Change detection compares timestamps at second precision on both sides: FAT32 has 2-second
 * mtime resolution and WebDAV RFC-1123 has 1-second resolution, so millisecond comparison
 * causes phantom "changed" detections and rewrite loops after a fresh download/upload.
 * A null timestamp in [known] means "not yet recorded" and is never treated as a change;
 * the SKIP reconciliation pass fills it in on the next sync. With no [known] record at all,
 * both sides count as changed.
 *
 * @param direction which way data may flow for this pair.
 * @param conflictStrategy how to resolve a two-way change on both sides.
 * @param deleteBehavior `KEEP` suppresses propagation of deletions.
 * @param local the local file, or null if absent locally.
 * @param remote the remote file, or null if absent remotely.
 * @param known the state saved by the previous sync, or null if none.
 * @param hasPriorSyncState accepted for callers; not consulted by the current decision table.
 * @return the action to take for this path.
 */
internal fun syncDecide(
    direction: SyncDirection,
    conflictStrategy: ConflictStrategy,
    deleteBehavior: DeleteBehavior,
    local: LocalFileInfo?,
    remote: RemoteFile?,
    known: SyncFileStateEntity?,
    hasPriorSyncState: Boolean = false,
): SyncDecision {
    val canUpload = direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY
    val canDownload = direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY

    val knownLocalAt = known?.localModifiedAt
    val knownRemoteAt = known?.remoteModifiedAt

    // Convert the local mtime through Instant so it is rounded exactly like the stored value
    // (floor to the second); a plain `/ 1000` truncates toward zero and disagrees for
    // negative timestamps.
    val localChanged = known == null ||
        (local != null && knownLocalAt != null &&
            Instant.ofEpochMilli(local.lastModifiedMs).epochSecond != knownLocalAt.epochSecond)
    val remoteChanged = known == null ||
        (remote != null && knownRemoteAt != null &&
            (remote.etag != known.remoteEtag ||
                remote.modifiedAt.epochSecond != knownRemoteAt.epochSecond))

    return when {
        local == null && remote == null -> SyncDecision.SKIP

        // Present locally only.
        remote == null -> when {
            // Never synced before: a new local file.
            known == null -> if (canUpload) SyncDecision.UPLOAD else SyncDecision.SKIP
            // Synced before, so the remote copy was removed.
            deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
            canDownload -> SyncDecision.DELETE_LOCAL
            else -> SyncDecision.SKIP
        }

        // Present remotely only.
        local == null -> when {
            // No state record: could be a new remote file OR a file whose state was lost.
            // Downloading is always safer than deleting — if the user deleted the local
            // copy intentionally, the state record will still exist (known != null) and
            // the branches below correctly delete the remote copy.
            known == null -> if (canDownload) SyncDecision.DOWNLOAD else SyncDecision.SKIP
            deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
            canUpload -> SyncDecision.DELETE_REMOTE
            else -> SyncDecision.SKIP
        }

        // Present on both sides and both changed: one-way pairs follow their direction,
        // two-way pairs apply the conflict strategy.
        localChanged && remoteChanged -> when (direction) {
            SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
            SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
            SyncDirection.TWO_WAY -> when (conflictStrategy) {
                ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
                ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
                // Ties go to the local copy.
                ConflictStrategy.KEEP_NEWEST ->
                    if ((local?.lastModifiedMs ?: 0L) >= (remote?.modifiedAt?.toEpochMilli() ?: 0L)) {
                        SyncDecision.UPLOAD
                    } else {
                        SyncDecision.DOWNLOAD
                    }
                ConflictStrategy.KEEP_LARGEST ->
                    if ((local?.sizeBytes ?: 0L) >= (remote?.sizeBytes ?: 0L)) {
                        SyncDecision.UPLOAD
                    } else {
                        SyncDecision.DOWNLOAD
                    }
                ConflictStrategy.KEEP_BOTH -> SyncDecision.CONFLICT
                ConflictStrategy.ASK -> SyncDecision.CONFLICT
            }
        }

        localChanged -> if (canUpload) SyncDecision.UPLOAD else SyncDecision.SKIP
        remoteChanged -> if (canDownload) SyncDecision.DOWNLOAD else SyncDecision.SKIP
        else -> SyncDecision.SKIP
    }
}
|
|
|
|
/** Action chosen by [syncDecide] for a single relative path. */
enum class SyncDecision {
    /** Send the local file to the remote side. */
    UPLOAD,

    /** Fetch the remote file into the local folder. */
    DOWNLOAD,

    /** Remove the local copy. */
    DELETE_LOCAL,

    /** Remove the remote copy. */
    DELETE_REMOTE,

    /** Both sides changed and the strategy does not pick a winner. */
    CONFLICT,

    /** Transfer or delete nothing. */
    SKIP,
}
|
|
|
|
/**
 * Aggregated counters for one sync pass. Every field defaults to zero / null.
 *
 * @property uploaded number of files uploaded.
 * @property downloaded number of files downloaded.
 * @property deleted number of files deleted on either side.
 * @property skipped number of paths left untouched.
 * @property failedFiles number of files whose operation failed.
 * @property conflicts number of conflicts recorded.
 * @property bytesTransferred total bytes moved by uploads and downloads.
 * @property error the exception that aborted the pass, or null.
 */
data class SyncResult(
    // Per-action counters.
    val uploaded: Int = 0, val downloaded: Int = 0, val deleted: Int = 0, val skipped: Int = 0,
    // Problems encountered.
    val failedFiles: Int = 0, val conflicts: Int = 0,
    // Transfer volume.
    val bytesTransferred: Long = 0L,
    // Set only when the whole pass aborted.
    val error: Exception? = null,
)
|