ec478531da
Three root causes found via live logcat on device: 1. concurrent refresh() race: onStartCommand received twice causes two refresh() coroutines to run in parallel, doubling FileObserver and catchupScan registrations. Fixed with Mutex.withLock on refresh(). 2. catchupScan no cooldown: catchup syncs write files but never set syncCooldownUntil, so every written file immediately re-triggers onChangeDetected. Fixed by setting cooldown before enqueue and watching work completion same as onChangeDetected does. 3. CancellationException caught silently: exception handler catch(_: Exception) was catching CancellationException and resetting cooldown to 0L, re-opening the loop. Fixed by rethrowing CancellationException and setting 60s cooldown on other errors. Icon: interlocked rings (blue/red/green/orange) with sync arrow at center, pure black background — matches reference image. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
333 lines
14 KiB
Kotlin
333 lines
14 KiB
Kotlin
package com.syncflow.worker
|
|
|
|
import android.app.*
|
|
import android.content.Context
|
|
import android.content.Intent
|
|
import android.database.ContentObserver
|
|
import android.net.Uri
|
|
import android.os.Build
|
|
import android.os.FileObserver
|
|
import android.os.Handler
|
|
import android.os.IBinder
|
|
import android.os.Looper
|
|
import android.provider.DocumentsContract
|
|
import androidx.core.app.NotificationCompat
|
|
import androidx.work.ExistingWorkPolicy
|
|
import androidx.work.WorkInfo
|
|
import androidx.work.WorkManager
|
|
import kotlinx.coroutines.flow.first
|
|
import com.syncflow.MainActivity
|
|
import com.syncflow.R
|
|
import com.syncflow.data.db.SyncFileStateDao
|
|
import com.syncflow.data.db.SyncPairDao
|
|
import com.syncflow.domain.model.ScheduleType
|
|
import dagger.hilt.android.AndroidEntryPoint
|
|
import kotlinx.coroutines.*
|
|
import kotlinx.coroutines.sync.Mutex
|
|
import kotlinx.coroutines.sync.withLock
|
|
import timber.log.Timber
|
|
import java.io.File
|
|
import javax.inject.Inject
|
|
|
|
@AndroidEntryPoint
|
|
class FileWatchService : Service() {
|
|
|
|
@Inject lateinit var syncPairDao: SyncPairDao
|
|
@Inject lateinit var fileStateDao: SyncFileStateDao
|
|
|
|
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
|
private val mainHandler = Handler(Looper.getMainLooper())
|
|
// Prevents concurrent refresh() calls from doubling watchers + catchup scans
|
|
private val refreshMutex = Mutex()
|
|
|
|
// Multiple FileObserver instances per pair: one per directory (recursive)
|
|
private val fileObservers = mutableMapOf<Long, MutableList<FileObserver>>()
|
|
private val contentObservers = mutableMapOf<Long, ContentObserver>()
|
|
private val debounceJobs = mutableMapOf<Long, Job>()
|
|
// After a watcher-triggered sync completes, suppress FileObserver events for this long
|
|
// to stop the feedback loop: sync writes files → FileObserver fires → another sync → repeat.
|
|
private val syncCooldownUntil = mutableMapOf<Long, Long>()
|
|
|
|
companion object {
|
|
const val CHANNEL_WATCH = "sync_watching"
|
|
private const val NOTIFICATION_ID = 1002
|
|
|
|
fun start(context: Context) {
|
|
val intent = Intent(context, FileWatchService::class.java)
|
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
|
|
context.startForegroundService(intent)
|
|
} else {
|
|
context.startService(intent)
|
|
}
|
|
}
|
|
|
|
fun stop(context: Context) {
|
|
context.stopService(Intent(context, FileWatchService::class.java))
|
|
}
|
|
}
|
|
|
|
override fun onCreate() {
|
|
super.onCreate()
|
|
ensureChannel()
|
|
startForeground(NOTIFICATION_ID, buildNotification(0))
|
|
}
|
|
|
|
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
|
scope.launch { refresh() }
|
|
return START_STICKY
|
|
}
|
|
|
|
override fun onDestroy() {
|
|
clearWatchers()
|
|
scope.cancel()
|
|
super.onDestroy()
|
|
}
|
|
|
|
override fun onBind(intent: Intent?): IBinder? = null
|
|
|
|
private suspend fun refresh() = refreshMutex.withLock {
|
|
clearWatchers()
|
|
val pairs = syncPairDao.getEnabled().filter { it.scheduleType == ScheduleType.ON_CHANGE }
|
|
|
|
pairs.forEach { pair ->
|
|
val pairId = pair.id
|
|
val localPath = pair.localPath
|
|
|
|
if (localPath.startsWith("content://")) {
|
|
// Try to resolve the SAF tree URI to a real filesystem path so we can use
|
|
// FileObserver. ContentObserver on a DocumentsProvider tree URI only fires
|
|
// when changes come through the SAF API, not for raw filesystem writes.
|
|
val realPath = safTreeUriToRealPath(localPath)
|
|
if (realPath != null) {
|
|
watchPath(realPath, pairId, pair.wifiOnly, pair.chargingOnly)
|
|
} else {
|
|
// Fallback: register a ContentObserver for SAF paths that can't be resolved
|
|
val treeUri = Uri.parse(localPath)
|
|
val observer = object : ContentObserver(mainHandler) {
|
|
override fun onChange(selfChange: Boolean) = onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)
|
|
override fun onChange(selfChange: Boolean, uri: Uri?) = onChangeDetected(pairId, pair.wifiOnly, pair.chargingOnly)
|
|
}
|
|
contentResolver.registerContentObserver(treeUri, true, observer)
|
|
contentObservers[pairId] = observer
|
|
Timber.d("FileWatchService: watching SAF URI (ContentObserver fallback) for pair $pairId")
|
|
}
|
|
} else {
|
|
watchPath(localPath, pairId, pair.wifiOnly, pair.chargingOnly)
|
|
}
|
|
}
|
|
|
|
val count = fileObservers.keys.size + contentObservers.size
|
|
updateNotification(count)
|
|
|
|
if (count == 0) {
|
|
Timber.d("FileWatchService: no ON_CHANGE pairs, stopping")
|
|
stopSelf()
|
|
}
|
|
}
|
|
|
|
private fun safTreeUriToRealPath(uriString: String): String? {
|
|
return try {
|
|
val treeUri = Uri.parse(uriString)
|
|
val docId = DocumentsContract.getTreeDocumentId(treeUri)
|
|
// docId format is "primary:RelativePath" for primary internal storage
|
|
if (docId.startsWith("primary:")) {
|
|
val relative = docId.removePrefix("primary:")
|
|
"/storage/emulated/0/$relative"
|
|
} else {
|
|
null
|
|
}
|
|
} catch (e: Exception) {
|
|
Timber.w("FileWatchService: could not resolve SAF URI to real path: $e")
|
|
null
|
|
}
|
|
}
|
|
|
|
private fun watchPath(path: String, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
val dir = File(path)
|
|
if (!dir.exists()) {
|
|
Timber.w("FileWatchService: path does not exist for pair $pairId: $path")
|
|
return
|
|
}
|
|
fileObservers[pairId] = mutableListOf()
|
|
watchDirRecursive(dir, pairId, wifiOnly, chargingOnly)
|
|
Timber.d("FileWatchService: watching pair $pairId at $path (${fileObservers[pairId]?.size} dirs)")
|
|
scope.launch { catchupScan(pairId, dir, wifiOnly, chargingOnly) }
|
|
}
|
|
|
|
private fun watchDirRecursive(dir: File, pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
if (!dir.isDirectory) return
|
|
val mask = FileObserver.CREATE or FileObserver.DELETE or FileObserver.MODIFY or
|
|
FileObserver.MOVED_FROM or FileObserver.MOVED_TO
|
|
val observer = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
|
|
object : FileObserver(dir, mask) {
|
|
override fun onEvent(event: Int, path: String?) {
|
|
if (event and FileObserver.CREATE != 0 && path != null) {
|
|
val created = File(dir, path)
|
|
if (created.isDirectory) watchDirRecursive(created, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
onChangeDetected(pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
} else {
|
|
@Suppress("DEPRECATION")
|
|
object : FileObserver(dir.absolutePath, mask) {
|
|
override fun onEvent(event: Int, path: String?) {
|
|
if (event and FileObserver.CREATE != 0 && path != null) {
|
|
val created = File(dir, path)
|
|
if (created.isDirectory) watchDirRecursive(created, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
onChangeDetected(pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
}
|
|
observer.startWatching()
|
|
fileObservers.getOrPut(pairId) { mutableListOf() }.add(observer)
|
|
// Recursively watch existing subdirectories
|
|
dir.listFiles()?.filter { it.isDirectory }?.forEach { sub ->
|
|
watchDirRecursive(sub, pairId, wifiOnly, chargingOnly)
|
|
}
|
|
}
|
|
|
|
private suspend fun catchupScan(pairId: Long, dir: File, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
val known = fileStateDao.getForPair(pairId).associateBy { it.relativePath }
|
|
if (known.isEmpty()) return // Never synced — first sync will be triggered manually
|
|
|
|
val current = mutableMapOf<String, Long>()
|
|
dir.walk().filter { it.isFile }.forEach { f ->
|
|
current[f.relativeTo(dir).path.replace('\\', '/')] = f.lastModified()
|
|
}
|
|
|
|
val hasNew = current.any { (rel, _) -> rel !in known }
|
|
val hasModified = current.any { (rel, mtime) ->
|
|
val s = known[rel]; s != null && s.localModifiedAt != null &&
|
|
s.localModifiedAt.toEpochMilli() != mtime
|
|
}
|
|
val hasDeleted = known.keys.any { rel -> rel !in current }
|
|
|
|
if (hasNew || hasModified || hasDeleted) {
|
|
Timber.d("FileWatchService: catchup detected changes for pair $pairId, scheduling sync")
|
|
val pair = syncPairDao.getById(pairId) ?: return
|
|
// Set cooldown so file writes during this sync don't immediately re-trigger
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
|
|
val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly)
|
|
WorkManager.getInstance(applicationContext)
|
|
.enqueueUniqueWork("catchup_$pairId", ExistingWorkPolicy.KEEP, req)
|
|
scope.launch {
|
|
try {
|
|
WorkManager.getInstance(applicationContext)
|
|
.getWorkInfoByIdFlow(req.id)
|
|
.first { it?.state?.isFinished == true }
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
} catch (e: CancellationException) {
|
|
throw e
|
|
} catch (_: Exception) {
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun onChangeDetected(pairId: Long, wifiOnly: Boolean, chargingOnly: Boolean) {
|
|
// Ignore events fired by our own sync writing files — prevents the feedback loop
|
|
// where downloaded/uploaded files trigger another sync indefinitely.
|
|
if (System.currentTimeMillis() < (syncCooldownUntil[pairId] ?: 0L)) {
|
|
Timber.d("FileWatchService: suppressing change event for pair $pairId (sync cooldown)")
|
|
return
|
|
}
|
|
|
|
debounceJobs[pairId]?.cancel()
|
|
debounceJobs[pairId] = scope.launch {
|
|
delay(5_000)
|
|
val pair = syncPairDao.getById(pairId)
|
|
if (pair == null || !pair.isEnabled) return@launch
|
|
Timber.d("FileWatchService: triggering sync for pair $pairId after debounce")
|
|
|
|
// Block new triggers from this point until 60s after sync completes
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 120_000
|
|
|
|
val req = SyncWorker.buildOneTimeRequest(pairId, wifiOnly, chargingOnly, silent = true)
|
|
WorkManager.getInstance(applicationContext)
|
|
.enqueueUniqueWork("onchange_$pairId", ExistingWorkPolicy.KEEP, req)
|
|
|
|
updateNotificationDynamic("Syncing: ${pair.name}…")
|
|
|
|
scope.launch {
|
|
try {
|
|
val info = WorkManager.getInstance(applicationContext)
|
|
.getWorkInfoByIdFlow(req.id)
|
|
.first { it?.state?.isFinished == true }
|
|
// Extend cooldown: 60s after sync finishes to let filesystem settle
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
val summary = info?.outputData?.getString(SyncWorker.KEY_RESULT_SUMMARY)
|
|
val watchCount = fileObservers.keys.size + contentObservers.size
|
|
val watching = "Watching $watchCount folder${if (watchCount != 1) "s" else ""}"
|
|
if (info?.state == WorkInfo.State.SUCCEEDED && summary != null) {
|
|
updateNotificationDynamic("${pair.name}: $summary — $watching")
|
|
} else {
|
|
updateNotificationDynamic("$watching")
|
|
}
|
|
delay(12_000)
|
|
updateNotificationDynamic(null)
|
|
} catch (e: CancellationException) {
|
|
throw e
|
|
} catch (_: Exception) {
|
|
syncCooldownUntil[pairId] = System.currentTimeMillis() + 60_000
|
|
updateNotificationDynamic(null)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private fun clearWatchers() {
|
|
fileObservers.values.flatten().forEach { it.stopWatching() }
|
|
fileObservers.clear()
|
|
contentObservers.values.forEach { contentResolver.unregisterContentObserver(it) }
|
|
contentObservers.clear()
|
|
debounceJobs.values.forEach { it.cancel() }
|
|
debounceJobs.clear()
|
|
syncCooldownUntil.clear()
|
|
}
|
|
|
|
private fun ensureChannel() {
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
if (nm.getNotificationChannel(CHANNEL_WATCH) == null) {
|
|
nm.createNotificationChannel(
|
|
NotificationChannel(CHANNEL_WATCH, "File watching", NotificationManager.IMPORTANCE_LOW).apply {
|
|
description = "Background service watching folders for changes"
|
|
setShowBadge(false)
|
|
}
|
|
)
|
|
}
|
|
}
|
|
|
|
private fun buildNotification(count: Int, overrideText: String? = null): Notification {
|
|
val tapIntent = PendingIntent.getActivity(
|
|
this, 0,
|
|
Intent(this, MainActivity::class.java).apply { flags = Intent.FLAG_ACTIVITY_SINGLE_TOP },
|
|
PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE,
|
|
)
|
|
return NotificationCompat.Builder(this, CHANNEL_WATCH)
|
|
.setContentTitle("SyncFlow")
|
|
.setContentText(
|
|
overrideText ?: if (count > 0) "Watching $count folder${if (count != 1) "s" else ""} for changes"
|
|
else "Starting file watcher…"
|
|
)
|
|
.setSmallIcon(R.drawable.ic_sync)
|
|
.setContentIntent(tapIntent)
|
|
.setOngoing(true)
|
|
.setPriority(NotificationCompat.PRIORITY_MIN)
|
|
.build()
|
|
}
|
|
|
|
private fun updateNotification(count: Int) {
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
nm.notify(NOTIFICATION_ID, buildNotification(count))
|
|
}
|
|
|
|
private fun updateNotificationDynamic(overrideText: String?) {
|
|
val count = fileObservers.keys.size + contentObservers.size
|
|
val nm = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
|
|
nm.notify(NOTIFICATION_ID, buildNotification(count, overrideText))
|
|
}
|
|
}
|