package com.syncflow.domain.sync

import android.content.Context
import android.net.Uri
import com.syncflow.data.db.SyncConflictDao
import com.syncflow.data.db.SyncEventDao
import com.syncflow.data.db.SyncFileStateDao
import com.syncflow.data.db.SyncPairDao
import com.syncflow.data.db.entities.SyncConflictEntity
import com.syncflow.data.db.entities.SyncEventEntity
import com.syncflow.data.db.entities.SyncFileStateEntity
import com.syncflow.data.providers.CloudProvider
import com.syncflow.domain.model.ConflictStrategy
import com.syncflow.domain.model.DeleteBehavior
import com.syncflow.domain.model.RemoteFile
import com.syncflow.domain.model.SyncDirection
import com.syncflow.domain.model.SyncEventType
import com.syncflow.domain.model.SyncPair
import com.syncflow.domain.model.SyncStatus
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import timber.log.Timber
import java.io.File
import java.time.Instant
import javax.inject.Inject

/**
 * Runs one synchronisation pass for a [SyncPair] against a [CloudProvider].
 *
 * A pass lists the local and remote files, merges them with the per-file state
 * rows stored for the pair, asks [syncDecide] what to do with every relative
 * path, carries the decisions out (at most [MAX_PARALLEL_FILES] paths at a
 * time) and finally persists the new state rows and the pair's status.
 */
class SyncEngine @Inject constructor(
    private val syncPairDao: SyncPairDao,
    private val fileStateDao: SyncFileStateDao,
    private val conflictDao: SyncConflictDao,
    private val eventDao: SyncEventDao,
    @ApplicationContext private val context: Context,
) {

    /**
     * Synchronises [pair] through [provider] and records the outcome.
     *
     * The pair is marked `SYNCING` up front. On completion its status becomes
     * `CONFLICT` if any conflict was recorded, otherwise `PARTIAL` if any file
     * failed, otherwise `SUCCESS`.
     *
     * @return the aggregated counters; when the pass itself throws, a result with
     *   `failedFiles = 1` and [SyncResult.error] set (the pair is marked `FAILED`).
     * @throws CancellationException if the calling coroutine was cancelled. The
     *   pair is still marked `FAILED` before the cancellation is propagated.
     */
    suspend fun sync(pair: SyncPair, provider: CloudProvider): SyncResult {
        syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
        logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)
        return try {
            val result = performSync(pair, provider)
            // A conflict outranks a plain failure; both outrank success.
            val finalStatus = when {
                result.conflicts > 0 -> SyncStatus.CONFLICT
                result.failedFiles > 0 -> SyncStatus.PARTIAL
                else -> SyncStatus.SUCCESS
            }
            syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts)
            logEvent(
                pair.id,
                SyncEventType.SYNC_COMPLETED,
                null,
                "↑${result.uploaded} ↓${result.downloaded} ✕${result.deleted} ✗${result.failedFiles}",
                result.bytesTransferred,
            )
            result
        } catch (e: Exception) {
            Timber.e(e, "Sync failed for pair ${pair.id}")
            // NonCancellable: the bookkeeping must be written even when the failure
            // *is* a cancellation, otherwise the pair would stay in SYNCING.
            withContext(NonCancellable) {
                syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
                logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
            }
            // Do not swallow cancellation of the calling coroutine.
            currentCoroutineContext().ensureActive()
            SyncResult(failedFiles = 1, error = e)
        }
    }

    /** Picks the SAF-backed accessor for `content://` locations and the `java.io.File` one otherwise. */
    private fun makeAccessor(localPath: String): LocalAccessor =
        if (localPath.startsWith("content://")) {
            LocalAccessor.Saf(Uri.parse(localPath), context.contentResolver)
        } else {
            LocalAccessor.JavaFile(File(localPath))
        }

    /**
     * Lists both sides, decides and executes one action per relative path, then
     * upserts the state rows produced by the successful actions.
     *
     * @throws Throwable whatever the remote listing, the local walk or the DAOs throw;
     *   transfer and delete errors are handled per file and only counted.
     */
    private suspend fun performSync(pair: SyncPair, provider: CloudProvider): SyncResult {
        val accessor = makeAccessor(pair.localPath)
        val knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
        // Remote paths are keyed relative to the pair's remote root.
        val remoteFiles = provider.listFiles(pair.remotePath).getOrThrow()
            .associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }
        val localFiles = accessor.walkFiles(pair)
        val allPaths = (localFiles.keys + remoteFiles.keys + knownStates.keys).toSet()
        val hasPriorSyncState = knownStates.isNotEmpty()
        val semaphore = Semaphore(MAX_PARALLEL_FILES)

        // Each async block returns its outcome; no shared mutable state across coroutines.
        val outcomes: List<FileOutcome> = coroutineScope {
            allPaths.map { rel ->
                async {
                    semaphore.withPermit {
                        syncFile(
                            pair = pair,
                            provider = provider,
                            accessor = accessor,
                            rel = rel,
                            local = localFiles[rel],
                            remote = remoteFiles[rel],
                            known = knownStates[rel],
                            hasPriorSyncState = hasPriorSyncState,
                        )
                    }
                }
            }.awaitAll()
        }

        fileStateDao.upsertAll(outcomes.mapNotNull { it.newState })
        return SyncResult(
            uploaded = outcomes.sumOf { it.uploaded },
            downloaded = outcomes.sumOf { it.downloaded },
            deleted = outcomes.sumOf { it.deleted },
            skipped = outcomes.sumOf { it.skipped },
            failedFiles = outcomes.sumOf { it.failed },
            conflicts = outcomes.sumOf { it.conflicts },
            bytesTransferred = outcomes.sumOf { it.bytesTransferred },
        )
    }

    /** Asks [syncDecide] what to do with [rel] and dispatches to the matching action. */
    private suspend fun syncFile(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
        hasPriorSyncState: Boolean,
    ): FileOutcome {
        val decision = syncDecide(
            pair.syncDirection,
            pair.conflictStrategy,
            pair.deleteBehavior,
            local,
            remote,
            known,
            hasPriorSyncState,
        )
        return when (decision) {
            SyncDecision.UPLOAD -> uploadFile(pair, provider, accessor, rel, local)
            SyncDecision.DOWNLOAD -> downloadFile(pair, provider, accessor, rel, remote)
            SyncDecision.DELETE_LOCAL -> deleteLocalFile(pair, accessor, rel)
            SyncDecision.DELETE_REMOTE -> deleteRemoteFile(pair, provider, rel)
            SyncDecision.CONFLICT -> recordConflict(pair, rel, local, remote)
            SyncDecision.SKIP -> skipFile(pair, rel, local, remote, known)
        }
    }

    /**
     * Uploads [rel]; any error (including a local file that cannot be opened)
     * is logged and reported as a failed file without saving state.
     */
    private suspend fun uploadFile(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo?,
    ): FileOutcome {
        val (localInfo, uploadedRemoteFile) = try {
            val info = checkNotNull(local) { "UPLOAD decided without a local file: $rel" }
            ensureRemoteDirs(provider, pair.remotePath, rel)
            // A null stream must be a failure: recording state for a file that was never
            // uploaded makes the next run read "local present, remote absent, state known"
            // as a remote deletion.
            val input = accessor.openInputStream(rel)
                ?: error("Cannot open local file for reading: $rel")
            val uploaded = input.use { stream ->
                provider.uploadFile(stream, remotePathOf(pair, rel), info.sizeBytes) { }.getOrThrow()
            }
            info to uploaded
        } catch (e: Exception) {
            return recordFileFailure(pair.id, rel, "Upload failed: $rel", e)
        }
        val bytes = localInfo.sizeBytes
        logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, bytes)
        return FileOutcome(
            uploaded = 1,
            bytesTransferred = bytes,
            newState = buildState(pair.id, rel, localInfo, remoteAfterTransfer = uploadedRemoteFile),
        )
    }

    /**
     * Downloads [rel]; any error (including a local target that cannot be opened)
     * is logged and reported as a failed file without saving state.
     */
    private suspend fun downloadFile(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        remote: RemoteFile?,
    ): FileOutcome {
        val remoteInfo = try {
            val info = checkNotNull(remote) { "DOWNLOAD decided without a remote file: $rel" }
            // Same reasoning as for uploads: no stream means nothing was written.
            val output = accessor.createOutputStream(rel)
                ?: error("Cannot open local file for writing: $rel")
            output.use { stream ->
                // NOTE(review): the value returned by downloadFile is discarded. If it is a
                // Result like uploadFile's, a failed download is counted as a success here —
                // confirm against CloudProvider and add getOrThrow() if so.
                provider.downloadFile(remotePathOf(pair, rel), stream) { }
            }
            info
        } catch (e: Exception) {
            return recordFileFailure(pair.id, rel, "Download failed: $rel", e)
        }
        val bytes = remoteInfo.sizeBytes

        // Read the actual local mtime written by the OS/SAF after download;
        // fall back to "now" when it cannot be read or is not positive.
        val reportedMtime = try {
            accessor.lastModifiedMs(rel)
        } catch (e: Exception) {
            currentCoroutineContext().ensureActive()
            0L
        }
        val localMtime = if (reportedMtime > 0L) reportedMtime else System.currentTimeMillis()

        logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, bytes)
        return FileOutcome(
            downloaded = 1,
            bytesTransferred = bytes,
            newState = buildState(
                pair.id,
                rel,
                LocalFileInfo(rel, remoteInfo.sizeBytes, localMtime),
                remoteAfterTransfer = remoteInfo,
            ),
        )
    }

    /** Deletes the local copy of [rel] and its state row; an error counts as one failed file. */
    private suspend fun deleteLocalFile(pair: SyncPair, accessor: LocalAccessor, rel: String): FileOutcome {
        try {
            accessor.delete(rel)
            fileStateDao.delete(pair.id, rel)
        } catch (e: Exception) {
            return recordFileFailure(pair.id, rel, "Local delete failed: $rel", e)
        }
        logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
        return FileOutcome(deleted = 1)
    }

    /** Deletes the remote copy of [rel] and its state row; an error counts as one failed file. */
    private suspend fun deleteRemoteFile(pair: SyncPair, provider: CloudProvider, rel: String): FileOutcome {
        try {
            // NOTE(review): the value returned by deleteFile is discarded; if it is a Result,
            // a failed delete still drops the state row below — confirm against CloudProvider.
            provider.deleteFile(remotePathOf(pair, rel))
            fileStateDao.delete(pair.id, rel)
        } catch (e: Exception) {
            return recordFileFailure(pair.id, rel, "Remote delete failed: $rel", e)
        }
        logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
        return FileOutcome(deleted = 1)
    }

    /** Stores an unresolved conflict row for [rel]; a missing side is stored as epoch / 0 bytes. */
    private suspend fun recordConflict(
        pair: SyncPair,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
    ): FileOutcome {
        conflictDao.insert(
            SyncConflictEntity(
                syncPairId = pair.id,
                relativePath = rel,
                localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
                localSizeBytes = local?.sizeBytes ?: 0L,
                remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
                remoteSizeBytes = remote?.sizeBytes ?: 0L,
                resolution = null,
                detectedAt = Instant.now(),
            )
        )
        logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
        return FileOutcome(conflicts = 1)
    }

    /**
     * Handles a SKIP decision.
     *
     * Save state whenever both sides are present and state is absent or
     * incomplete (post-upload null metadata). Without a baseline record,
     * a subsequent local deletion would look like an unseen remote file
     * and be re-downloaded instead of triggering DELETE_REMOTE.
     */
    private fun skipFile(
        pair: SyncPair,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
    ): FileOutcome {
        val baselineMissing =
            known == null || known.remoteModifiedAt == null || known.localModifiedAt == null
        return if (local != null && remote != null && baselineMissing) {
            FileOutcome(skipped = 1, newState = buildState(pair.id, rel, local, remoteAfterTransfer = remote))
        } else {
            FileOutcome(skipped = 1)
        }
    }

    /**
     * Logs a per-file error and returns the "one failed file" outcome.
     *
     * Rethrows instead when the surrounding coroutine has been cancelled, so a
     * cancelled sync is never reported as an ordinary file failure.
     */
    private suspend fun recordFileFailure(pairId: Long, rel: String, logMessage: String, e: Exception): FileOutcome {
        currentCoroutineContext().ensureActive()
        Timber.e(e, logMessage)
        logEvent(pairId, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
        return FileOutcome(failed = 1)
    }

    /** Absolute remote path of [rel] under the pair's remote root. */
    private fun remotePathOf(pair: SyncPair, rel: String): String = "${pair.remotePath}/$rel"

    /**
     * Requests creation of every parent directory of [rel] below [remotePairPath],
     * outermost first. Failures are only logged as warnings.
     */
    private suspend fun ensureRemoteDirs(provider: CloudProvider, remotePairPath: String, rel: String) {
        val parentSegments = rel.replace('\\', '/').split('/').dropLast(1)
        var currentPath = remotePairPath
        for (segment in parentSegments) {
            currentPath = "$currentPath/$segment"
            provider.createDirectory(currentPath).onFailure { e ->
                Timber.w("MKCOL $currentPath: ${e.message}")
            }
        }
    }

    /**
     * Builds the state row for [rel] from the local info and the remote metadata
     * observed after the transfer. Absent sides yield null timestamps and 0 sizes;
     * hashes are not computed.
     */
    private fun buildState(
        pairId: Long,
        rel: String,
        local: LocalFileInfo?,
        remoteAfterTransfer: RemoteFile?,
    ) = SyncFileStateEntity(
        syncPairId = pairId,
        relativePath = rel,
        localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) },
        localSizeBytes = local?.sizeBytes ?: 0L,
        localHash = null,
        remoteModifiedAt = remoteAfterTransfer?.modifiedAt,
        remoteSizeBytes = remoteAfterTransfer?.sizeBytes ?: 0L,
        remoteEtag = remoteAfterTransfer?.etag,
        lastSyncedAt = Instant.now(),
        syncedHash = null,
    )

    /** Appends one row to the sync event log, timestamped with the current instant. */
    private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
        eventDao.insert(
            SyncEventEntity(
                syncPairId = pairId,
                timestamp = Instant.now(),
                eventType = type,
                filePath = file,
                message = msg,
                bytesTransferred = bytes,
            )
        )
    }

    /** Result of handling a single relative path; summed into [SyncResult] by [performSync]. */
    private data class FileOutcome(
        val uploaded: Int = 0,
        val downloaded: Int = 0,
        val deleted: Int = 0,
        val skipped: Int = 0,
        val failed: Int = 0,
        val conflicts: Int = 0,
        val bytesTransferred: Long = 0L,
        val newState: SyncFileStateEntity? = null,
    )

    private companion object {
        /** Upper bound on relative paths processed concurrently within one pass. */
        const val MAX_PARALLEL_FILES = 4
    }
}

// Top-level so unit tests can call it directly without instantiating SyncEngine.
/**
 * Pure decision function: maps the observed state of one relative path to the
 * action the engine should take.
 *
 * @param direction which way(s) the pair is allowed to move data.
 * @param conflictStrategy how to settle a two-way sync when both sides changed.
 * @param deleteBehavior `KEEP` suppresses every delete decision.
 * @param local current local file, or null if absent.
 * @param remote current remote file, or null if absent.
 * @param known state row recorded for this path, or null if none exists.
 * @param hasPriorSyncState whether the pair has any recorded state row at all.
 * @return the action to perform for this path.
 */
internal fun syncDecide(
    direction: SyncDirection,
    conflictStrategy: ConflictStrategy,
    deleteBehavior: DeleteBehavior,
    local: LocalFileInfo?,
    remote: RemoteFile?,
    known: SyncFileStateEntity?,
    hasPriorSyncState: Boolean = false,
): SyncDecision {
    // Nothing on either side: nothing to do.
    if (local == null && remote == null) return SyncDecision.SKIP

    val mayUpload = direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY
    val mayDownload = direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY
    val mirrorDeletes = deleteBehavior != DeleteBehavior.KEEP

    // Only the local copy exists.
    if (remote == null) {
        return when {
            // Never synced before: a new local file.
            known == null -> if (mayUpload) SyncDecision.UPLOAD else SyncDecision.SKIP
            // Synced before and now gone remotely: mirror the deletion if allowed.
            mirrorDeletes && mayDownload -> SyncDecision.DELETE_LOCAL
            else -> SyncDecision.SKIP
        }
    }

    // Only the remote copy exists.
    if (local == null) {
        // Initial sync: no history at all — remote files are new, download them.
        val isInitialSync = known == null && !hasPriorSyncState
        return when {
            isInitialSync -> if (mayDownload) SyncDecision.DOWNLOAD else SyncDecision.SKIP
            // Either the file has a state record, or the pair has been synced before
            // but this file has no record (e.g. uploaded before state-tracking was
            // fixed). Both are treated as a local deletion to mirror.
            // NOTE(review): with prior state present, a file that is genuinely new on
            // the remote also has no record and therefore lands here — confirm that
            // deleting it (rather than downloading) is intended.
            mirrorDeletes && mayUpload -> SyncDecision.DELETE_REMOTE
            else -> SyncDecision.SKIP
        }
    }

    // Both copies exist. Treat null known timestamps as "not yet recorded" — don't
    // treat as changed. The SKIP reconciliation pass will fill them in on the next sync.
    val localChanged = known == null ||
        (known.localModifiedAt?.let { recorded -> local.lastModifiedMs != recorded.toEpochMilli() } ?: false)
    val remoteChanged = known == null ||
        (
            known.remoteModifiedAt?.let { recorded ->
                remote.etag != known.remoteEtag || remote.modifiedAt != recorded
            } ?: false
        )

    if (localChanged && remoteChanged) {
        return when (direction) {
            SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
            SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
            SyncDirection.TWO_WAY -> when (conflictStrategy) {
                ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
                ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
                // Ties go to the local copy.
                ConflictStrategy.KEEP_NEWEST -> {
                    val remoteMs = remote.modifiedAt?.toEpochMilli() ?: 0L
                    if (local.lastModifiedMs >= remoteMs) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
                }
                ConflictStrategy.KEEP_LARGEST ->
                    if (local.sizeBytes >= remote.sizeBytes) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
                ConflictStrategy.KEEP_BOTH, ConflictStrategy.ASK -> SyncDecision.CONFLICT
            }
        }
    }
    if (localChanged) return if (mayUpload) SyncDecision.UPLOAD else SyncDecision.SKIP
    if (remoteChanged) return if (mayDownload) SyncDecision.DOWNLOAD else SyncDecision.SKIP
    return SyncDecision.SKIP
}

/** Action chosen by [syncDecide] for a single relative path. */
enum class SyncDecision {
    UPLOAD,
    DOWNLOAD,
    DELETE_LOCAL,
    DELETE_REMOTE,
    CONFLICT,
    SKIP,
}

/**
 * Aggregated counters for one sync pass.
 *
 * [error] is null by default and carries the exception when one is supplied by the producer.
 */
data class SyncResult(
    val uploaded: Int = 0,
    val downloaded: Int = 0,
    val deleted: Int = 0,
    val skipped: Int = 0,
    val failedFiles: Int = 0,
    val conflicts: Int = 0,
    val bytesTransferred: Long = 0L,
    val error: Exception? = null,
)