Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions skipruntime-ts/adapters/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pg.types.setTypeParser(pg.types.builtins.TIMESTAMPTZ, (x: string) => x);
*/
export class PostgresExternalService implements ExternalService {
private client: pg.Client;
private clientID: string;
private open_instances: Set<string> = new Set<string>();

isConnected(): boolean {
Expand All @@ -48,33 +47,27 @@ export class PostgresExternalService implements ExternalService {
user: string;
password: string;
}) {
// generate random client ID for PostgreSQL notifications
this.clientID = "skip_pg_client_" + Math.random().toString(36).slice(2);

this.client = new pg.Client(db_config);
this.client
.connect()
.then(() => this.client.query(format(`LISTEN %I;`, this.clientID)))
.then(
() => {
const handler = () => {
void this.shutdown().then(() => process.exit());
};
[
"SIGINT",
"SIGTERM",
"SIGUSR1",
"SIGUSR2",
"uncaughtException",
].forEach((sig) => process.on(sig, handler));
},
(e: unknown) => {
console.error(
"Error connecting to Postgres at " + JSON.stringify(db_config),
);
throw e;
},
);
this.client.connect().then(
() => {
const handler = () => {
void this.shutdown().then(() => process.exit());
};
[
"SIGINT",
"SIGTERM",
"SIGUSR1",
"SIGUSR2",
"uncaughtException",
].forEach((sig) => process.on(sig, handler));
},
(e: unknown) => {
console.error(
"Error connecting to Postgres at " + JSON.stringify(db_config),
);
throw e;
},
);
}

/**
Expand Down Expand Up @@ -129,12 +122,13 @@ BEGIN
RETURN NULL;
END $f$ LANGUAGE PLPGSQL;`,
instance,
this.clientID,
instance,
key.col,
this.clientID,
instance,
key.col,
),
);
await this.client.query(format(`LISTEN %I;`, instance));
await this.client.query(
format(
`
Expand All @@ -148,7 +142,7 @@ FOR EACH ROW EXECUTE FUNCTION %I();`,
);
}
this.client.on("notification", (msg) => {
if (msg.payload != undefined) {
if (msg.channel == instance && msg.payload !== undefined) {
const query = key.select(table, msg.payload);
this.client.query(query).then(
(changes) => {
Expand Down
51 changes: 43 additions & 8 deletions skipruntime-ts/tests/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,16 @@ class PostgresResource implements Resource<Input_NN> {
params: { key: { col: "id", type: "INTEGER" } },
})
.map(PostgresRowExtract);
return collections.input.map(PointwiseSum, pg_data);
const pg_data2: EagerCollection<number, number> = context
.useExternalResource<number, PostgresRow>({
service: "postgres",
identifier: "skip_test2",
params: { key: { col: "id", type: "INTEGER" } },
})
.map(PostgresRowExtract);
return collections.input
.map(PointwiseSum, pg_data)
.map(PointwiseSum, pg_data2);
}
}

Expand Down Expand Up @@ -621,7 +630,10 @@ async function trySetupDB(
await pgSetupClient.query(`
DROP TABLE IF EXISTS skip_test;
CREATE TABLE skip_test (id INTEGER PRIMARY KEY, x INTEGER, "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT INTO skip_test (id, x) VALUES (1, 1), (2, 2), (3, 3);`);
INSERT INTO skip_test (id, x) VALUES (1, 1), (2, 2), (3, 3);
DROP TABLE IF EXISTS skip_test2;
CREATE TABLE skip_test2 (id INTEGER PRIMARY KEY, x INTEGER, "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT INTO skip_test2 (id, x) VALUES (1, 100), (2, 200), (3, 300);`);
await pgSetupClient.end();
pgIsSetup = true;
}
Expand Down Expand Up @@ -1161,8 +1173,10 @@ export function initTests(

it("testPostgres", async () => {
let service;
const pgClient = new pg.Client(pg_config);
try {
service = await initService(await postgresService());
await pgClient.connect();
} catch {
if ("CIRCLECI" in process.env) {
throw new Error("Failed to set up CircleCI environment with Postgres.");
Expand Down Expand Up @@ -1194,23 +1208,44 @@ export function initTests(
[3, [30]],
]);

let count = 0;
let retries = 0;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
try {
await timeout(5 + 100 * 2 ** retries);
expect(service.getAll("resource").payload).toEqual([
[1, [111]],
[2, [222]],
[3, [333]],
]);
break;
} catch (e: unknown) {
if (retries < 2) retries++;
else throw e;
}
}
await pgClient.query("UPDATE skip_test SET x = 1000 WHERE id = 1;");
await pgClient.query("DELETE FROM skip_test WHERE id = 2;");
retries = 0;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
try {
await timeout(5);
await timeout(5 + 100 * 2 ** retries);
expect(service.getAll("resource").payload).toEqual([
[1, [11]],
[2, [22]],
[3, [33]],
[1, [1110]],
[2, [220]],
[3, [333]],
]);
break;
} catch (e: unknown) {
if (count < 2) count++;
if (retries < 2) retries++;
else throw e;
}
}
} finally {
await pgClient.query("DELETE FROM skip_test WHERE id = 1;");
await pgClient.query("INSERT INTO skip_test (id, x) VALUES (1,1),(2,2);");
await pgClient.end();
await service.close();
}
});
Expand Down