package com.syncflow.domain.sync

import android.content.Context
import android.net.Uri
import com.syncflow.data.db.SyncConflictDao
import com.syncflow.data.db.SyncEventDao
import com.syncflow.data.db.SyncFileStateDao
import com.syncflow.data.db.SyncPairDao
import com.syncflow.data.db.entities.SyncConflictEntity
import com.syncflow.data.db.entities.SyncEventEntity
import com.syncflow.data.db.entities.SyncFileStateEntity
import com.syncflow.data.providers.CloudProvider
import com.syncflow.domain.model.ConflictStrategy
import com.syncflow.domain.model.DeleteBehavior
import com.syncflow.domain.model.RemoteFile
import com.syncflow.domain.model.SyncDirection
import com.syncflow.domain.model.SyncEventType
import com.syncflow.domain.model.SyncPair
import com.syncflow.domain.model.SyncStatus
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import timber.log.Timber
import java.io.File
import java.time.Instant
import javax.inject.Inject

/**
 * Reconciles one [SyncPair] (a local folder and a remote folder) through a [CloudProvider].
 *
 * A run compares three snapshots per relative path - the local file, the remote file and the
 * state row stored by the previous run - picks a [SyncDecision] for each path, and executes the
 * decisions with at most [MAX_PARALLEL_TRANSFERS] paths in flight at once.
 */
class SyncEngine @Inject constructor(
    private val syncPairDao: SyncPairDao,
    private val fileStateDao: SyncFileStateDao,
    private val conflictDao: SyncConflictDao,
    private val eventDao: SyncEventDao,
    @ApplicationContext private val context: Context,
) {

    /**
     * Per-path result of executing one [SyncDecision]. Every coroutine returns its own instance,
     * so no mutable state is shared between the parallel workers.
     *
     * [syncedPath] is non-null only for a successful transfer; [local] and [remote] then hold the
     * metadata known at transfer time, used as the fallback when building the new state row.
     */
    private data class FileOutcome(
        val uploaded: Int = 0,
        val downloaded: Int = 0,
        val deleted: Int = 0,
        val skipped: Int = 0,
        val failed: Int = 0,
        val conflicts: Int = 0,
        val bytesTransferred: Long = 0L,
        val syncedPath: String? = null,
        val local: LocalFileInfo? = null,
        val remote: RemoteFile? = null,
    )

    /**
     * Runs a full sync for [pair] and records the outcome on the pair and in the event log.
     *
     * @return the aggregated counters; when the run as a whole fails, a [SyncResult] with
     *   `failedFiles = 1` and [SyncResult.error] set.
     * @throws CancellationException when the calling coroutine is cancelled. The pair is marked
     *   [SyncStatus.FAILED] first so it is not left in [SyncStatus.SYNCING].
     */
    suspend fun sync(pair: SyncPair, provider: CloudProvider): SyncResult {
        syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
        logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)
        return try {
            val result = performSync(pair, provider)
            // Conflicts take precedence over plain failures when both occurred.
            val finalStatus = when {
                result.conflicts > 0 -> SyncStatus.CONFLICT
                result.failedFiles > 0 -> SyncStatus.PARTIAL
                else -> SyncStatus.SUCCESS
            }
            syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts)
            logEvent(
                pair.id,
                SyncEventType.SYNC_COMPLETED,
                null,
                "↑${result.uploaded} ↓${result.downloaded} ✗${result.failedFiles}",
                result.bytesTransferred,
            )
            result
        } catch (e: CancellationException) {
            // Cancellation must propagate to the caller. The bookkeeping runs in NonCancellable
            // because suspending calls in an already-cancelled coroutine would fail immediately.
            withContext(NonCancellable) {
                runCatching {
                    syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
                    logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
                }.onFailure { Timber.w(it, "Could not record cancellation for pair ${pair.id}") }
            }
            throw e
        } catch (e: Exception) {
            Timber.e(e, "Sync failed for pair ${pair.id}")
            syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
            logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
            SyncResult(failedFiles = 1, error = e)
        }
    }

    /** Picks the SAF-backed accessor for `content://` locations and the [File]-backed one otherwise. */
    private fun makeAccessor(localPath: String): LocalAccessor =
        if (localPath.startsWith("content://")) {
            LocalAccessor.Saf(Uri.parse(localPath), context.contentResolver)
        } else {
            LocalAccessor.JavaFile(File(localPath))
        }

    /** Lists the remote folder of [pair], keyed by the path relative to `pair.remotePath`. */
    private suspend fun listRemote(pair: SyncPair, provider: CloudProvider): Map<String, RemoteFile> =
        provider.listFiles(pair.remotePath).getOrThrow()
            .associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }

    /**
     * Snapshots both sides, executes one decision per path in parallel, persists the new state
     * rows and sums the per-path outcomes.
     *
     * @throws Throwable whatever the remote listing, the local walk or a DAO call throws;
     *   per-file transfer and delete failures are counted in the result instead.
     */
    private suspend fun performSync(pair: SyncPair, provider: CloudProvider): SyncResult {
        val accessor = makeAccessor(pair.localPath)
        val knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
        val remoteFiles = listRemote(pair, provider)
        val localFiles = accessor.walkFiles(pair)
        val allPaths = localFiles.keys + remoteFiles.keys + knownStates.keys

        val semaphore = Semaphore(MAX_PARALLEL_TRANSFERS)
        val outcomes: List<FileOutcome> = coroutineScope {
            allPaths.map { rel ->
                async {
                    semaphore.withPermit {
                        processFile(pair, provider, accessor, rel, localFiles[rel], remoteFiles[rel], knownStates[rel])
                    }
                }
            }.awaitAll()
        }

        // A download rewrites the local file and an upload rewrites the remote one, so the
        // snapshots taken above are stale for exactly those paths. Storing the stale values would
        // make the next run see the file as changed and transfer it again. Re-scan the affected
        // side once; if the re-scan fails, fall back to the values known at transfer time.
        val freshLocal = if (outcomes.any { it.downloaded > 0 }) {
            catchingNonCancellation { accessor.walkFiles(pair) }
                .onFailure { Timber.w(it, "Could not re-scan local files after download") }
                .getOrNull()
        } else {
            null
        }
        val freshRemote = if (outcomes.any { it.uploaded > 0 }) {
            catchingNonCancellation { listRemote(pair, provider) }
                .onFailure { Timber.w(it, "Could not re-list remote files after upload") }
                .getOrNull()
        } else {
            null
        }

        val newStates = outcomes.mapNotNull { outcome ->
            val rel = outcome.syncedPath ?: return@mapNotNull null
            buildState(
                pair.id,
                rel,
                freshLocal?.get(rel) ?: outcome.local,
                freshRemote?.get(rel) ?: outcome.remote,
            )
        }
        fileStateDao.upsertAll(newStates)

        return SyncResult(
            uploaded = outcomes.sumOf { it.uploaded },
            downloaded = outcomes.sumOf { it.downloaded },
            deleted = outcomes.sumOf { it.deleted },
            skipped = outcomes.sumOf { it.skipped },
            failedFiles = outcomes.sumOf { it.failed },
            conflicts = outcomes.sumOf { it.conflicts },
            bytesTransferred = outcomes.sumOf { it.bytesTransferred },
        )
    }

    /** Decides what to do with [rel] and executes that decision. */
    private suspend fun processFile(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
    ): FileOutcome =
        when (decide(pair.syncDirection, pair.conflictStrategy, pair.deleteBehavior, local, remote, known)) {
            SyncDecision.UPLOAD -> upload(pair, provider, accessor, rel, local, remote)
            SyncDecision.DOWNLOAD -> download(pair, provider, accessor, rel, remote)
            SyncDecision.DELETE_LOCAL -> deleteLocal(pair, accessor, rel)
            SyncDecision.DELETE_REMOTE -> deleteRemote(pair, provider, rel)
            SyncDecision.CONFLICT -> recordConflict(pair, rel, local, remote)
            SyncDecision.SKIP -> FileOutcome(skipped = 1)
        }

    /** Uploads the local file at [rel]; any failure is logged and counted as one failed file. */
    private suspend fun upload(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
    ): FileOutcome {
        val source = catchingNonCancellation {
            val info = checkNotNull(local) { "No local file to upload: $rel" }
            // A null stream means nothing was sent; report that as a failure, not an upload.
            val input = accessor.openInputStream(rel) ?: error("Cannot open local file for reading: $rel")
            // NOTE(review): the return value of uploadFile is ignored - if it reports errors
            // through its result rather than by throwing, it must be checked here.
            input.use { stream -> provider.uploadFile(stream, "${pair.remotePath}/$rel", info.sizeBytes) { } }
            info
        }.getOrElse { e ->
            Timber.e(e, "Upload failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            return FileOutcome(failed = 1)
        }
        logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, source.sizeBytes)
        return FileOutcome(
            uploaded = 1,
            bytesTransferred = source.sizeBytes,
            syncedPath = rel,
            local = source,
            remote = remote,
        )
    }

    /** Downloads the remote file at [rel]; any failure is logged and counted as one failed file. */
    private suspend fun download(
        pair: SyncPair,
        provider: CloudProvider,
        accessor: LocalAccessor,
        rel: String,
        remote: RemoteFile?,
    ): FileOutcome {
        val source = catchingNonCancellation {
            val info = checkNotNull(remote) { "No remote file to download: $rel" }
            // A null stream means nothing was written; report that as a failure, not a download.
            val output = accessor.createOutputStream(rel) ?: error("Cannot open local file for writing: $rel")
            // NOTE(review): the return value of downloadFile is ignored - if it reports errors
            // through its result rather than by throwing, it must be checked here.
            output.use { stream -> provider.downloadFile("${pair.remotePath}/$rel", stream) { } }
            info
        }.getOrElse { e ->
            Timber.e(e, "Download failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            return FileOutcome(failed = 1)
        }
        logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, source.sizeBytes)
        // local stays null: the pre-download metadata no longer describes the file on disk.
        return FileOutcome(
            downloaded = 1,
            bytesTransferred = source.sizeBytes,
            syncedPath = rel,
            local = null,
            remote = source,
        )
    }

    /** Deletes the local copy of [rel] and its state row; a failure counts as one failed file. */
    private suspend fun deleteLocal(pair: SyncPair, accessor: LocalAccessor, rel: String): FileOutcome =
        catchingNonCancellation {
            // NOTE(review): the return value of delete is ignored - confirm it throws on failure.
            accessor.delete(rel)
            fileStateDao.delete(pair.id, rel)
            logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
            FileOutcome(deleted = 1)
        }.getOrElse { e ->
            Timber.e(e, "Local delete failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            FileOutcome(failed = 1)
        }

    /** Deletes the remote copy of [rel] and its state row; a failure counts as one failed file. */
    private suspend fun deleteRemote(pair: SyncPair, provider: CloudProvider, rel: String): FileOutcome =
        catchingNonCancellation {
            // NOTE(review): the return value of deleteFile is ignored - confirm it throws on failure.
            provider.deleteFile("${pair.remotePath}/$rel")
            fileStateDao.delete(pair.id, rel)
            logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
            FileOutcome(deleted = 1)
        }.getOrElse { e ->
            Timber.e(e, "Remote delete failed: $rel")
            logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
            FileOutcome(failed = 1)
        }

    /** Stores an unresolved conflict row for [rel]; a missing side is recorded as epoch / 0 bytes. */
    private suspend fun recordConflict(
        pair: SyncPair,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
    ): FileOutcome {
        conflictDao.insert(
            SyncConflictEntity(
                syncPairId = pair.id,
                relativePath = rel,
                localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
                localSizeBytes = local?.sizeBytes ?: 0L,
                remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
                remoteSizeBytes = remote?.sizeBytes ?: 0L,
                resolution = null,
                detectedAt = Instant.now(),
            )
        )
        logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
        return FileOutcome(conflicts = 1)
    }

    /**
     * Chooses the action for one path from its three snapshots.
     *
     * - Neither side has the file: skip.
     * - Only one side has it and there is no stored state: the file is new, so copy it across
     *   when [direction] allows.
     * - Only one side has it and there is stored state: the other side deleted it, so propagate
     *   the deletion when [direction] allows and [deleteBehavior] is not [DeleteBehavior.KEEP].
     * - Both sides have it: transfer from the side that differs from the stored state; when both
     *   differ, one-way directions win outright and two-way applies [conflictStrategy].
     */
    private fun decide(
        direction: SyncDirection,
        conflictStrategy: ConflictStrategy,
        deleteBehavior: DeleteBehavior,
        local: LocalFileInfo?,
        remote: RemoteFile?,
        known: SyncFileStateEntity?,
    ): SyncDecision {
        val localExists = local != null
        val remoteExists = remote != null
        // Without stored state both sides count as changed.
        val localChanged = known == null ||
            (local != null && local.lastModifiedMs != known.localModifiedAt?.toEpochMilli())
        val remoteChanged = known == null ||
            (remote != null && (remote.etag != known.remoteEtag || remote.modifiedAt != known.remoteModifiedAt))

        return when {
            !localExists && !remoteExists -> SyncDecision.SKIP

            localExists && !remoteExists -> when {
                known == null -> when (direction) {
                    SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
                    else -> SyncDecision.SKIP
                }
                deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
                direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY ->
                    SyncDecision.DELETE_LOCAL
                else -> SyncDecision.SKIP
            }

            !localExists && remoteExists -> when {
                known == null -> when (direction) {
                    SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
                    else -> SyncDecision.SKIP
                }
                deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
                direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY ->
                    SyncDecision.DELETE_REMOTE
                else -> SyncDecision.SKIP
            }

            localChanged && remoteChanged -> when (direction) {
                SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
                SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
                SyncDirection.TWO_WAY -> when (conflictStrategy) {
                    ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
                    ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
                    // Ties go to the local side.
                    ConflictStrategy.KEEP_NEWEST ->
                        if ((local?.lastModifiedMs ?: 0L) >= (remote?.modifiedAt?.toEpochMilli() ?: 0L)) {
                            SyncDecision.UPLOAD
                        } else {
                            SyncDecision.DOWNLOAD
                        }
                    ConflictStrategy.KEEP_LARGEST ->
                        if ((local?.sizeBytes ?: 0L) >= (remote?.sizeBytes ?: 0L)) {
                            SyncDecision.UPLOAD
                        } else {
                            SyncDecision.DOWNLOAD
                        }
                    ConflictStrategy.KEEP_BOTH -> SyncDecision.CONFLICT
                    ConflictStrategy.ASK -> SyncDecision.CONFLICT
                }
            }

            localChanged -> when (direction) {
                SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
                else -> SyncDecision.SKIP
            }

            remoteChanged -> when (direction) {
                SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
                else -> SyncDecision.SKIP
            }

            else -> SyncDecision.SKIP
        }
    }

    /** Builds the state row for [rel]; a missing side is stored as null timestamp/etag and 0 bytes. */
    private fun buildState(
        pairId: Long,
        rel: String,
        local: LocalFileInfo?,
        remote: RemoteFile?,
    ) = SyncFileStateEntity(
        syncPairId = pairId,
        relativePath = rel,
        localModifiedAt = local?.lastModifiedMs?.let { Instant.ofEpochMilli(it) },
        localSizeBytes = local?.sizeBytes ?: 0L,
        localHash = null,
        remoteModifiedAt = remote?.modifiedAt,
        remoteSizeBytes = remote?.sizeBytes ?: 0L,
        remoteEtag = remote?.etag,
        lastSyncedAt = Instant.now(),
        syncedHash = null,
    )

    /** Appends one row, timestamped now, to the sync event log. */
    private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
        eventDao.insert(
            SyncEventEntity(
                syncPairId = pairId,
                timestamp = Instant.now(),
                eventType = type,
                filePath = file,
                message = msg,
                bytesTransferred = bytes,
            )
        )
    }

    /**
     * Like [runCatching], but lets [CancellationException] propagate so that a cancelled
     * transfer is not reported as a failed file.
     */
    private inline fun <T> catchingNonCancellation(block: () -> T): Result<T> =
        try {
            Result.success(block())
        } catch (e: CancellationException) {
            throw e
        } catch (e: Throwable) {
            Result.failure(e)
        }

    private companion object {
        /** Upper bound on the number of paths processed concurrently. */
        const val MAX_PARALLEL_TRANSFERS = 4
    }
}

/** The action chosen for a single relative path during a sync run. */
enum class SyncDecision { UPLOAD, DOWNLOAD, DELETE_LOCAL, DELETE_REMOTE, CONFLICT, SKIP }

/**
 * Aggregated counters for one sync run.
 *
 * [error] is set only when the run as a whole failed; per-file failures are counted in
 * [failedFiles].
 */
data class SyncResult(
    val uploaded: Int = 0,
    val downloaded: Int = 0,
    val deleted: Int = 0,
    val skipped: Int = 0,
    val failedFiles: Int = 0,
    val conflicts: Int = 0,
    val bytesTransferred: Long = 0L,
    val error: Exception? = null,
)