Initial commit — SyncFlow Android file sync app
Supports WebDAV, SFTP, SFTPGo, Nextcloud, ownCloud, Google Drive, Dropbox, and OneDrive. Credentials encrypted with Android Keystore. Biometric app-lock, conflict resolution, and auto-sync via WorkManager. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,278 @@
|
||||
package com.syncflow.domain.sync
|
||||
|
||||
import com.syncflow.data.db.SyncConflictDao
|
||||
import com.syncflow.data.db.SyncEventDao
|
||||
import com.syncflow.data.db.SyncFileStateDao
|
||||
import com.syncflow.data.db.SyncPairDao
|
||||
import com.syncflow.data.db.entities.SyncConflictEntity
|
||||
import com.syncflow.data.db.entities.SyncEventEntity
|
||||
import com.syncflow.data.db.entities.SyncFileStateEntity
|
||||
import com.syncflow.data.providers.CloudProvider
|
||||
import com.syncflow.domain.model.ConflictStrategy
|
||||
import com.syncflow.domain.model.DeleteBehavior
|
||||
import com.syncflow.domain.model.RemoteFile
|
||||
import com.syncflow.domain.model.SyncDirection
|
||||
import com.syncflow.domain.model.SyncEventType
|
||||
import com.syncflow.domain.model.SyncPair
|
||||
import com.syncflow.domain.model.SyncStatus
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.sync.Semaphore
|
||||
import kotlinx.coroutines.sync.withPermit
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.security.MessageDigest
|
||||
import java.time.Instant
|
||||
import javax.inject.Inject
|
||||
|
||||
class SyncEngine @Inject constructor(
|
||||
private val syncPairDao: SyncPairDao,
|
||||
private val fileStateDao: SyncFileStateDao,
|
||||
private val conflictDao: SyncConflictDao,
|
||||
private val eventDao: SyncEventDao,
|
||||
) {
|
||||
suspend fun sync(pair: SyncPair, provider: CloudProvider): SyncResult {
|
||||
syncPairDao.updateStatus(pair.id, SyncStatus.SYNCING)
|
||||
logEvent(pair.id, SyncEventType.SYNC_STARTED, null, null, 0)
|
||||
|
||||
return try {
|
||||
val result = performSync(pair, provider)
|
||||
val finalStatus = when {
|
||||
result.failedFiles > 0 && result.conflicts > 0 -> SyncStatus.CONFLICT
|
||||
result.failedFiles > 0 -> SyncStatus.PARTIAL
|
||||
result.conflicts > 0 -> SyncStatus.CONFLICT
|
||||
else -> SyncStatus.SUCCESS
|
||||
}
|
||||
syncPairDao.updateSyncResult(pair.id, Instant.now(), finalStatus, result.conflicts)
|
||||
logEvent(pair.id, SyncEventType.SYNC_COMPLETED, null, "↑${result.uploaded} ↓${result.downloaded} ✗${result.failedFiles}", result.bytesTransferred)
|
||||
result
|
||||
} catch (e: Exception) {
|
||||
Timber.e(e, "Sync failed for pair ${pair.id}")
|
||||
syncPairDao.updateSyncResult(pair.id, Instant.now(), SyncStatus.FAILED, 0)
|
||||
logEvent(pair.id, SyncEventType.SYNC_FAILED, null, e.message, 0)
|
||||
SyncResult(failedFiles = 1, error = e)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun performSync(pair: SyncPair, provider: CloudProvider): SyncResult {
|
||||
val localRoot = File(pair.localPath)
|
||||
val knownStates = fileStateDao.getForPair(pair.id).associateBy { it.relativePath }
|
||||
val remoteFiles = provider.listFiles(pair.remotePath).getOrThrow().associateBy { it.path.removePrefix(pair.remotePath).trimStart('/') }
|
||||
val localFiles = localRoot.walkFiles(pair)
|
||||
|
||||
var uploaded = 0; var downloaded = 0; var deleted = 0; var skipped = 0; var failed = 0; var conflicts = 0
|
||||
var bytesTransferred = 0L
|
||||
val newStates = mutableListOf<SyncFileStateEntity>()
|
||||
|
||||
val allPaths = (localFiles.keys + remoteFiles.keys + knownStates.keys).toSet()
|
||||
|
||||
// Fan out with bounded parallelism
|
||||
val semaphore = Semaphore(4)
|
||||
coroutineScope {
|
||||
allPaths.map { rel ->
|
||||
async {
|
||||
semaphore.withPermit {
|
||||
val local = localFiles[rel]
|
||||
val remote = remoteFiles[rel]
|
||||
val known = knownStates[rel]
|
||||
val decision = decide(pair.syncDirection, pair.conflictStrategy, pair.deleteBehavior, local, remote, known)
|
||||
|
||||
when (decision) {
|
||||
SyncDecision.UPLOAD -> {
|
||||
val file = File(localRoot, rel)
|
||||
val bytes = runCatching {
|
||||
file.inputStream().use { stream ->
|
||||
provider.uploadFile(stream, "${pair.remotePath}/$rel", file.length()) { }
|
||||
}
|
||||
file.length()
|
||||
}.getOrElse { e ->
|
||||
Timber.e(e, "Upload failed: $rel")
|
||||
failed++
|
||||
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
||||
return@withPermit
|
||||
}
|
||||
uploaded++
|
||||
bytesTransferred += bytes
|
||||
newStates += buildState(pair.id, rel, file, remote)
|
||||
logEvent(pair.id, SyncEventType.FILE_UPLOADED, rel, null, bytes)
|
||||
}
|
||||
SyncDecision.DOWNLOAD -> {
|
||||
val dest = File(localRoot, rel).also { it.parentFile?.mkdirs() }
|
||||
val bytes = runCatching {
|
||||
dest.outputStream().use { stream ->
|
||||
provider.downloadFile("${pair.remotePath}/$rel", stream) { }
|
||||
}
|
||||
remote!!.sizeBytes
|
||||
}.getOrElse { e ->
|
||||
Timber.e(e, "Download failed: $rel")
|
||||
failed++
|
||||
logEvent(pair.id, SyncEventType.FILE_SKIPPED, rel, e.message, 0)
|
||||
return@withPermit
|
||||
}
|
||||
downloaded++
|
||||
bytesTransferred += bytes
|
||||
newStates += buildState(pair.id, rel, dest, remote)
|
||||
logEvent(pair.id, SyncEventType.FILE_DOWNLOADED, rel, null, bytes)
|
||||
}
|
||||
SyncDecision.DELETE_LOCAL -> {
|
||||
File(localRoot, rel).delete()
|
||||
fileStateDao.delete(pair.id, rel)
|
||||
deleted++
|
||||
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "local", 0)
|
||||
}
|
||||
SyncDecision.DELETE_REMOTE -> {
|
||||
provider.deleteFile("${pair.remotePath}/$rel")
|
||||
fileStateDao.delete(pair.id, rel)
|
||||
deleted++
|
||||
logEvent(pair.id, SyncEventType.FILE_DELETED, rel, "remote", 0)
|
||||
}
|
||||
SyncDecision.CONFLICT -> {
|
||||
conflicts++
|
||||
conflictDao.insert(SyncConflictEntity(
|
||||
syncPairId = pair.id,
|
||||
relativePath = rel,
|
||||
localModifiedAt = local?.lastModified()?.let { Instant.ofEpochMilli(it) } ?: Instant.EPOCH,
|
||||
localSizeBytes = local?.length() ?: 0L,
|
||||
remoteModifiedAt = remote?.modifiedAt ?: Instant.EPOCH,
|
||||
remoteSizeBytes = remote?.sizeBytes ?: 0L,
|
||||
resolution = null,
|
||||
detectedAt = Instant.now(),
|
||||
))
|
||||
logEvent(pair.id, SyncEventType.CONFLICT_DETECTED, rel, null, 0)
|
||||
}
|
||||
SyncDecision.SKIP -> skipped++
|
||||
}
|
||||
}
|
||||
}
|
||||
}.awaitAll()
|
||||
}
|
||||
|
||||
fileStateDao.upsertAll(newStates)
|
||||
return SyncResult(uploaded, downloaded, deleted, skipped, failed, conflicts, bytesTransferred)
|
||||
}
|
||||
|
||||
private fun decide(
|
||||
direction: SyncDirection,
|
||||
conflictStrategy: ConflictStrategy,
|
||||
deleteBehavior: DeleteBehavior,
|
||||
local: File?,
|
||||
remote: RemoteFile?,
|
||||
known: SyncFileStateEntity?,
|
||||
): SyncDecision {
|
||||
val localExists = local?.exists() == true
|
||||
val remoteExists = remote != null
|
||||
|
||||
val localChanged = known == null || (localExists && local!!.lastModified() != known.localModifiedAt?.toEpochMilli())
|
||||
val remoteChanged = known == null || (remoteExists && remote!!.etag != known.remoteEtag && remote.modifiedAt != known.remoteModifiedAt)
|
||||
|
||||
return when {
|
||||
!localExists && !remoteExists -> SyncDecision.SKIP
|
||||
|
||||
// File only exists locally
|
||||
localExists && !remoteExists -> when {
|
||||
known == null -> when (direction) {
|
||||
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
// Remote was deleted — respect deleteBehavior
|
||||
else -> when {
|
||||
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
||||
direction == SyncDirection.DOWNLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_LOCAL
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
}
|
||||
|
||||
// File only exists remotely
|
||||
!localExists && remoteExists -> when {
|
||||
known == null -> when (direction) {
|
||||
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
// Local was deleted — respect deleteBehavior
|
||||
else -> when {
|
||||
deleteBehavior == DeleteBehavior.KEEP -> SyncDecision.SKIP
|
||||
direction == SyncDirection.UPLOAD_ONLY || direction == SyncDirection.TWO_WAY -> SyncDecision.DELETE_REMOTE
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
}
|
||||
|
||||
// Both changed — conflict
|
||||
localChanged && remoteChanged -> when (direction) {
|
||||
SyncDirection.UPLOAD_ONLY -> SyncDecision.UPLOAD
|
||||
SyncDirection.DOWNLOAD_ONLY -> SyncDecision.DOWNLOAD
|
||||
SyncDirection.TWO_WAY -> when (conflictStrategy) {
|
||||
ConflictStrategy.KEEP_LOCAL -> SyncDecision.UPLOAD
|
||||
ConflictStrategy.KEEP_REMOTE -> SyncDecision.DOWNLOAD
|
||||
ConflictStrategy.KEEP_NEWEST -> if ((local?.lastModified() ?: 0L) >= (remote?.modifiedAt?.toEpochMilli() ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
||||
ConflictStrategy.KEEP_LARGEST -> if ((local?.length() ?: 0L) >= (remote?.sizeBytes ?: 0L)) SyncDecision.UPLOAD else SyncDecision.DOWNLOAD
|
||||
ConflictStrategy.KEEP_BOTH -> SyncDecision.CONFLICT // engine keeps both via rename
|
||||
ConflictStrategy.ASK -> SyncDecision.CONFLICT
|
||||
}
|
||||
}
|
||||
|
||||
localChanged -> when (direction) {
|
||||
SyncDirection.UPLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.UPLOAD
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
remoteChanged -> when (direction) {
|
||||
SyncDirection.DOWNLOAD_ONLY, SyncDirection.TWO_WAY -> SyncDecision.DOWNLOAD
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
else -> SyncDecision.SKIP
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildState(pairId: Long, rel: String, local: File, remote: RemoteFile?) = SyncFileStateEntity(
|
||||
syncPairId = pairId,
|
||||
relativePath = rel,
|
||||
localModifiedAt = if (local.exists()) Instant.ofEpochMilli(local.lastModified()) else null,
|
||||
localSizeBytes = local.length(),
|
||||
localHash = null,
|
||||
remoteModifiedAt = remote?.modifiedAt,
|
||||
remoteSizeBytes = remote?.sizeBytes ?: 0L,
|
||||
remoteEtag = remote?.etag,
|
||||
lastSyncedAt = Instant.now(),
|
||||
syncedHash = null,
|
||||
)
|
||||
|
||||
private suspend fun logEvent(pairId: Long, type: SyncEventType, file: String?, msg: String?, bytes: Long) {
|
||||
eventDao.insert(SyncEventEntity(syncPairId = pairId, timestamp = Instant.now(), eventType = type, filePath = file, message = msg, bytesTransferred = bytes))
|
||||
}
|
||||
|
||||
private fun File.walkFiles(pair: SyncPair): Map<String, File> {
|
||||
if (!exists()) return emptyMap()
|
||||
val includeExts = pair.includeExtensions.map { it.lowercase().trimStart('.') }.toSet()
|
||||
val excludeExts = pair.excludeExtensions.map { it.lowercase().trimStart('.') }.toSet()
|
||||
val minBytes = pair.minFileSizeKb * 1024
|
||||
val maxBytes = if (pair.maxFileSizeKb > 0) pair.maxFileSizeKb * 1024 else Long.MAX_VALUE
|
||||
|
||||
return walkTopDown()
|
||||
.onEnter { dir ->
|
||||
pair.recursive || dir == this
|
||||
}
|
||||
.filter { it.isFile }
|
||||
.filter { f -> !pair.skipHiddenFiles || !f.name.startsWith('.') }
|
||||
.filter { f -> pair.excludePatterns.none { pat -> f.name.matches(pat.toGlob()) } }
|
||||
.filter { f ->
|
||||
val ext = f.extension.lowercase()
|
||||
(includeExts.isEmpty() || ext in includeExts) && ext !in excludeExts
|
||||
}
|
||||
.filter { f -> f.length() >= minBytes && f.length() <= maxBytes }
|
||||
.associate { f -> f.relativeTo(this).path to f }
|
||||
}
|
||||
|
||||
private fun String.toGlob(): Regex =
|
||||
Regex(replace(".", "\\.").replace("*", ".*").replace("?", "."), RegexOption.IGNORE_CASE)
|
||||
}
|
||||
|
||||
enum class SyncDecision { UPLOAD, DOWNLOAD, DELETE_LOCAL, DELETE_REMOTE, CONFLICT, SKIP }
|
||||
|
||||
data class SyncResult(
|
||||
val uploaded: Int = 0,
|
||||
val downloaded: Int = 0,
|
||||
val deleted: Int = 0,
|
||||
val skipped: Int = 0,
|
||||
val failedFiles: Int = 0,
|
||||
val conflicts: Int = 0,
|
||||
val bytesTransferred: Long = 0L,
|
||||
val error: Exception? = null,
|
||||
)
|
||||
Reference in New Issue
Block a user