Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ plugins {
alias(libs.plugins.keeper) apply false
alias(libs.plugins.kotlin.atomicfu) apply false
alias(libs.plugins.cocoapods) apply false
alias(libs.plugins.ksp) apply false
alias(libs.plugins.androidx.room) apply false
id("org.jetbrains.dokka") version libs.versions.dokkaBase
id("dokka-convention")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ internal class PowerSyncDatabaseImpl(
logger.d { "PowerSyncVersion: $powerSyncVersion" }

internalDb.writeTransaction { tx ->
tx.getOptional("SELECT powersync_init()") {}
tx.async.getOptional("SELECT powersync_init()") {}
}

updateSchemaInternal(schema)
Expand Down
22 changes: 12 additions & 10 deletions core/src/commonMain/kotlin/com/powersync/db/Queries.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@ import kotlin.time.Duration.Companion.milliseconds

public fun interface ThrowableTransactionCallback<R> {
@Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class)
public fun execute(transaction: PowerSyncTransaction): R
public suspend fun execute(transaction: PowerSyncTransaction): R
}

public fun interface ThrowableLockCallback<R> {
@Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class)
public fun execute(context: ConnectionContext): R
public suspend fun execute(context: ConnectionContext): R
}

public interface Queries {
public companion object {
/**
* The default throttle duration for [onChange] and [watch] operations.
*/
public val DEFAULT_THROTTLE: Duration = 30.milliseconds
}

public interface QueryRunner {
/**
* Executes a write query (INSERT, UPDATE, DELETE).
*
Expand Down Expand Up @@ -94,6 +87,15 @@ public interface Queries {
parameters: List<Any?>? = listOf(),
mapper: (SqlCursor) -> RowType,
): RowType?
}

public interface Queries : QueryRunner {
public companion object {
/**
* The default throttle duration for [onChange] and [watch] operations.
*/
public val DEFAULT_THROTTLE: Duration = 30.milliseconds
}

/**
* Returns a [Flow] that emits whenever the source tables are modified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ internal class RawConnectionLease(
}

override suspend fun isInTransaction(): Boolean {
return isInTransactionSync()
}

override fun isInTransactionSync(): Boolean {
checkNotCompleted()
return connection.inTransaction()
}
Expand All @@ -31,10 +27,6 @@ internal class RawConnectionLease(
sql: String,
block: (SQLiteStatement) -> R
): R {
return usePreparedSync(sql, block)
}

override fun <R> usePreparedSync(sql: String, block: (SQLiteStatement) -> R): R {
checkNotCompleted()
return connection.prepare(sql).use(block)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,13 @@ public interface SQLiteConnectionLease {
*/
public suspend fun isInTransaction(): Boolean

public fun isInTransactionSync(): Boolean {
return runBlocking { isInTransaction() }
}

/**
* Prepares [sql] as statement and runs [block] with it.
*
* Block most only run on a single-thread. The statement must not be used once [block] returns.
*/
public suspend fun <R> usePrepared(sql: String, block: (SQLiteStatement) -> R): R

public fun <R> usePreparedSync(sql: String, block: (SQLiteStatement) -> R): R {
return runBlocking {
usePrepared(sql, block)
}
}

public suspend fun execSQL(sql: String) {
usePrepared(sql) {
it.step()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package com.powersync.db.internal
import androidx.sqlite.SQLiteStatement
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncException
import com.powersync.db.QueryRunner
import com.powersync.db.SqlCursor
import com.powersync.db.StatementBasedCursor
import com.powersync.db.driver.SQLiteConnectionLease
import kotlinx.coroutines.runBlocking

public interface ConnectionContext {
// TODO (breaking): Make asynchronous, create shared superinterface with Queries

public val async: QueryRunner

@Throws(PowerSyncException::class)
public fun execute(
sql: String,
Expand Down Expand Up @@ -38,13 +42,53 @@ public interface ConnectionContext {
): RowType
}

@ExperimentalPowerSyncAPI
internal class ConnectionContextImplementation(
private val rawConnection: SQLiteConnectionLease,
) : ConnectionContext {
/**
* An implementation of a [ConnectionContext] that delegates to a [QueryRunner] via [runBlocking].
*/
internal abstract class BaseConnectionContextImplementation(): ConnectionContext {
override fun execute(
sql: String,
parameters: List<Any?>?,
): Long = runBlocking { async.execute(sql, parameters) }

override fun <RowType : Any> getOptional(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType? = runBlocking {
async.getOptional(sql, parameters, mapper)
}

override fun <RowType : Any> getAll(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): List<RowType> =
runBlocking {
async.getAll(sql, parameters, mapper)
}

override fun <RowType : Any> get(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType = runBlocking {
async.get(sql, parameters, mapper)
}
}

@OptIn(ExperimentalPowerSyncAPI::class)
internal class ConnectionContextImplementation(lease: SQLiteConnectionLease): BaseConnectionContextImplementation() {
override val async = ContextQueryRunner(lease)
}

@OptIn(ExperimentalPowerSyncAPI::class)
internal class ContextQueryRunner(
private val rawConnection: SQLiteConnectionLease
): QueryRunner {
override suspend fun execute(
sql: String,
parameters: List<Any?>?
): Long {
withStatement(sql, parameters) {
while (it.step()) {
Expand All @@ -58,45 +102,43 @@ internal class ConnectionContextImplementation(
}
}

override fun <RowType : Any> getOptional(
override suspend fun <RowType : Any> get(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType? =
withStatement(sql, parameters) { stmt ->
if (stmt.step()) {
mapper(StatementBasedCursor(stmt))
} else {
null
}
}
mapper: (SqlCursor) -> RowType
): RowType = getOptional(sql, parameters, mapper) ?: throw PowerSyncException("get() called with query that returned no rows", null)

override fun <RowType : Any> getAll(
override suspend fun <RowType : Any> getAll(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): List<RowType> =
withStatement(sql, parameters) { stmt ->
buildList {
val cursor = StatementBasedCursor(stmt)
while (stmt.step()) {
add(mapper(cursor))
}
mapper: (SqlCursor) -> RowType
): List<RowType> = withStatement(sql, parameters) { stmt ->
buildList {
val cursor = StatementBasedCursor(stmt)
while (stmt.step()) {
add(mapper(cursor))
}
}
}

override fun <RowType : Any> get(
override suspend fun <RowType : Any> getOptional(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType = getOptional(sql, parameters, mapper) ?: throw PowerSyncException("get() called with query that returned no rows", null)
mapper: (SqlCursor) -> RowType
): RowType? = withStatement(sql, parameters) { stmt ->
if (stmt.step()) {
mapper(StatementBasedCursor(stmt))
} else {
null
}
}

private inline fun <T> withStatement(
private suspend inline fun <T> withStatement(
sql: String,
parameters: List<Any?>?,
crossinline block: (SQLiteStatement) -> T,
): T {
return rawConnection.usePreparedSync(sql) { stmt ->
return rawConnection.usePrepared(sql) { stmt ->
stmt.bind(parameters)
block(stmt)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ internal class InternalDatabaseImpl(
parameters: List<Any?>?,
): Long =
writeLock { context ->
context.execute(sql, parameters)
context.async.execute(sql, parameters)
}

override suspend fun updateSchema(schemaJson: String) {
withContext(dbContext) {
runWrapped {
pool.withAllConnections { writer, readers ->
writer.runTransaction { tx ->
tx.getOptional(
tx.async.getOptional(
"SELECT powersync_replace_schema(?);",
listOf(schemaJson),
) {}
Expand All @@ -63,19 +63,19 @@ internal class InternalDatabaseImpl(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType = readLock { connection -> connection.get(sql, parameters, mapper) }
): RowType = readLock { connection -> connection.async.get(sql, parameters, mapper) }

override suspend fun <RowType : Any> getAll(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): List<RowType> = readLock { connection -> connection.getAll(sql, parameters, mapper) }
): List<RowType> = readLock { connection -> connection.async.getAll(sql, parameters, mapper) }

override suspend fun <RowType : Any> getOptional(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType? = readLock { connection -> connection.getOptional(sql, parameters, mapper) }
): RowType? = readLock { connection -> connection.async.getOptional(sql, parameters, mapper) }

override fun onChange(
tables: Set<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,65 @@ package com.powersync.db.internal

import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncException
import com.powersync.db.QueryRunner
import com.powersync.db.SqlCursor
import com.powersync.db.driver.SQLiteConnectionLease

public interface PowerSyncTransaction : ConnectionContext

@ExperimentalPowerSyncAPI
internal class PowerSyncTransactionImpl(
private val lease: SQLiteConnectionLease,
) : PowerSyncTransaction,
ConnectionContext {
private val delegate = ConnectionContextImplementation(lease)
lease: SQLiteConnectionLease,
) : PowerSyncTransaction, BaseConnectionContextImplementation() {
override val async = AsyncPowerSyncTransactionImpl(lease)
}

@OptIn(ExperimentalPowerSyncAPI::class)
internal class AsyncPowerSyncTransactionImpl(
private val lease: SQLiteConnectionLease
): QueryRunner {

private val delegate = ContextQueryRunner(lease)

private fun checkInTransaction() {
if (!lease.isInTransactionSync()) {
private suspend fun checkInTransaction() {
if (!lease.isInTransaction()) {
throw PowerSyncException("Tried executing statement on a transaction that has been rolled back", cause = null)
}
}

override fun execute(
override suspend fun execute(
sql: String,
parameters: List<Any?>?,
parameters: List<Any?>?
): Long {
checkInTransaction()
return delegate.execute(sql, parameters)
}

override fun <RowType : Any> getOptional(
override suspend fun <RowType : Any> get(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType? {
mapper: (SqlCursor) -> RowType
): RowType {
checkInTransaction()
return delegate.getOptional(sql, parameters, mapper)
return delegate.get(sql, parameters, mapper)
}

override fun <RowType : Any> getAll(
override suspend fun <RowType : Any> getAll(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
mapper: (SqlCursor) -> RowType
): List<RowType> {
checkInTransaction()
return delegate.getAll(sql, parameters, mapper)
}

override fun <RowType : Any> get(
override suspend fun <RowType : Any> getOptional(
sql: String,
parameters: List<Any?>?,
mapper: (SqlCursor) -> RowType,
): RowType {
mapper: (SqlCursor) -> RowType
): RowType? {
checkInTransaction()
return delegate.get(sql, parameters, mapper)
return delegate.getOptional(sql, parameters, mapper)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public actual class DatabaseDriverFactory {
}

public actual fun BundledSQLiteDriver.addPowerSyncExtension() {
addExtension(powersyncExtension, "sqlite3_powersync_init")
addExtension("/Users/simon/src/powersync-sqlite-core/target/debug/libpowersync.dylib", "sqlite3_powersync_init")
}

private val powersyncExtension: String by lazy { extractLib("powersync") }
Loading