Skip to content

Commit da677ad

Browse files
authored
EH pipeline fix, REPL loop fix (#28)
* 2 pipeline fixes ( only for socket gw ) * sockfixes * fix EH in pipelining single mode (cma) * link stage fix * fix dist path * paths * paths * check pipeline inside EH * sync * arm docket build test * compress ext before exit docker * typo/compression * docker link stage * linking * missing extra * docker * script path * test X case * fix flush after X * chunking read in sf mode + ex handler canary * chunking read in sf mode + ex handler canary
1 parent 331de58 commit da677ad

31 files changed

+2500
-1296
lines changed

.buildconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ PG_VERSION=17.4
22
PG_BRANCH=REL_17_4_WASM
33

44
SDK_VERSION=3.1.74.6bi
5-
WASI_SDK_VERSION=24.0.4
5+
WASI_SDK_VERSION=25.0.0
66
SDKROOT=/tmp/sdk
77

88
PG_DIST=/tmp/sdk/dist

patches-REL_17_4_WASM/postgresql-emscripten/src-backend-commands-async.c.diff

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* just log a low-level debug message if it happens.
66
*/
77
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
8-
+ HandleNotifyInterrupt();
8+
+ HandleNotifyInterrupt();
99
+#else
1010
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
1111
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
--- REL_17_4/src/backend/libpq/pqcomm.c
2+
+++ pglite-REL_17_4/src/backend/libpq/pqcomm.c
3+
@@ -122,10 +122,18 @@
4+
static int PqSendBufferSize; /* Size send buffer */
5+
static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */
6+
static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */
7+
-
8+
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
9+
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
10+
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
11+
static int PqRecvLength; /* End of data available in PqRecvBuffer */
12+
+#else
13+
+static char PqRecvBuffer_static[PQ_RECV_BUFFER_SIZE];
14+
+static char *PqRecvBuffer;
15+
+static int PqRecvPointer;
16+
+static int PqRecvLength;
17+
+volatile int querylen = 0;
18+
+volatile FILE* queryfp = NULL;
19+
+#endif
20+
21+
/*
22+
* Message status
23+
@@ -135,6 +143,7 @@
24+
25+
26+
/* Internal functions */
27+
+
28+
static void socket_comm_reset(void);
29+
static void socket_close(int code, Datum arg);
30+
static void socket_set_nonblocking(bool nonblocking);
31+
@@ -148,9 +157,6 @@
32+
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start,
33+
size_t *end);
34+
35+
-static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
36+
-static int Setup_AF_UNIX(const char *sock_path);
37+
-
38+
static const PQcommMethods PqCommSocketMethods = {
39+
.comm_reset = socket_comm_reset,
40+
.flush = socket_flush,
41+
@@ -160,6 +166,10 @@
42+
.putmessage_noblock = socket_putmessage_noblock
43+
};
44+
45+
+static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
46+
+static int Setup_AF_UNIX(const char *sock_path);
47+
+
48+
+
49+
const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
50+
51+
WaitEventSet *FeBeWaitSet;
52+
@@ -181,7 +191,7 @@
53+
port->sock = client_sock->sock;
54+
memcpy(&port->raddr.addr, &client_sock->raddr.addr, client_sock->raddr.salen);
55+
port->raddr.salen = client_sock->raddr.salen;
56+
-
57+
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
58+
/* fill in the server (local) address */
59+
port->laddr.salen = sizeof(port->laddr.addr);
60+
if (getsockname(port->sock,
61+
@@ -273,14 +283,15 @@
62+
(void) pq_setkeepalivescount(tcp_keepalives_count, port);
63+
(void) pq_settcpusertimeout(tcp_user_timeout, port);
64+
}
65+
-
66+
+#endif /* WASM */
67+
+PDEBUG("# 285:" __FILE__);
68+
/* initialize state variables */
69+
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
70+
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
71+
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
72+
PqCommBusy = false;
73+
PqCommReadingMsg = false;
74+
-
75+
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
76+
/* set up process-exit hook to close the socket */
77+
on_proc_exit(socket_close, 0);
78+
79+
@@ -310,7 +321,12 @@
80+
MyLatch, NULL);
81+
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
82+
NULL, NULL);
83+
-
84+
+#else /* WASM */
85+
+ PDEBUG("# 323: FIXME: socketfile");
86+
+ #pragma message "FIXME: use socketfile when overflowing PqRecvBuffer_static"
87+
+ /* because we fill before starting reading message */
88+
+ PqRecvBuffer = &PqRecvBuffer_static[0];
89+
+#endif /* WASM */
90+
/*
91+
* The event positions match the order we added them, but let's sanity
92+
* check them to be sure.
93+
@@ -730,7 +746,7 @@
94+
Assert(Unix_socket_group);
95+
if (Unix_socket_group[0] != '\0')
96+
{
97+
-#ifdef WIN32
98+
+#if defined(WIN32) || defined(__wasi__)
99+
elog(WARNING, "configuration item \"unix_socket_group\" is not supported on this platform");
100+
#else
101+
char *endptr;
102+
@@ -909,6 +925,20 @@
103+
else
104+
PqRecvLength = PqRecvPointer = 0;
105+
}
106+
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
107+
+ if (queryfp && querylen) {
108+
+ int got = fread( PqRecvBuffer, 1, PQ_RECV_BUFFER_SIZE - PqRecvPointer, queryfp);
109+
+ querylen -= got;
110+
+ PqRecvLength += got;
111+
+ if (querylen<=0) {
112+
+ puts("# 931: could close fp early here " __FILE__);
113+
+ queryfp = NULL;
114+
+ }
115+
+ if (got>0)
116+
+ return 0;
117+
+ }
118+
+ return EOF;
119+
+#endif
120+
121+
/* Ensure that we're in blocking mode */
122+
socket_set_nonblocking(false);
123+
@@ -1011,7 +1041,7 @@
124+
*c = PqRecvBuffer[PqRecvPointer++];
125+
return 1;
126+
}
127+
-
128+
+puts("# 1028: pq_getbyte_if_available N/I in " __FILE__ ); abort();
129+
/* Put the socket into non-blocking mode */
130+
socket_set_nonblocking(true);
131+
132+
@@ -1115,6 +1145,7 @@
133+
return 0;
134+
}
135+
136+
+
137+
/* --------------------------------
138+
* pq_buffer_remaining_data - return number of bytes in receive buffer
139+
*
140+
@@ -1136,6 +1167,26 @@
141+
* This must be called before any of the pq_get* functions.
142+
* --------------------------------
143+
*/
144+
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
145+
+EMSCRIPTEN_KEEPALIVE void
146+
+pq_recvbuf_fill(FILE* fp, int packetlen) {
147+
+ if (packetlen>PQ_RECV_BUFFER_SIZE) {
148+
+ int got = fread( PqRecvBuffer, 1, PQ_RECV_BUFFER_SIZE, fp);
149+
+ queryfp = fp;
150+
+ querylen = packetlen - got;
151+
+ PqRecvLength = got;
152+
+puts("# 1160: input overflow");
153+
+ } else {
154+
+ fread( PqRecvBuffer, packetlen, 1, fp);
155+
+ PqRecvLength = packetlen;
156+
+ queryfp = NULL;
157+
+ querylen = 0;
158+
+ }
159+
+ PqRecvPointer = 0;
160+
+}
161+
+#endif
162+
+extern int cma_rsize;
163+
+static char * PqSendBuffer_save;
164+
void
165+
pq_startmsgread(void)
166+
{
167+
@@ -1147,7 +1198,29 @@
168+
ereport(FATAL,
169+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
170+
errmsg("terminating connection because protocol synchronization was lost")));
171+
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
172+
+ if (!pq_buffer_remaining_data()) {
173+
+ if (cma_rsize) {
174+
+ PqRecvPointer = 0;
175+
+ PqRecvLength = cma_rsize;
176+
+ PqRecvBuffer = (char*)0x1;
177+
+
178+
+ PqSendPointer = 0;
179+
+ PqSendBuffer_save = PqSendBuffer;
180+
+ PqSendBuffer = 2 + (char*)(cma_rsize);
181+
+ PqSendBufferSize = (CMA_MB*1024*1024) - (int)(&PqSendBuffer[0]);
182+
+ } else {
183+
+ PqRecvBuffer = &PqRecvBuffer_static[0];
184+
+ if (PqSendBuffer_save)
185+
+ PqSendBuffer=PqSendBuffer_save;
186+
+ PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
187+
+ }
188+
+ }
189+
+#if PDEBUG
190+
+ printf("# 1199: pq_startmsgread cma_rsize=%d PqRecvLength=%d buf=%p reply=%p\n", cma_rsize, PqRecvLength, &PqRecvBuffer[0], &PqSendBuffer[0]);
191+
+#endif
192+
193+
+#endif
194+
PqCommReadingMsg = true;
195+
}
196+
197+
@@ -1270,7 +1343,55 @@
198+
199+
return 0;
200+
}
201+
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
202+
+extern FILE* SOCKET_FILE;
203+
+extern int SOCKET_DATA;
204+
+static int
205+
+internal_putbytes(const char *s, size_t len) {
206+
+ if (PqSendPointer >= PqSendBufferSize) {
207+
+ fprintf(stderr, "# 1329: overflow %zu >= %d cma_rsize=%d CMA=%d\n", PqSendPointer, PqSendBufferSize,cma_rsize, CMA_MB);
208+
+ }
209+
+
210+
+ if (!cma_rsize) {
211+
+ int wc= fwrite(s, 1, len, SOCKET_FILE);
212+
+ SOCKET_DATA+=wc;
213+
+ } else {
214+
+ size_t amount;
215+
+ while (len > 0) {
216+
+ /* If buffer is full, then flush it out */
217+
+ if (PqSendPointer >= PqSendBufferSize) {
218+
+ socket_set_nonblocking(false);
219+
+ if (internal_flush())
220+
+ return EOF;
221+
+ }
222+
+ amount = PqSendBufferSize - PqSendPointer;
223+
+ if (amount > len)
224+
+ amount = len;
225+
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
226+
+ PqSendPointer += amount;
227+
+ s += amount;
228+
+ len -= amount;
229+
+ SOCKET_DATA+=amount;
230+
+ }
231+
+ }
232+
+ return 0;
233+
+}
234+
235+
+static int
236+
+socket_flush(void) {
237+
+ return internal_flush();
238+
+}
239+
+
240+
+static int
241+
+internal_flush(void) {
242+
+ /* no flush for raw wire */
243+
+ if (!cma_rsize) {
244+
+ PqSendStart = PqSendPointer = 0;
245+
+ }
246+
+ return 0;
247+
+}
248+
+
249+
+#else
250+
251+
static inline int
252+
internal_putbytes(const char *s, size_t len)
253+
@@ -1421,7 +1542,7 @@
254+
*start = *end = 0;
255+
return 0;
256+
}
257+
-
258+
+#endif /* wasm */
259+
/* --------------------------------
260+
* pq_flush_if_writable - flush pending output if writable without blocking
261+
*

0 commit comments

Comments
 (0)