-
Notifications
You must be signed in to change notification settings - Fork 24
Fix for high concurrency issue #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ function PostgresDB(options) { | |
this.closed = false; | ||
|
||
this.pg_config = options; | ||
this.pool = new pg.Pool(this.pg_config) | ||
|
||
}; | ||
module.exports = PostgresDB; | ||
|
||
|
@@ -20,11 +21,6 @@ PostgresDB.prototype.close = function(callback) { | |
if (callback) callback(); | ||
}; | ||
|
||
function rollback(client, done) { | ||
client.query('ROLLBACK', function(err) { | ||
return done(err); | ||
}) | ||
} | ||
|
||
// Persists an op and snapshot if it is for the next version. Calls back with | ||
// callback(err, succeeded) | ||
|
@@ -39,91 +35,53 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca | |
* } | ||
* snapshot: PostgresSnapshot | ||
*/ | ||
pg.connect(this.pg_config, function(err, client, done) { | ||
if (err) { | ||
done(client); | ||
callback(err); | ||
return; | ||
} | ||
function commit() { | ||
client.query('COMMIT', function(err) { | ||
done(err); | ||
if (err) { | ||
callback(err); | ||
} else { | ||
callback(null, true); | ||
} | ||
}) | ||
this.pool.connect((err, client, done) => { | ||
if (err) { | ||
done(client); | ||
callback(err); | ||
return; | ||
} | ||
/*const*/ var query = { | ||
|
||
name: 'sdb-commit-op-and-snap', | ||
text: `With snaps as ( | ||
Insert into snapshots (collection,doc_id,doc_type, version,data) | ||
Select n.* From ( select $1 c, $2 d, $4 t, $3::integer v, $5::jsonb daa) | ||
n | ||
where v = (select version+1 v from snapshots where collection = $1 and doc_id = $2 for update) or not exists (select 1 from snapshots where collection = $1 and doc_id = $2 for update) | ||
On conflict(collection, doc_id) do update set version = $3, data = $5 , doc_type = $4 | ||
Returning version | ||
) | ||
Insert into ops (collection,doc_id, version,operation) | ||
Select n.* From ( select $1 c, $2 t, $3::integer v, $6::jsonb daa) | ||
n | ||
where (v = (select max(version)+1 v from ops where collection = $1 and doc_id = $2) or not exists (select 1 from ops where collection = $1 and doc_id = $2 for update)) and exists (select 1 from snaps) | ||
Returning version`, | ||
|
||
values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op] | ||
} | ||
client.query( | ||
'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2', | ||
[collection, id], | ||
function(err, res) { | ||
var max_version = res.rows[0].max_version; | ||
if (max_version == null) | ||
max_version = 0; | ||
if (snapshot.v !== max_version + 1) { | ||
return callback(null, false); | ||
} | ||
client.query('BEGIN', function(err) { | ||
client.query( | ||
'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)', | ||
[collection, id, snapshot.v, op], | ||
function(err, res) { | ||
if (err) { | ||
// TODO: if err is "constraint violation", callback(null, false) instead | ||
rollback(client, done); | ||
callback(err); | ||
return; | ||
} | ||
if (snapshot.v === 1) { | ||
client.query( | ||
'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)', | ||
[collection, id, snapshot.type, snapshot.v, snapshot.data], | ||
function(err, res) { | ||
// TODO: | ||
// if the insert was successful and did insert, callback(null, true) | ||
// if the insert was successful and did not insert, callback(null, false) | ||
// if there was an error, rollback and callback(error) | ||
if (err) { | ||
rollback(client, done); | ||
callback(err); | ||
return; | ||
} | ||
commit(); | ||
} | ||
) | ||
} else { | ||
client.query( | ||
'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)', | ||
[collection, id, snapshot.type, snapshot.v, snapshot.data], | ||
function(err, res) { | ||
// TODO: | ||
// if any rows were updated, success | ||
// if 0 rows were updated, rollback and not success | ||
// if error, rollback and not success | ||
if (err) { | ||
rollback(client, done); | ||
callback(err); | ||
return; | ||
} | ||
commit(); | ||
} | ||
) | ||
} | ||
} | ||
) | ||
}) | ||
client.query(query, (err, res) => { | ||
if (err) { | ||
console.log(err.stack) | ||
|
||
callback(err) | ||
} else if(res.rows.length === 0) { | ||
done(client); | ||
console.log("Unable to commit, not the latest version") | ||
callback(null,false) | ||
} | ||
else { | ||
done(client); | ||
console.log(res.rows[0]) | ||
callback(null,true) | ||
} | ||
) | ||
}) | ||
}) | ||
|
||
}) | ||
}; | ||
|
||
// Get the named document from the database. The callback is called with (err, | ||
// snapshot). A snapshot with a version of zero is returned if the docuemnt | ||
// has never been created in the database. | ||
PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) { | ||
pg.connect(this.pg_config, function(err, client, done) { | ||
this.pool.connect(function(err, client, done) { | ||
if (err) { | ||
done(client); | ||
callback(err); | ||
|
@@ -173,7 +131,7 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal | |
// | ||
// Callback should be called as callback(error, [list of ops]); | ||
PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) { | ||
pg.connect(this.pg_config, function(err, client, done) { | ||
this.pool.connect(function(err, client, done) { | ||
if (err) { | ||
done(client); | ||
callback(err); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,3 +14,13 @@ CREATE TABLE snapshots ( | |
data json not null, | ||
PRIMARY KEY (collection, doc_id) | ||
); | ||
|
||
ALTER TABLE ops | ||
|
||
ALTER COLUMN operation | ||
SET DATA TYPE jsonb | ||
USING operation::jsonb; | ||
|
||
ALTER TABLE snapshots | ||
ALTER COLUMN data | ||
SET DATA TYPE jsonb | ||
USING data::jsonb; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can change this back :)