Files
SyncFlow/app/src/main/kotlin/com/syncflow/worker/FileWatchService.kt
T
amir 897b685c70 Fix perpetual sync loop and wrong delete decisions
Three bugs fixed:

1. catchupScan used raw dir.walk() with no filters, causing hidden/excluded
   files to appear as "new" every startup and trigger a catchup sync.
   Fixed by using LocalAccessor.walkFiles(pair) which applies the same
   filters and uses the same mtime source (SAF cursor) as SyncEngine.

2. catchupScan compared localModifiedAt.toEpochMilli() vs File.lastModified()
   (millisecond precision) while SyncEngine uses second precision. Every file
   appeared "modified" after a successful sync. Fixed by using epochSecond.

3. syncDecide() treated !localExists && remoteExists && known==null as
   "user deleted local copy → delete remote" even on files that were never
   synced. Fixed to treat unknown remote files as new (download them), which
   is safe because a genuinely-deleted file will always have a known state
   record from the previous sync.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 15:13:43 +00:00

382 lines
17 KiB
Kotlin

package com.syncflow.worker
import android.app.*
import android.content.Context
import android.content.Intent
import android.database.ContentObserver
import android.net.Uri
import android.os.Build
import android.os.Environment
import android.os.FileObserver
import android.os.Handler
import android.os.IBinder
import android.os.Looper
import android.provider.DocumentsContract
import androidx.core.app.NotificationCompat
import androidx.work.ExistingWorkPolicy
import androidx.work.WorkInfo
import androidx.work.WorkManager
import com.syncflow.MainActivity
import com.syncflow.R
import com.syncflow.data.db.SyncFileStateDao
import com.syncflow.data.db.SyncPairDao
import com.syncflow.data.db.entities.toDomain
import com.syncflow.domain.model.ScheduleType
import com.syncflow.domain.sync.LocalAccessor
import dagger.hilt.android.AndroidEntryPoint
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import timber.log.Timber
import java.io.File
import javax.inject.Inject
@AndroidEntryPoint
class FileWatchService : Service() {
/** DAO used to load the sync pairs that should be watched. */
@Inject
lateinit var syncPairDao: SyncPairDao

/** DAO holding the per-file state recorded by earlier syncs (read by the catchup scan). */
@Inject
lateinit var fileStateDao: SyncFileStateDao

/** Service-wide scope on the IO dispatcher; a SupervisorJob keeps one failed child from cancelling the rest. */
private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

/** Main-thread handler handed to the ContentObserver fallback. */
private val mainHandler: Handler = Handler(Looper.getMainLooper())

/** Serializes refresh() so overlapping calls cannot double-register watchers or catchup scans. */
private val refreshMutex: Mutex = Mutex()

// NOTE(review): the maps below are plain, unsynchronized maps, yet they are written from
// coroutines on Dispatchers.IO and from observer callbacks — confirm this is safe in practice.

/** Pair id -> FileObservers for that pair; one observer per directory, because watching is recursive. */
private val fileObservers: MutableMap<Long, MutableList<FileObserver>> = mutableMapOf()

/** Pair id -> ContentObserver, used when a SAF URI cannot be mapped to a filesystem path. */
private val contentObservers: MutableMap<Long, ContentObserver> = mutableMapOf()

/** Pair id -> pending debounce job that will enqueue a sync once events quiet down. */
private val debounceJobs: MutableMap<Long, Job> = mutableMapOf()

/**
 * Pair id -> long-lived job that watches WorkManager for ANY sync of that pair
 * (manual, catchup, onchange), so the cooldown is applied no matter who started the sync.
 */
private val syncMonitorJobs: MutableMap<Long, Job> = mutableMapOf()

/** Pair id -> wall-clock millis until which change events for that pair are suppressed. */
private val syncCooldownUntil: MutableMap<Long, Long> = mutableMapOf()
companion object {
    /** Id of the notification channel used by the ongoing "watching" notification. */
    const val CHANNEL_WATCH = "sync_watching"

    /** Id of the foreground notification owned by this service. */
    private const val NOTIFICATION_ID = 1002

    /**
     * Starts the service. From Android O onwards it is started as a foreground
     * service; on older releases a plain startService call is used.
     */
    fun start(context: Context) {
        val serviceIntent = Intent(context, FileWatchService::class.java)
        when {
            Build.VERSION.SDK_INT >= Build.VERSION_CODES.O -> context.startForegroundService(serviceIntent)
            else -> context.startService(serviceIntent)
        }
    }

    /** Asks the system to stop the service. */
    fun stop(context: Context) {
        val serviceIntent = Intent(context, FileWatchService::class.java)
        context.stopService(serviceIntent)
    }
}
/**
 * Makes sure the notification channel exists, then promotes the service to the
 * foreground with a placeholder notification (watch count 0).
 */
override fun onCreate() {
    super.onCreate()
    ensureChannel()
    val placeholder = buildNotification(count = 0)
    startForeground(NOTIFICATION_ID, placeholder)
}
/**
 * Every start request rebuilds the watcher set in the background.
 * Returns START_STICKY so the system recreates the service if it is killed.
 */
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
    scope.launch {
        refresh()
    }
    return Service.START_STICKY
}
/**
 * Tears the service down: stops and unregisters every observer, cancels the
 * per-pair jobs, then cancels the service scope so no coroutine outlives it.
 */
override fun onDestroy() {
// Stop observers and per-pair jobs before the scope itself is cancelled.
clearWatchers()
scope.cancel()
super.onDestroy()
}
/** This is a started-only service; binding is not supported, so no binder is returned. */
override fun onBind(intent: Intent?): IBinder? {
    return null
}
/**
 * Rebuilds all watchers from the currently enabled ON_CHANGE pairs.
 *
 * Runs under [refreshMutex] so overlapping calls cannot register watchers twice.
 * Plain filesystem paths are watched with FileObserver. SAF tree URIs are mapped
 * to a real path when possible; if the mapping fails, or the mapped path is not a
 * directory this process can see, a ContentObserver on the tree URI is used instead.
 * When nothing ends up being watched the service stops itself.
 */
private suspend fun refresh() = refreshMutex.withLock {
    clearWatchers()
    val pairs = syncPairDao.getEnabled().filter { it.scheduleType == ScheduleType.ON_CHANGE }
    pairs.forEach { pair ->
        val pairId = pair.id
        val localPath = pair.localPath
        if (!localPath.startsWith("content://")) {
            watchPath(localPath, pairId, pair.wifiOnly, pair.chargingOnly)
            return@forEach
        }
        // Try to resolve the SAF tree URI to a real filesystem path so we can use
        // FileObserver. ContentObserver on a DocumentsProvider tree URI only fires
        // when changes come through the SAF API, not for raw filesystem writes.
        // Only trust the resolved path if it is a directory we can actually see;
        // otherwise watchPath() would bail out and the pair would not be watched at all.
        val realPath = safTreeUriToRealPath(localPath)?.takeIf { File(it).isDirectory }
        if (realPath != null) {
            watchPath(realPath, pairId, pair.wifiOnly, pair.chargingOnly)
        } else {
            // Fallback: register a ContentObserver for SAF paths that can't be resolved
            val treeUri = Uri.parse(localPath)
            val observer = object : ContentObserver(mainHandler) {
                override fun onChange(selfChange: Boolean) =
                    onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)

                override fun onChange(selfChange: Boolean, uri: Uri?) =
                    onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)
            }
            contentResolver.registerContentObserver(treeUri, true, observer)
            contentObservers[pairId] = observer
            // Fallback pairs need the sync monitor too, otherwise syncs started elsewhere
            // never set a cooldown and their own writes can re-trigger a sync.
            startSyncMonitor(pairId)
            Timber.d("FileWatchService: watching SAF URI (ContentObserver fallback) for pair $pairId")
        }
    }
    val count = fileObservers.size + contentObservers.size
    updateNotification(count)
    if (count == 0) {
        Timber.d("FileWatchService: no ON_CHANGE pairs, stopping")
        stopSelf()
    }
}
/**
 * Maps a SAF tree URI on primary shared storage to a filesystem path.
 *
 * Only document ids of the form "primary:RelativePath" are handled. The storage
 * root is taken from the platform rather than hard-coded to user 0, so the path is
 * also right for secondary users and work profiles.
 *
 * @param uriString the tree URI in string form.
 * @return the filesystem path, or null when the URI is not on primary storage or
 *   cannot be parsed as a tree URI.
 */
private fun safTreeUriToRealPath(uriString: String): String? {
    return try {
        val treeUri = Uri.parse(uriString)
        val docId = DocumentsContract.getTreeDocumentId(treeUri)
        // docId format is "primary:RelativePath" for primary internal storage
        if (docId.startsWith("primary:")) {
            val relative = docId.removePrefix("primary:")
            // Deprecated for general file access, but still the way to obtain the
            // current user's primary storage root (e.g. /storage/emulated/<userId>).
            @Suppress("DEPRECATION")
            val storageRoot = Environment.getExternalStorageDirectory()
            File(storageRoot, relative).path
        } else {
            null
        }
    } catch (e: Exception) {
        Timber.w("FileWatchService: could not resolve SAF URI to real path: $e")
        null
    }
}
/**
 * Starts watching [path] for the given pair: registers recursive FileObservers,
 * starts the WorkManager sync monitor and launches a catchup scan.
 * Logs a warning and does nothing when the path does not exist.
 */
private fun watchPath(path: String, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
    val root = File(path).takeIf { it.exists() } ?: run {
        Timber.w("FileWatchService: path does not exist for pair $pairId: $path")
        return
    }
    fileObservers[pairId] = mutableListOf()

    // The startup cooldown must be in place BEFORE any watcher is registered, so that
    // inotify events firing right at registration cannot start a debounce ahead of
    // the catchup scan.
    val startupCooldownEnd = System.currentTimeMillis() + 15_000
    syncCooldownUntil[pairId] = startupCooldownEnd

    watchDirRecursive(root, pairId, wifiOnly, chargingOnly)
    Timber.d("FileWatchService: watching pair $pairId at $path (${fileObservers[pairId]?.size} dirs)")

    startSyncMonitor(pairId)
    scope.launch {
        catchupScan(pairId, root, wifiOnly, chargingOnly)
    }
}
/**
 * Observes WorkManager for every work item tagged sync_<pairId>, whatever started it
 * (manual, catchup, onchange). While such work is RUNNING or ENQUEUED the cooldown is
 * pushed out by 120s on each update; once it is no longer active a 60s settle
 * cooldown is applied. This keeps observer events caused by our own file writes
 * from starting another sync. Any previous monitor for the pair is cancelled first.
 */
private fun startSyncMonitor(pairId: Long) {
    syncMonitorJobs[pairId]?.cancel()
    syncMonitorJobs[pairId] = scope.launch {
        var sawActiveSync = false
        val taggedWork = WorkManager.getInstance(applicationContext)
            .getWorkInfosByTagFlow("sync_$pairId")
        taggedWork.collect { infos ->
            val active = infos.any { info ->
                when (info.state) {
                    WorkInfo.State.RUNNING, WorkInfo.State.ENQUEUED -> true
                    else -> false
                }
            }
            when {
                active -> {
                    Timber.d("FileWatchService: sync active for pair $pairId — cooldown extended")
                    syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
                    sawActiveSync = true
                }
                sawActiveSync -> {
                    Timber.d("FileWatchService: sync finished for pair $pairId — 60s settle cooldown")
                    syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
                    sawActiveSync = false
                }
            }
        }
    }
}
/**
 * Registers a FileObserver on [dir] and, recursively, on every existing subdirectory.
 *
 * Each observer reports create/delete/modify/move events to [onChangeDetected].
 * A directory that appears under [dir] later — whether newly created or moved in —
 * gets its own observers, so changes inside it are seen as well.
 *
 * @param dir directory to watch; ignored when it is not a directory.
 * @param pairId sync pair the observers belong to.
 * @param wifiOnly forwarded to [onChangeDetected].
 * @param chargingOnly forwarded to [onChangeDetected].
 */
private fun watchDirRecursive(dir: File, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
    if (!dir.isDirectory) return
    val mask = FileObserver.CREATE or FileObserver.DELETE or FileObserver.MODIFY or
        FileObserver.MOVED_FROM or FileObserver.MOVED_TO
    // A directory can show up via CREATE (mkdir) or MOVED_TO (moved/renamed into place).
    val arrivalMask = FileObserver.CREATE or FileObserver.MOVED_TO

    // Shared by both observer variants below; only the constructor differs by API level.
    fun handleEvent(event: Int, path: String?) {
        if ((event and arrivalMask) != 0 && path != null) {
            val arrived = File(dir, path)
            if (arrived.isDirectory) watchDirRecursive(arrived, pairId, wifiOnly, chargingOnly)
        }
        onChangeDetected(pairId, wifiOnly, chargingOnly)
    }

    val observer = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
        object : FileObserver(dir, mask) {
            override fun onEvent(event: Int, path: String?) = handleEvent(event, path)
        }
    } else {
        @Suppress("DEPRECATION")
        object : FileObserver(dir.absolutePath, mask) {
            override fun onEvent(event: Int, path: String?) = handleEvent(event, path)
        }
    }
    observer.startWatching()
    // Keeping the observer in the map keeps a reference to it and lets clearWatchers() stop it.
    fileObservers.getOrPut(pairId) { mutableListOf() }.add(observer)
    // Recursively watch existing subdirectories
    dir.listFiles()?.filter { it.isDirectory }?.forEach { sub ->
        watchDirRecursive(sub, pairId, wifiOnly, chargingOnly)
    }
}
/**
 * Compares the files currently on disk with the state recorded by the last sync and
 * enqueues a catchup sync when anything was added, modified or deleted in between.
 *
 * Does nothing when the pair has no recorded state (never synced) or no longer exists.
 * Modification times are compared at second precision.
 *
 * @param pairId sync pair to scan.
 * @param dir local directory of the pair; used when the pair's path is not a SAF URI.
 * @param wifiOnly forwarded to the sync work request.
 * @param chargingOnly forwarded to the sync work request.
 */
private suspend fun catchupScan(pairId: Long, dir: File, wifiOnly: Boolean, chargingOnly: Boolean) {
    val known = fileStateDao.getForPair(pairId).associateBy { it.relativePath }
    if (known.isEmpty()) return // Never synced — first sync will be triggered manually
    val pairEntity = syncPairDao.getById(pairId) ?: return
    val pair = pairEntity.toDomain()
    // Use the same accessor + filters as SyncEngine so hidden/excluded/size-filtered files
    // don't appear as "new" in the catchup scan and trigger a perpetual sync loop.
    val accessor = if (pair.localPath.startsWith("content://"))
        LocalAccessor.Saf(Uri.parse(pair.localPath), contentResolver)
    else
        LocalAccessor.JavaFile(dir)
    val current = accessor.walkFiles(pair)

    // Short-circuits: later checks are skipped as soon as one kind of change is found.
    val changed =
        current.any { (rel, _) -> rel !in known } ||
            current.any { (rel, info) ->
                val s = known[rel]; s != null && s.localModifiedAt != null &&
                    s.localModifiedAt.epochSecond != info.lastModifiedMs / 1000
            } ||
            known.keys.any { rel -> rel !in current }
    if (!changed) return

    Timber.d("FileWatchService: catchup detected changes for pair $pairId, scheduling sync")
    // Cancel any debounce that started before our startup cooldown was set
    debounceJobs.remove(pairId)?.cancel()
    // Hold cooldown for duration of sync + 60s settle
    syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
    val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly)
    val uniqueName = "catchup_$pairId"
    val workManager = WorkManager.getInstance(applicationContext)
    val enqueue = workManager.enqueueUniqueWork(uniqueName, ExistingWorkPolicy.KEEP, req)
    scope.launch {
        try {
            // Wait until the enqueue is committed. With KEEP our request is dropped when a
            // catchup is already pending, and its id would then never reach a finished
            // state — so track the unique work by name rather than by request id.
            enqueue.result.get()
            workManager.getWorkInfosForUniqueWorkFlow(uniqueName)
                .first { infos -> infos.all { it.state.isFinished } }
            syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
        } catch (e: CancellationException) {
            throw e
        } catch (_: Exception) {
            syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
        }
    }
}
/**
 * Entry point for every observer event of a pair.
 *
 * Events arriving during the pair's cooldown are dropped. Otherwise a 5s debounce is
 * (re)started; when it fires and the pair is still enabled and not in cooldown, a
 * silent one-time sync is enqueued as unique work "onchange_<pairId>". The
 * notification shows progress and, after completion, the result summary for 12s.
 *
 * @param pairId sync pair the event belongs to.
 * @param wifiOnly forwarded to the sync work request.
 * @param chargingOnly forwarded to the sync work request.
 */
private fun onChangeDetected(pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
    // Ignore events fired by our own sync writing files — prevents the feedback loop
    // where downloaded/uploaded files trigger another sync indefinitely.
    if (System.currentTimeMillis() < (syncCooldownUntil[pairId] ?: 0L)) {
        Timber.d("FileWatchService: suppressing change event for pair $pairId (sync cooldown)")
        return
    }
    debounceJobs[pairId]?.cancel()
    debounceJobs[pairId] = scope.launch {
        delay(5_000)
        // Re-check: catchupScan or another path may have already set a cooldown
        // and handled this sync while we were waiting.
        if (System.currentTimeMillis() < (syncCooldownUntil[pairId] ?: 0L)) {
            Timber.d("FileWatchService: debounce fired but cooldown active for pair $pairId, skipping")
            return@launch
        }
        val pair = syncPairDao.getById(pairId)
        if (pair == null || !pair.isEnabled) return@launch
        Timber.d("FileWatchService: triggering sync for pair $pairId after debounce")
        // Block new triggers from this point until 60s after sync completes
        syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
        val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly, silent = true)
        val uniqueName = "onchange_$pairId"
        val workManager = WorkManager.getInstance(applicationContext)
        val enqueue = workManager.enqueueUniqueWork(uniqueName, ExistingWorkPolicy.KEEP, req)
        updateNotificationDynamic("Syncing: ${pair.name}")
        scope.launch {
            try {
                // Wait until the enqueue is committed. With KEEP our request is dropped when
                // an on-change sync is already pending, and its id would then never reach a
                // finished state — so track the unique work by name rather than by request id.
                enqueue.result.get()
                val info = workManager.getWorkInfosForUniqueWorkFlow(uniqueName)
                    .first { infos -> infos.all { it.state.isFinished } }
                    .firstOrNull()
                // Extend cooldown: 60s after sync finishes to let filesystem settle
                syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
                val summary = info?.outputData?.getString(SyncWorker.KEY_RESULT_SUMMARY)
                val watchCount = fileObservers.keys.size + contentObservers.size
                val watching = "Watching $watchCount folder${if (watchCount != 1) "s" else ""}"
                if (info?.state == WorkInfo.State.SUCCEEDED && summary != null) {
                    // NOTE(review): summary and watching are joined without a separator —
                    // confirm the summary string ends with one.
                    updateNotificationDynamic("${pair.name}: $summary$watching")
                } else {
                    updateNotificationDynamic(watching)
                }
                // Leave the result visible for a while, then restore the default text.
                delay(12_000)
                updateNotificationDynamic(null)
            } catch (e: CancellationException) {
                throw e
            } catch (_: Exception) {
                syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
                updateNotificationDynamic(null)
            }
        }
    }
}
/**
 * Drops all watching state: stops every FileObserver, unregisters every
 * ContentObserver, cancels pending debounce and sync-monitor jobs, and forgets
 * all cooldowns.
 */
private fun clearWatchers() {
    val allFileObservers = fileObservers.values.flatten()
    for (observer in allFileObservers) {
        observer.stopWatching()
    }
    fileObservers.clear()

    for (observer in contentObservers.values) {
        contentResolver.unregisterContentObserver(observer)
    }
    contentObservers.clear()

    for (job in debounceJobs.values) {
        job.cancel()
    }
    debounceJobs.clear()

    for (job in syncMonitorJobs.values) {
        job.cancel()
    }
    syncMonitorJobs.clear()

    syncCooldownUntil.clear()
}
/**
 * Creates the low-importance "File watching" notification channel if it does not
 * exist yet. Notification channels only exist from Android O (API 26), so this is a
 * no-op on older releases, matching the version check in [start].
 */
private fun ensureChannel() {
    // NotificationChannel and the channel APIs are unavailable before API 26.
    if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) return
    val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
    if (nm.getNotificationChannel(CHANNEL_WATCH) != null) return
    val channel = NotificationChannel(
        CHANNEL_WATCH,
        "File watching",
        NotificationManager.IMPORTANCE_LOW,
    ).apply {
        description = "Background service watching folders for changes"
        setShowBadge(false)
    }
    nm.createNotificationChannel(channel)
}
/**
 * Builds the ongoing service notification.
 *
 * @param count number of watched folders; 0 yields the "starting" text.
 * @param overrideText when non-null, shown instead of the count-based text.
 * @return the notification; tapping it opens MainActivity.
 */
private fun buildNotification(count: Int, overrideText: String? = null): Notification {
    val openApp = Intent(this, MainActivity::class.java).apply {
        flags = Intent.FLAG_ACTIVITY_SINGLE_TOP
    }
    val pendingFlags = PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE
    val tapIntent = PendingIntent.getActivity(this, 0, openApp, pendingFlags)

    val contentText = when {
        overrideText != null -> overrideText
        count > 0 -> "Watching $count folder${if (count != 1) "s" else ""} for changes"
        else -> "Starting file watcher…"
    }

    val builder = NotificationCompat.Builder(this, CHANNEL_WATCH)
    builder.setContentTitle("SyncFlow")
    builder.setContentText(contentText)
    builder.setSmallIcon(R.drawable.ic_sync)
    builder.setContentIntent(tapIntent)
    builder.setOngoing(true)
    builder.setPriority(NotificationCompat.PRIORITY_MIN)
    return builder.build()
}
/** Re-posts the service notification with the count-based default text for [count] folders. */
private fun updateNotification(count: Int) {
    val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
    val notification = buildNotification(count)
    manager.notify(NOTIFICATION_ID, notification)
}
/**
 * Re-posts the service notification using the current number of watched pairs.
 *
 * @param overrideText text to show instead of the default; null restores the default.
 */
private fun updateNotificationDynamic(overrideText: String?) {
    val watchedPairs = fileObservers.keys.size + contentObservers.size
    val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
    val notification = buildNotification(watchedPairs, overrideText)
    manager.notify(NOTIFICATION_ID, notification)
}
}