897b685c70
Three bugs fixed: 1. catchupScan used raw dir.walk() with no filters, causing hidden/excluded files to appear as "new" every startup and trigger a catchup sync. Fixed by using LocalAccessor.walkFiles(pair) which applies the same filters and uses the same mtime source (SAF cursor) as SyncEngine. 2. catchupScan compared localModifiedAt.toEpochMilli() vs File.lastModified() (millisecond precision) while SyncEngine uses second precision. Every file appeared "modified" after a successful sync. Fixed by using epochSecond. 3. syncDecide() treated !localExists && remoteExists && known==null as "user deleted local copy → delete remote" even on files that were never synced. Fixed to treat unknown remote files as new (download them), which is safe because a genuinely-deleted file will always have a known state record from the previous sync. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
382 lines
17 KiB
Kotlin
382 lines
17 KiB
Kotlin
package com.syncflow.worker
|
|
|
|
import android.app.*
|
|
import android.content.Context
|
|
import android.content.Intent
|
|
import android.database.ContentObserver
|
|
import android.net.Uri
|
|
import android.os.Build
|
|
import android.os.FileObserver
|
|
import android.os.Handler
|
|
import android.os.IBinder
|
|
import android.os.Looper
|
|
import android.provider.DocumentsContract
|
|
import androidx.core.app.NotificationCompat
|
|
import androidx.work.ExistingWorkPolicy
|
|
import androidx.work.WorkInfo
|
|
import androidx.work.WorkManager
|
|
import kotlinx.coroutines.flow.first
|
|
import com.syncflow.MainActivity
|
|
import com.syncflow.R
|
|
import com.syncflow.data.db.SyncFileStateDao
|
|
import com.syncflow.data.db.SyncPairDao
|
|
import com.syncflow.data.db.entities.toDomain
|
|
import com.syncflow.domain.model.ScheduleType
|
|
import com.syncflow.domain.sync.LocalAccessor
|
|
import dagger.hilt.android.AndroidEntryPoint
|
|
import kotlinx.coroutines.*
|
|
import kotlinx.coroutines.sync.Mutex
|
|
import kotlinx.coroutines.sync.withLock
|
|
import timber.log.Timber
|
|
import java.io.File
|
|
import javax.inject.Inject
|
|
|
|
@AndroidEntryPoint
|
|
class FileWatchService : Service() {
|
|
|
|
@Inject lateinit var syncPairDao: SyncPairDao
|
|
@Inject lateinit var fileStateDao: SyncFileStateDao
|
|
|
|
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
|
private val mainHandler = Handler(Looper.getMainLooper())
|
|
// Prevents concurrent refresh() calls from doubling watchers + catchup scans
|
|
private val refreshMutex = Mutex()
|
|
|
|
// Multiple FileObserver instances per pair: one per directory (recursive)
|
|
private val fileObservers = mutableMapOf<Long, MutableList<FileObserver>>()
|
|
private val contentObservers = mutableMapOf<Long, ContentObserver>()
|
|
private val debounceJobs = mutableMapOf<Long, Job>()
|
|
// Persistent monitors that watch WorkManager for ANY sync (manual, catchup, onchange)
|
|
// so the cooldown is set regardless of who triggered the sync.
|
|
private val syncMonitorJobs = mutableMapOf<Long, Job>()
|
|
// After a sync completes, suppress FileObserver events for this long.
|
|
private val syncCooldownUntil = mutableMapOf<Long, Long>()
|
|
|
|
companion object {
|
|
const val CHANNEL_WATCH = "sync_watching"
|
|
private const val NOTIFICATION_ID = 1002
|
|
|
|
fun start(context: Context) {
|
|
val intent = Intent(context, FileWatchService::class.java)
|
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
|
|
context.startForegroundService(intent)
|
|
} else {
|
|
context.startService(intent)
|
|
}
|
|
}
|
|
|
|
fun stop(context: Context) {
|
|
context.stopService(Intent(context, FileWatchService::class.java))
|
|
}
|
|
}
|
|
|
|
override fun onCreate() {
|
|
super.onCreate()
|
|
ensureChannel()
|
|
startForeground(NOTIFICATION_ID, buildNotification(0))
|
|
}
|
|
|
|
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
|
scope.launch { refresh() }
|
|
return START_STICKY
|
|
}
|
|
|
|
override fun onDestroy() {
|
|
clearWatchers()
|
|
scope.cancel()
|
|
super.onDestroy()
|
|
}
|
|
|
|
override fun onBind(intent: Intent?): IBinder? = null
|
|
|
|
private suspend fun refresh() = refreshMutex.withLock {
|
|
clearWatchers()
|
|
val pairs = syncPairDao.getEnabled().filter { it.scheduleType == ScheduleType.ON_CHANGE }
|
|
|
|
pairs.forEach { pair ->
|
|
val pairId = pair.id
|
|
val localPath = pair.localPath
|
|
|
|
if (localPath.startsWith("content://")) {
|
|
// Try to resolve the SAF tree URI to a real filesystem path so we can use
|
|
// FileObserver. ContentObserver on a DocumentsProvider tree URI only fires
|
|
// when changes come through the SAF API, not for raw filesystem writes.
|
|
val realPath = safTreeUriToRealPath(localPath)
|
|
if (realPath != null) {
|
|
watchPath(realPath, pairId, pair.wifiOnly, pair.chargingOnly)
|
|
} else {
|
|
// Fallback: register a ContentObserver for SAF paths that can't be resolved
|
|
val treeUri = Uri.parse(localPath)
|
|
val observer = object : ContentObserver(mainHandler) {
|
|
override fun onChange(selfChange: Boolean) = onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)
|
|
override fun onChange(selfChange: Boolean, uri: Uri?) = onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)
|
|
}
|
|
contentResolver.registerContentObserver(treeUri, true, observer)
|
|
contentObservers[pairId] = observer
|
|
Timber.d("FileWatchService: watching SAF URI (ContentObserver fallback) for pair $pairId")
|
|
}
|
|
} else {
|
|
watchPath(localPath, pairId, pair.wifiOnly, pair.chargingOnly)
|
|
}
|
|
}
|
|
|
|
val count = fileObservers.keys.size + contentObservers.size
|
|
updateNotification(count)
|
|
|
|
if (count == 0) {
|
|
Timber.d("FileWatchService: no ON_CHANGE pairs, stopping")
|
|
stopSelf()
|
|
}
|
|
}
|
|
|
|
private fun safTreeUriToRealPath(uriString: String): String? {
|
|
return try {
|
|
val treeUri = Uri.parse(uriString)
|
|
val docId = DocumentsContract.getTreeDocumentId(treeUri)
|
|
// docId format is "primary:RelativePath" for primary internal storage
|
|
if (docId.startsWith("primary:")) {
|
|
val relative = docId.removePrefix("primary:")
|
|
"/storage/emulated/0/$relative"
|
|
} else {
|
|
null
|
|
}
|
|
} catch (e: Exception) {
|
|
Timber.w("FileWatchService: could not resolve SAF URI to real path: $e")
|
|
null
|
|
}
|
|
}
|
|
|
|
private fun watchPath(path: String, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
val dir = File(path)
|
|
if (!dir.exists()) {
|
|
Timber.w("FileWatchService: path does not exist for pair $pairId: $path")
|
|
return
|
|
}
|
|
fileObservers[pairId] = mutableListOf()
|
|
// Set startup cooldown BEFORE registering watchers so inotify events that fire
|
|
// immediately on registration don't trigger the debounce before catchupScan runs.
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 15_000
|
|
watchDirRecursive(dir, pairId, wifiOnly, chargingOnly)
|
|
Timber.d("FileWatchService: watching pair $pairId at $path (${fileObservers[pairId]?.size} dirs)")
|
|
startSyncMonitor(pairId)
|
|
scope.launch { catchupScan(pairId, dir, wifiOnly, chargingOnly) }
|
|
}
|
|
|
|
// Watches WorkManager for ANY sync tagged sync_$pairId (manual, catchup, onchange).
|
|
// Sets cooldown while running and for 60s after, so FileObserver events from our
|
|
// own file writes never trigger a re-sync regardless of what started the sync.
|
|
private fun startSyncMonitor(pairId: Long) {
|
|
syncMonitorJobs[pairId]?.cancel()
|
|
syncMonitorJobs[pairId] = scope.launch {
|
|
var wasSyncing = false
|
|
WorkManager.getInstance(applicationContext)
|
|
.getWorkInfosByTagFlow("sync_$pairId")
|
|
.collect { infos ->
|
|
val isSyncing = infos.any {
|
|
it.state == WorkInfo.State.RUNNING || it.state == WorkInfo.State.ENQUEUED
|
|
}
|
|
if (isSyncing) {
|
|
Timber.d("FileWatchService: sync active for pair $pairId — cooldown extended")
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
|
|
wasSyncing = true
|
|
} else if (wasSyncing) {
|
|
Timber.d("FileWatchService: sync finished for pair $pairId — 60s settle cooldown")
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
wasSyncing = false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun watchDirRecursive(dir: File, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
if (!dir.isDirectory) return
|
|
val mask = FileObserver.CREATE or FileObserver.DELETE or FileObserver.MODIFY or
|
|
FileObserver.MOVED_FROM or FileObserver.MOVED_TO
|
|
val observer = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
|
|
object : FileObserver(dir, mask) {
|
|
override fun onEvent(event: Int, path: String?) {
|
|
if (event and FileObserver.CREATE != 0 && path != null) {
|
|
val created = File(dir, path)
|
|
if (created.isDirectory) watchDirRecursive(created, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
onChangeDetected(pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
} else {
|
|
@Suppress("DEPRECATION")
|
|
object : FileObserver(dir.absolutePath, mask) {
|
|
override fun onEvent(event: Int, path: String?) {
|
|
if (event and FileObserver.CREATE != 0 && path != null) {
|
|
val created = File(dir, path)
|
|
if (created.isDirectory) watchDirRecursive(created, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
onChangeDetected(pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
}
|
|
observer.startWatching()
|
|
fileObservers.getOrPut(pairId) { mutableListOf() }.add(observer)
|
|
// Recursively watch existing subdirectories
|
|
dir.listFiles()?.filter { it.isDirectory }?.forEach { sub ->
|
|
watchDirRecursive(sub, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
|
|
private suspend fun catchupScan(pairId: Long, dir: File, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
val known = fileStateDao.getForPair(pairId).associateBy { it.relativePath }
|
|
if (known.isEmpty()) return // Never synced — first sync will be triggered manually
|
|
|
|
val pairEntity = syncPairDao.getById(pairId) ?: return
|
|
val pair = pairEntity.toDomain()
|
|
// Use the same accessor + filters as SyncEngine so hidden/excluded/size-filtered files
|
|
// don't appear as "new" in the catchup scan and trigger a perpetual sync loop.
|
|
val accessor = if (pair.localPath.startsWith("content://"))
|
|
LocalAccessor.Saf(Uri.parse(pair.localPath), contentResolver)
|
|
else
|
|
LocalAccessor.JavaFile(dir)
|
|
val current = accessor.walkFiles(pair)
|
|
|
|
val hasNew = current.any { (rel, _) -> rel !in known }
|
|
val hasModified = current.any { (rel, info) ->
|
|
val s = known[rel]; s != null && s.localModifiedAt != null &&
|
|
s.localModifiedAt.epochSecond != info.lastModifiedMs / 1000
|
|
}
|
|
val hasDeleted = known.keys.any { rel -> rel !in current }
|
|
|
|
if (hasNew || hasModified || hasDeleted) {
|
|
Timber.d("FileWatchService: catchup detected changes for pair $pairId, scheduling sync")
|
|
// Cancel any debounce that started before our startup cooldown was set
|
|
debounceJobs[pairId]?.cancel()
|
|
debounceJobs.remove(pairId)
|
|
// Hold cooldown for duration of sync + 60s settle
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
|
|
val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly)
|
|
WorkManager.getInstance(applicationContext)
|
|
.enqueueUniqueWork("catchup_$pairId", ExistingWorkPolicy.KEEP, req)
|
|
scope.launch {
|
|
try {
|
|
WorkManager.getInstance(applicationContext)
|
|
.getWorkInfoByIdFlow(req.id)
|
|
.first { it?.state?.isFinished == true }
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
} catch (e: CancellationException) {
|
|
throw e
|
|
} catch (_: Exception) {
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun onChangeDetected(pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
// Ignore events fired by our own sync writing files — prevents the feedback loop
|
|
// where downloaded/uploaded files trigger another sync indefinitely.
|
|
if (System.currentTimeMillis() < (syncCooldownUntil[pairId] ?: 0L)) {
|
|
Timber.d("FileWatchService: suppressing change event for pair $pairId (sync cooldown)")
|
|
return
|
|
}
|
|
|
|
debounceJobs[pairId]?.cancel()
|
|
debounceJobs[pairId] = scope.launch {
|
|
delay(5_000)
|
|
// Re-check: catchupScan or another path may have already set a cooldown
|
|
// and handled this sync while we were waiting.
|
|
if (System.currentTimeMillis() < (syncCooldownUntil[pairId] ?: 0L)) {
|
|
Timber.d("FileWatchService: debounce fired but cooldown active for pair $pairId, skipping")
|
|
return@launch
|
|
}
|
|
val pair = syncPairDao.getById(pairId)
|
|
if (pair == null || !pair.isEnabled) return@launch
|
|
Timber.d("FileWatchService: triggering sync for pair $pairId after debounce")
|
|
|
|
// Block new triggers from this point until 60s after sync completes
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
|
|
|
|
val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly, silent = true)
|
|
WorkManager.getInstance(applicationContext)
|
|
.enqueueUniqueWork("onchange_$pairId", ExistingWorkPolicy.KEEP, req)
|
|
|
|
updateNotificationDynamic("Syncing: ${pair.name}…")
|
|
|
|
scope.launch {
|
|
try {
|
|
val info = WorkManager.getInstance(applicationContext)
|
|
.getWorkInfoByIdFlow(req.id)
|
|
.first { it?.state?.isFinished == true }
|
|
// Extend cooldown: 60s after sync finishes to let filesystem settle
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
val summary = info?.outputData?.getString(SyncWorker.KEY_RESULT_SUMMARY)
|
|
val watchCount = fileObservers.keys.size + contentObservers.size
|
|
val watching = "Watching $watchCount folder${if (watchCount != 1) "s" else ""}"
|
|
if (info?.state == WorkInfo.State.SUCCEEDED && summary != null) {
|
|
updateNotificationDynamic("${pair.name}: $summary — $watching")
|
|
} else {
|
|
updateNotificationDynamic("$watching")
|
|
}
|
|
delay(12_000)
|
|
updateNotificationDynamic(null)
|
|
} catch (e: CancellationException) {
|
|
throw e
|
|
} catch (_: Exception) {
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
updateNotificationDynamic(null)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun clearWatchers() {
|
|
fileObservers.values.flatten().forEach { it.stopWatching() }
|
|
fileObservers.clear()
|
|
contentObservers.values.forEach { contentResolver.unregisterContentObserver(it) }
|
|
contentObservers.clear()
|
|
debounceJobs.values.forEach { it.cancel() }
|
|
debounceJobs.clear()
|
|
syncMonitorJobs.values.forEach { it.cancel() }
|
|
syncMonitorJobs.clear()
|
|
syncCooldownUntil.clear()
|
|
}
|
|
|
|
private fun ensureChannel() {
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
if (nm.getNotificationChannel(CHANNEL_WATCH) == null) {
|
|
nm.createNotificationChannel(
|
|
NotificationChannel(CHANNEL_WATCH, "File watching", NotificationManager.IMPORTANCE_LOW).apply {
|
|
description = "Background service watching folders for changes"
|
|
setShowBadge(false)
|
|
}
|
|
)
|
|
}
|
|
}
|
|
|
|
private fun buildNotification(count: Int, overrideText: String? = null): Notification {
|
|
val tapIntent = PendingIntent.getActivity(
|
|
this, 0,
|
|
Intent(this, MainActivity::class.java).apply { flags = Intent.FLAG_ACTIVITY_SINGLE_TOP },
|
|
PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE,
|
|
)
|
|
return NotificationCompat.Builder(this, CHANNEL_WATCH)
|
|
.setContentTitle("SyncFlow")
|
|
.setContentText(
|
|
overrideText ?: if (count > 0) "Watching $count folder${if (count != 1) "s" else ""} for changes"
|
|
else "Starting file watcher…"
|
|
)
|
|
.setSmallIcon(R.drawable.ic_sync)
|
|
.setContentIntent(tapIntent)
|
|
.setOngoing(true)
|
|
.setPriority(NotificationCompat.PRIORITY_MIN)
|
|
.build()
|
|
}
|
|
|
|
private fun updateNotification(count: Int) {
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
nm.notify(NOTIFICATION_ID, buildNotification(count))
|
|
}
|
|
|
|
private fun updateNotificationDynamic(overrideText: String?) {
|
|
val count = fileObservers.keys.size + contentObservers.size
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
nm.notify(NOTIFICATION_ID, buildNotification(count, overrideText))
|
|
}
|
|
}
|