c60eb8d27b
Build & Release APK / build (push) Has been cancelled
- SyncEngine: accepts onProgress callback — emits uploaded/downloaded/ deleted/bytes counts atomically as each file completes - SyncWorker: streams progress to WorkManager data so the UI can poll it live; reports per-run counters in the completion notification; adds pause/resume support - HomeViewModel/PairDetailViewModel: subscribe to live WorkManager progress and surface it via SyncProgress state - SyncPairEntity/SyncPairDao/SyncDatabase: persist last-run counters (uploaded, downloaded, deleted, bytesTransferred) in the DB with a Room migration (v3→v4) - AppModule: provides WorkManager as an injectable singleton - .gitignore: add .kotlin/ to exclude compiler session files Security: no new issues — all logging via Timber (debug-only), DB queries use Room parameterized API, file sharing via FileProvider. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
376 lines
20 KiB
Kotlin
376 lines
20 KiB
Kotlin
package com.syncflow.domain.sync
|
|
|
|
import android.content.Context
|
|
import android.net.Uri
|
|
import com.syncflow.data.db.SyncConflictDao
|
|
import com.syncflow.data.db.SyncEventDao
|
|
import com.syncflow.data.db.SyncFileStateDao
|
|
import com.syncflow.data.db.SyncPairDao
|
|
import com.syncflow.data.db.entities.SyncConflictEntity
|
|
import com.syncflow.data.db.entities.SyncEventEntity
|
|
import com.syncflow.data.db.entities.SyncFileStateEntity
|
|
import com.syncflow.data.providers.CloudProvider
|
|
import com.syncflow.domain.model.ConflictStrategy
|
|
import com.syncflow.domain.model.DeleteBehavior
|
|
import com.syncflow.domain.model.RemoteFile
|
|
import com.syncflow.domain.model.SyncDirection
|
|
import com.syncflow.domain.model.SyncEventType
|
|
import com.syncflow.domain.model.SyncPair
|
|
import com.syncflow.domain.model.SyncStatus
|
|
import dagger.hilt.android.qualifiers.ApplicationContext
|
|
import kotlinx.coroutines.*
|
|
import kotlinx.coroutines.sync.Semaphore
|
|
import kotlinx.coroutines.sync.withPermit
|
|
import timber.log.Timber
|
|
import java.io.File
|
|
import java.time.Instant
|
|
import java.util.concurrent.atomic.AtomicInteger
|
|
import java.util.concurrent.atomic.AtomicLong
|
|
import javax.inject.Inject
|
|
|
|
class SyncEngine @Inject constructor(
|
|
private val syncPairDao: SyncPairDao,
|
|
private val fileStateDao: SyncFileStateDao,
|
|
private val conflictDao: SyncConflictDao,
|
|
private val eventDao: SyncEventDao,
|
|
@ApplicationContext private val context: Context,
|
|
) {
|
|
suspend fun sync(
|
|
pair: SyncPair,
|
|
provider: CloudProvider,
|
|
onProgress: (suspend (uploaded: Int, downloaded: Int, deleted: Int, bytesTransferred: Long) -> Unit)? = null,
|
|
): SyncResult {
|
|
syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
|
|
logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)
|
|
|
|
return try {
|
|
val result = performSync(pair, provider, onProgress = onProgress)
|
|
val finalStatus = when {
|
|
result.failedFiles > 0 && result.conflicts > 0 -> SyncStatus.CONFLICT
|
|
result.failedFiles > 0 -> SyncStatus.PARTIAL
|
|
result.conflicts > 0 -> SyncStatus.CONFLICT
|
|
else -> SyncStatus.SUCCESS
|
|
}
|
|
syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts, result.uploaded, result.downloaded, result.deleted, result.bytesTransferred)
|
|
logEvent(pair.id, SyncEventType.SYNC_COMPLETED, null, "↑${result.uploaded} ↓${result.downloaded} ✕${result.deleted} ✗${result.failedFiles}", result.bytesTransferred)
|
|
result
|
|
} catch (e: Exception) {
|
|
Timber.e(e, "Sync failed for pair ${pair.id}")
|
|
syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0, 0, 0, 0, 0L)
|
|
logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
|
|
SyncResult(failedFiles = 1, error = e)
|
|
}
|
|
}
|
|
|
|
private fun makeAccessor(localPath: String): LocalAccessor =
|
|
if (localPath.startsWith("content://"))
|
|
LocalAccessor.Saf(Uri.parse(localPath), context.contentResolver)
|
|
else
|
|
LocalAccessor.JavaFile(File(localPath))
|
|
|
|
private suspend fun performSync(
|
|
pair: SyncPair,
|
|
provider: CloudProvider,
|
|
isRetry: Boolean = false,
|
|
onProgress: (suspend (Int, Int, Int, Long) -> Unit)? = null,
|
|
): SyncResult {
|
|
val accessor = makeAccessor(pair.localPath)
|
|
var knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
|
|
val remoteFiles = provider.listFiles(pair.remotePath).getOrThrow()
|
|
.associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }
|
|
val localFiles = accessor.walkFiles(pair)
|
|
|
|
// Self-healing: if every known-state path is absent from the current local scan but
|
|
// the local folder does have files, the localPath was changed without clearing state.
|
|
// The stale records would cause every old file to look like "DELETE_REMOTE" and every
|
|
// new file to re-upload indefinitely. Wipe and retry once as a fresh initial sync.
|
|
if (!isRetry && knownStates.isNotEmpty() && localFiles.isNotEmpty() &&
|
|
knownStates.keys.none { it in localFiles }) {
|
|
Timber.w("SyncEngine: stale folder states detected for pair ${pair.id} — resetting")
|
|
fileStateDao.deleteForPair(pair.id)
|
|
return performSync(pair, provider, isRetry = true, onProgress = onProgress)
|
|
}
|
|
|
|
val allPaths = (localFiles.keys + remoteFiles.keys + knownStates.keys).toSet()
|
|
val hasPriorSyncState = knownStates.isNotEmpty()
|
|
val semaphore = Semaphore(4)
|
|
val uploadedAtomic = AtomicInteger(0)
|
|
val downloadedAtomic = AtomicInteger(0)
|
|
val deletedAtomic = AtomicInteger(0)
|
|
val bytesAtomic = AtomicLong(0L)
|
|
|
|
// Each async block returns its outcome; no shared mutable state across coroutines.
|
|
data class FileOutcome(
|
|
val uploaded: Int = 0, val downloaded: Int = 0, val deleted: Int = 0,
|
|
val skipped: Int = 0, val failed: Int = 0, val conflicts: Int = 0,
|
|
val bytesTransferred: Long = 0L,
|
|
val newState: SyncFileStateEntity? = null,
|
|
)
|
|
|
|
val outcomes: List<FileOutcome> = coroutineScope {
|
|
allPaths.map { rel ->
|
|
async {
|
|
semaphore.withPermit {
|
|
val local = localFiles[rel]
|
|
val remote = remoteFiles[rel]
|
|
val known = knownStates[rel]
|
|
val decision = syncDecide(pair.syncDirection, pair.conflictStrategy, pair.deleteBehavior, local, remote, known, hasPriorSyncState)
|
|
|
|
when (decision) {
|
|
SyncDecision.UPLOAD -> {
|
|
val bytes = runCatching {
|
|
ensureRemoteDirs(provider, pair.remotePath, rel)
|
|
accessor.openInputStream(rel)?.use { stream ->
|
|
provider.uploadFile(stream, "${pair.remotePath}/$rel", local!!.sizeBytes) { }.getOrThrow()
|
|
}
|
|
local!!.sizeBytes
|
|
}.getOrElse { e ->
|
|
Timber.e(e, "Upload failed: $rel")
|
|
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
|
return@withPermit FileOutcome(failed = 1)
|
|
}
|
|
logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, bytes)
|
|
val up = uploadedAtomic.incrementAndGet()
|
|
bytesAtomic.addAndGet(bytes)
|
|
onProgress?.invoke(up, downloadedAtomic.get(), deletedAtomic.get(), bytesAtomic.get())
|
|
// Don't store remote metadata from upload response — the server (Nextcloud etc.)
|
|
// may change mtime/etag during post-upload processing. Leaving remoteModifiedAt
|
|
// null forces the SKIP reconciliation on the next sync to fill it in from the
|
|
// directory listing, which is the same source all future syncs will use.
|
|
FileOutcome(uploaded = 1, bytesTransferred = bytes,
|
|
newState = buildState(pair.id, rel, local!!, remoteAfterTransfer = null))
|
|
}
|
|
SyncDecision.DOWNLOAD -> {
|
|
val bytes = runCatching {
|
|
accessor.createOutputStream(rel)?.use { stream ->
|
|
provider.downloadFile("${pair.remotePath}/$rel", stream) { }
|
|
}
|
|
remote!!.sizeBytes
|
|
}.getOrElse { e ->
|
|
Timber.e(e, "Download failed: $rel")
|
|
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
|
return@withPermit FileOutcome(failed = 1)
|
|
}
|
|
// Read the actual local mtime written by the OS/SAF after download.
|
|
val localMtime = runCatching { accessor.lastModifiedMs(rel) }
|
|
.getOrDefault(System.currentTimeMillis()).takeIf { it > 0L }
|
|
?: System.currentTimeMillis()
|
|
logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, bytes)
|
|
val down = downloadedAtomic.incrementAndGet()
|
|
bytesAtomic.addAndGet(bytes)
|
|
onProgress?.invoke(uploadedAtomic.get(), down, deletedAtomic.get(), bytesAtomic.get())
|
|
FileOutcome(downloaded = 1, bytesTransferred = bytes,
|
|
newState = buildState(pair.id, rel,
|
|
LocalFileInfo(rel, remote!!.sizeBytes, localMtime),
|
|
remoteAfterTransfer = remote,
|
|
storeLocalMtime = false))
|
|
}
|
|
SyncDecision.DELETE_LOCAL -> {
|
|
val deleted = accessor.delete(rel)
|
|
if (!deleted) Timber.w("SyncEngine: DELETE_LOCAL failed (silent) for $rel")
|
|
fileStateDao.delete(pair.id, rel)
|
|
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
|
|
val del = deletedAtomic.incrementAndGet()
|
|
onProgress?.invoke(uploadedAtomic.get(), downloadedAtomic.get(), del, bytesAtomic.get())
|
|
FileOutcome(deleted = 1)
|
|
}
|
|
SyncDecision.DELETE_REMOTE -> {
|
|
if (pair.deleteBehavior == DeleteBehavior.ARCHIVE) {
|
|
val archivePath = "${pair.remotePath}/_Deleted/$rel"
|
|
runCatching {
|
|
ensureRemoteDirs(provider, "${pair.remotePath}/_Deleted", rel)
|
|
provider.moveFile("${pair.remotePath}/$rel", archivePath).getOrThrow()
|
|
}.onFailure { e -> Timber.e(e, "SyncEngine: ARCHIVE failed for $rel") }
|
|
fileStateDao.delete(pair.id, rel)
|
|
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "archived", 0)
|
|
} else {
|
|
runCatching { provider.deleteFile("${pair.remotePath}/$rel") }
|
|
.onFailure { e -> Timber.e(e, "SyncEngine: DELETE_REMOTE failed for $rel") }
|
|
fileStateDao.delete(pair.id, rel)
|
|
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
|
|
}
|
|
val del = deletedAtomic.incrementAndGet()
|
|
onProgress?.invoke(uploadedAtomic.get(), downloadedAtomic.get(), del, bytesAtomic.get())
|
|
FileOutcome(deleted = 1)
|
|
}
|
|
SyncDecision.CONFLICT -> {
|
|
conflictDao.insert(SyncConflictEntity(
|
|
syncPairId = pair.id,
|
|
relativePath = rel,
|
|
localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
|
|
localSizeBytes = local?.sizeBytes ?: 0L,
|
|
remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
|
|
remoteSizeBytes = remote?.sizeBytes ?: 0L,
|
|
resolution = null,
|
|
detectedAt = Instant.now(),
|
|
))
|
|
logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
|
|
FileOutcome(conflicts = 1)
|
|
}
|
|
SyncDecision.SKIP -> {
|
|
// Save state whenever both sides are present and state is absent or
|
|
// incomplete (post-upload null metadata). Without a baseline record,
|
|
// a subsequent local deletion would look like an unseen remote file
|
|
// and be re-downloaded instead of triggering DELETE_REMOTE.
|
|
val saveState = local != null && remote != null && (
|
|
known == null ||
|
|
known.remoteModifiedAt == null || known.localModifiedAt == null
|
|
)
|
|
if (saveState) {
|
|
FileOutcome(skipped = 1, newState = buildState(pair.id, rel, local, remoteAfterTransfer = remote))
|
|
} else {
|
|
FileOutcome(skipped = 1)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}.awaitAll()
|
|
}
|
|
|
|
fileStateDao.upsertAll(outcomes.mapNotNull { it.newState })
|
|
return SyncResult(
|
|
uploaded = outcomes.sumOf { it.uploaded },
|
|
downloaded = outcomes.sumOf { it.downloaded },
|
|
deleted = outcomes.sumOf { it.deleted },
|
|
skipped = outcomes.sumOf { it.skipped },
|
|
failedFiles = outcomes.sumOf { it.failed },
|
|
conflicts = outcomes.sumOf { it.conflicts },
|
|
bytesTransferred = outcomes.sumOf { it.bytesTransferred },
|
|
)
|
|
}
|
|
|
|
private suspend fun ensureRemoteDirs(provider: CloudProvider, remotePairPath: String, rel: String) {
|
|
val parts = rel.replace('\\', '/').split('/')
|
|
var currentPath = remotePairPath
|
|
for (part in parts.dropLast(1)) {
|
|
currentPath = "$currentPath/$part"
|
|
provider.createDirectory(currentPath).onFailure { e ->
|
|
Timber.w("MKCOL $currentPath: ${e.message}")
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun buildState(
|
|
pairId: Long,
|
|
rel: String,
|
|
local: LocalFileInfo?,
|
|
remoteAfterTransfer: RemoteFile?,
|
|
storeLocalMtime: Boolean = true,
|
|
) = SyncFileStateEntity(
|
|
syncPairId = pairId,
|
|
relativePath = rel,
|
|
// When storeLocalMtime=false, leave localModifiedAt null so the SKIP reconciliation
|
|
// pass on the next sync reads it from the walkFiles cursor (avoids SAF stale-mtime loops).
|
|
localModifiedAt = if (storeLocalMtime) local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } else null,
|
|
localSizeBytes = local?.sizeBytes ?: 0L,
|
|
localHash = null,
|
|
remoteModifiedAt = remoteAfterTransfer?.modifiedAt,
|
|
remoteSizeBytes = remoteAfterTransfer?.sizeBytes ?: 0L,
|
|
remoteEtag = remoteAfterTransfer?.etag,
|
|
lastSyncedAt = Instant.now(),
|
|
syncedHash = null,
|
|
)
|
|
|
|
private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
|
|
eventDao.insert(SyncEventEntity(syncPairId = pairId, timestamp = Instant.now(), eventType = type, filePath = file, message = msg, bytesTransferred = bytes))
|
|
}
|
|
}
|
|
|
|
// Top-level so unit tests can call it directly without instantiating SyncEngine.
|
|
internal fun syncDecide(
|
|
direction: SyncDirection,
|
|
conflictStrategy: ConflictStrategy,
|
|
deleteBehavior: DeleteBehavior,
|
|
local: LocalFileInfo?,
|
|
remote: RemoteFile?,
|
|
known: SyncFileStateEntity?,
|
|
hasPriorSyncState: Boolean = false,
|
|
): SyncDecision {
|
|
val localExists = local != null
|
|
val remoteExists = remote != null
|
|
|
|
// Treat null known timestamps as "not yet recorded" — don't treat as changed.
|
|
// The SKIP reconciliation pass will fill them in on the next sync.
|
|
// Use second-precision for both sides: FAT32 has 2-second mtime resolution, WebDAV
|
|
// RFC-1123 has 1-second resolution, so millisecond comparison causes phantom "changed"
|
|
// detections and rewrite loops after a fresh download/upload.
|
|
val localChanged = known == null ||
|
|
(localExists && known.localModifiedAt != null &&
|
|
local!!.lastModifiedMs / 1000 != known.localModifiedAt.epochSecond)
|
|
val remoteChanged = known == null ||
|
|
(remoteExists && known.remoteModifiedAt != null &&
|
|
(remote!!.etag != known.remoteEtag ||
|
|
remote.modifiedAt.epochSecond != known.remoteModifiedAt.epochSecond))
|
|
|
|
return when {
|
|
!localExists && !remoteExists -> SyncDecision.SKIP
|
|
|
|
localExists && !remoteExists -> when {
|
|
known == null -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
else -> when {
|
|
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
|
direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_LOCAL
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
!localExists && remoteExists -> when {
|
|
known == null -> {
|
|
// No state record: could be a new remote file OR a file whose state was lost.
|
|
// Downloading is always safer than deleting — if the user deleted the local
|
|
// copy intentionally, the state record will still exist (known != null) and
|
|
// the else-branch below correctly deletes the remote copy.
|
|
when (direction) {
|
|
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
else -> when {
|
|
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
|
direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_REMOTE
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
localChanged && remoteChanged -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
|
|
SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
|
|
SyncDirection.TWO_WAY -> when (conflictStrategy) {
|
|
ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
|
|
ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_NEWEST -> if ((local?.lastModifiedMs ?: 0L) >= (remote?.modifiedAt?.toEpochMilli() ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_LARGEST -> if ((local?.sizeBytes ?: 0L) >= (remote?.sizeBytes ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
|
ConflictStrategy.KEEP_BOTH -> SyncDecision.CONFLICT
|
|
ConflictStrategy.ASK -> SyncDecision.CONFLICT
|
|
}
|
|
}
|
|
|
|
localChanged -> when (direction) {
|
|
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
remoteChanged -> when (direction) {
|
|
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
else -> SyncDecision.SKIP
|
|
}
|
|
}
|
|
|
|
enum class SyncDecision { UPLOAD, DOWNLOAD, DELETE_LOCAL, DELETE_REMOTE, CONFLICT, SKIP }
|
|
|
|
data class SyncResult(
|
|
val uploaded: Int = 0,
|
|
val downloaded: Int = 0,
|
|
val deleted: Int = 0,
|
|
val skipped: Int = 0,
|
|
val failedFiles: Int = 0,
|
|
val conflicts: Int = 0,
|
|
val bytesTransferred: Long = 0L,
|
|
val error: Exception? = null,
|
|
)
|