Skip to content

SKIP LOCKED clause for SELECT WITH LOCK, UPDATE and DELETE #7350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## New features

* [#7350](https://github.com/FirebirdSQL/firebird/pull/7350): SKIP LOCKED clause for SELECT WITH LOCK, UPDATE and DELETE
Reference(s): [/doc/sql.extensions/README.skip_locked.md](https://github.com/FirebirdSQL/firebird/raw/master/doc/sql.extensions/README.skip_locked.md)
Contributor(s): Adriano dos Santos Fernandes

* [#7216](https://github.com/FirebirdSQL/firebird/pull/7216): New built-in function `BLOB_APPEND`
Reference(s): [/doc/sql.extensions/README.blob_append.md](https://github.com/FirebirdSQL/firebird/raw/master/doc/sql.extensions/README.blob_append.md)
Contributor(s): Vlad Khorsun
Expand Down
93 changes: 93 additions & 0 deletions doc/sql.extensions/README.skip_locked.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# SKIP LOCKED clause (FB 5.0)

`SKIP LOCKED` clause can be used with `SELECT ... WITH LOCK`, `UPDATE` and `DELETE` statements.

It makes engine skip records locked by others transactions instead of wait on them or raise conflict errors.

This is very useful to implement work queues where one or more processes post work to a table and issue
an event while workers listen for events and read/delete items from the table. Using `SKIP LOCKED` multiple
workers can get exclusive work items from the table without conflicts.

## Syntax

```
SELECT
[FIRST ...]
[SKIP ...]
FROM <sometable>
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[{ ROWS ... } | {OFFSET ...} | {FETCH ...}]
[FOR UPDATE [OF ...]]
[WITH LOCK [SKIP LOCKED]]
```

```
UPDATE <sometable>
SET ...
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[ROWS ...]
[SKIP LOCKED]
[RETURNING ...]
```

```
DELETE FROM <sometable>
[WHERE ...]
[PLAN ...]
[ORDER BY ...]
[ROWS ...]
[SKIP LOCKED]
[RETURNING ...]
```

## Notes

As it happens with subclauses `FIRST`/`SKIP`/`ROWS`/`OFFSET`/`FETCH` record lock
(and "skip locked" check) is done in between of skip (`SKIP`/`ROWS`/`OFFSET`/`FETCH`) and
limit (`FIRST`/`ROWS`/`OFFSET`/`FETCH`) checks.

## Examples

### Prepare metadata

```
create table emails_queue (
subject varchar(60) not null,
text blob sub_type text not null
);

set term !;

create trigger emails_queue_ins after insert on emails_queue
as
begin
post_event('EMAILS_QUEUE');
end!

set term ;!
```

### Sender application or routine

```
insert into emails_queue (subject, text)
values ('E-mail subject', 'E-mail text...');
commit;
```

### Client application

Client application can listen to event `EMAILS_QUEUE` to actually send e-mails using this query:

```
delete from emails_queue
rows 10
skip locked
returning subject, text;
```

More than one instance of the application may be running, for example to load balance work.
1 change: 1 addition & 0 deletions src/common/keywords.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ static const TOK tokens[] =
{TOK_LOCALTIME, "LOCALTIME", false},
{TOK_LOCALTIMESTAMP, "LOCALTIMESTAMP", false},
{TOK_LOCK, "LOCK", true},
{TOK_LOCKED, "LOCKED", true},
{TOK_LOG, "LOG", true},
{TOK_LOG10, "LOG10", true},
{TOK_LONG, "LONG", false},
Expand Down
4 changes: 2 additions & 2 deletions src/dsql/BoolNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ BoolExprNode* ComparativeBoolNode::createRseNode(DsqlCompilerScratch* dsqlScratc
const DsqlContextStack::iterator baseDT(dsqlScratch->derivedContext);
const DsqlContextStack::iterator baseUnion(dsqlScratch->unionContext);

RseNode* rse = PASS1_rse(dsqlScratch, select_expr, false);
RseNode* rse = PASS1_rse(dsqlScratch, select_expr, false, false);
rse->flags |= RseNode::FLAG_DSQL_COMPARATIVE;

// Create a conjunct to be injected.
Expand Down Expand Up @@ -1480,7 +1480,7 @@ BoolExprNode* RseBoolNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
const DsqlContextStack::iterator base(*dsqlScratch->context);

RseBoolNode* node = FB_NEW_POOL(dsqlScratch->getPool()) RseBoolNode(dsqlScratch->getPool(), blrOp,
PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false));
PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false, false));

// Finish off by cleaning up contexts
dsqlScratch->context->clear(base);
Expand Down
2 changes: 1 addition & 1 deletion src/dsql/DdlNodes.epp
Original file line number Diff line number Diff line change
Expand Up @@ -8735,7 +8735,7 @@ void CreateAlterViewNode::execute(thread_db* tdbb, DsqlCompilerScratch* dsqlScra

dsqlScratch->resetContextStack();
++dsqlScratch->contextNumber;
RseNode* rse = PASS1_rse(dsqlScratch, selectExpr, false);
RseNode* rse = PASS1_rse(dsqlScratch, selectExpr, false, false);

dsqlScratch->getBlrData().clear();
dsqlScratch->appendUChar(dsqlScratch->isVersion4() ? blr_version4 : blr_version5);
Expand Down
2 changes: 1 addition & 1 deletion src/dsql/ExprNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11091,7 +11091,7 @@ ValueExprNode* SubQueryNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)

const DsqlContextStack::iterator base(*dsqlScratch->context);

RseNode* rse = PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false);
RseNode* rse = PASS1_rse(dsqlScratch, nodeAs<SelectExprNode>(dsqlRse), false, false);

SubQueryNode* node = FB_NEW_POOL(dsqlScratch->getPool()) SubQueryNode(dsqlScratch->getPool(), blrOp, rse,
rse->dsqlSelectList->items[0], NullNode::instance());
Expand Down
17 changes: 12 additions & 5 deletions src/dsql/StmtNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ DeclareCursorNode* DeclareCursorNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
dt->querySpec = dsqlSelect->dsqlExpr;
dt->alias = dsqlName.c_str();

rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock);
rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock, dsqlSelect->dsqlSkipLocked);

// Assign number and store in the dsqlScratch stack.
cursorNumber = dsqlScratch->cursorNumber++;
Expand Down Expand Up @@ -2254,6 +2254,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)

const auto node = FB_NEW_POOL(dsqlScratch->getPool()) EraseNode(dsqlScratch->getPool());
node->dsqlCursorName = dsqlCursorName;
node->dsqlSkipLocked = dsqlSkipLocked;

if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
{
Expand Down Expand Up @@ -2302,6 +2303,9 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)

if (dsqlRows)
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);

if (dsqlSkipLocked)
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
}

if (dsqlReturning && dsqlScratch->isPsql())
Expand Down Expand Up @@ -4917,7 +4921,7 @@ ForNode* ForNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
dt->querySpec = dsqlSelect->dsqlExpr;
dt->alias = dsqlCursor->dsqlName.c_str();

node->rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock);
node->rse = PASS1_derived_table(dsqlScratch, dt, NULL, dsqlSelect->dsqlWithLock, dsqlSelect->dsqlSkipLocked);

dsqlCursor->rse = node->rse;
dsqlCursor->cursorNumber = dsqlScratch->cursorNumber++;
Expand Down Expand Up @@ -6583,6 +6587,9 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up

if (dsqlRows)
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);

if (dsqlSkipLocked)
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
}

node->dsqlReturning = dsqlProcessReturning(dsqlScratch,
Expand Down Expand Up @@ -7508,7 +7515,7 @@ StmtNode* StoreNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch,
if (dsqlRse && dsqlScratch->isPsql() && dsqlReturning)
selExpr->dsqlFlags |= RecordSourceNode::DFLAG_SINGLETON;

RseNode* rse = PASS1_rse(dsqlScratch, selExpr, false);
RseNode* rse = PASS1_rse(dsqlScratch, selExpr, false, false);
node->dsqlRse = rse;
values = rse->dsqlSelectList;
needSavePoint = false;
Expand Down Expand Up @@ -8188,7 +8195,7 @@ SelectNode* SelectNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
node->dsqlForUpdate = dsqlForUpdate;

const DsqlContextStack::iterator base(*dsqlScratch->context);
node->dsqlRse = PASS1_rse(dsqlScratch, dsqlExpr, dsqlWithLock);
node->dsqlRse = PASS1_rse(dsqlScratch, dsqlExpr, dsqlWithLock, dsqlSkipLocked);
dsqlScratch->context->clear(base);

if (dsqlForUpdate)
Expand Down Expand Up @@ -10532,7 +10539,7 @@ static void forceWriteLock(thread_db * tdbb, record_param * rpb, jrd_tra * trans

// VIO_writelock returns false if record has been deleted or modified
// by someone else.
if (VIO_writelock(tdbb, rpb, transaction))
if (VIO_writelock(tdbb, rpb, transaction, false) == WriteLockResult::LOCKED)
break;
}
}
Expand Down
35 changes: 11 additions & 24 deletions src/dsql/StmtNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,19 +537,7 @@ class EraseNode final : public TypedNode<StmtNode, StmtNode::TYPE_ERASE>
public:
explicit EraseNode(MemoryPool& pool)
: TypedNode<StmtNode, StmtNode::TYPE_ERASE>(pool),
dsqlRelation(NULL),
dsqlBoolean(NULL),
dsqlPlan(NULL),
dsqlOrder(NULL),
dsqlRows(NULL),
dsqlCursorName(pool),
dsqlReturning(NULL),
dsqlRse(NULL),
dsqlContext(NULL),
statement(NULL),
subStatement(NULL),
stream(0),
marks(0)
dsqlCursorName(pool)
{
}

Expand All @@ -576,12 +564,13 @@ class EraseNode final : public TypedNode<StmtNode, StmtNode::TYPE_ERASE>
MetaName dsqlCursorName;
NestConst<ReturningClause> dsqlReturning;
NestConst<RseNode> dsqlRse;
dsql_ctx* dsqlContext;
bool dsqlSkipLocked = false;
dsql_ctx* dsqlContext = nullptr;
NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement;
NestConst<ForNode> forNode; // parent implicit cursor, if present
StreamType stream;
unsigned marks; // see StmtNode::IUD_MARK_xxx
StreamType stream = 0;
unsigned marks = 0; // see StmtNode::IUD_MARK_xxx
};


Expand Down Expand Up @@ -1186,6 +1175,7 @@ class ModifyNode final : public TypedNode<StmtNode, StmtNode::TYPE_MODIFY>
NestConst<ReturningClause> dsqlReturning;
NestConst<RecordSourceNode> dsqlRse;
dsql_ctx* dsqlContext = nullptr;
bool dsqlSkipLocked = false;
NestConst<StmtNode> statement;
NestConst<StmtNode> statement2;
NestConst<StmtNode> subMod;
Expand Down Expand Up @@ -1333,11 +1323,7 @@ class SelectNode final : public TypedNode<StmtNode, StmtNode::TYPE_SELECT>
public:
explicit SelectNode(MemoryPool& pool)
: TypedNode<StmtNode, StmtNode::TYPE_SELECT>(pool),
dsqlExpr(NULL),
dsqlRse(NULL),
statements(pool),
dsqlForUpdate(false),
dsqlWithLock(false)
statements(pool)
{
}

Expand All @@ -1354,9 +1340,10 @@ class SelectNode final : public TypedNode<StmtNode, StmtNode::TYPE_SELECT>
public:
NestConst<SelectExprNode> dsqlExpr;
NestConst<RseNode> dsqlRse;
Firebird::Array<NestConst<StmtNode> > statements;
bool dsqlForUpdate;
bool dsqlWithLock;
Firebird::Array<NestConst<StmtNode>> statements;
bool dsqlForUpdate = false;
bool dsqlWithLock = false;
bool dsqlSkipLocked = false;
};


Expand Down
3 changes: 3 additions & 0 deletions src/dsql/gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,9 @@ void GEN_rse(DsqlCompilerScratch* dsqlScratch, RseNode* rse)
if (rse->flags & RseNode::FLAG_WRITELOCK)
dsqlScratch->appendUChar(blr_writelock);

if (rse->flags & RseNode::FLAG_SKIP_LOCKED)
dsqlScratch->appendUChar(blr_skip_locked);

if (rse->dsqlFirst)
{
dsqlScratch->appendUChar(blr_first);
Expand Down
2 changes: 1 addition & 1 deletion src/dsql/parse-conflicts.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
68 shift/reduce conflicts, 18 reduce/reduce conflicts.
68 shift/reduce conflicts, 19 reduce/reduce conflicts.
Loading