Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildconfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ PG_VERSION=17.4
PG_BRANCH=REL_17_4_WASM

SDK_VERSION=3.1.74.6bi
WASI_SDK_VERSION=24.0.4
WASI_SDK_VERSION=25.0.0
SDKROOT=/tmp/sdk

PG_DIST=/tmp/sdk/dist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* just log a low-level debug message if it happens.
*/
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
+ HandleNotifyInterrupt();
+ HandleNotifyInterrupt();
+#else
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
--- REL_17_4/src/backend/libpq/pqcomm.c
+++ pglite-REL_17_4/src/backend/libpq/pqcomm.c
@@ -122,10 +122,18 @@
static int PqSendBufferSize; /* Size send buffer */
static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */
static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */
-
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+#else
+static char PqRecvBuffer_static[PQ_RECV_BUFFER_SIZE];
+static char *PqRecvBuffer;
+static int PqRecvPointer;
+static int PqRecvLength;
+volatile int querylen = 0;
+volatile FILE* queryfp = NULL;
+#endif

/*
* Message status
@@ -135,6 +143,7 @@


/* Internal functions */
+
static void socket_comm_reset(void);
static void socket_close(int code, Datum arg);
static void socket_set_nonblocking(bool nonblocking);
@@ -148,9 +157,6 @@
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start,
size_t *end);

-static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
-static int Setup_AF_UNIX(const char *sock_path);
-
static const PQcommMethods PqCommSocketMethods = {
.comm_reset = socket_comm_reset,
.flush = socket_flush,
@@ -160,6 +166,10 @@
.putmessage_noblock = socket_putmessage_noblock
};

+static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
+static int Setup_AF_UNIX(const char *sock_path);
+
+
const PQcommMethods *PqCommMethods = &PqCommSocketMethods;

WaitEventSet *FeBeWaitSet;
@@ -181,7 +191,7 @@
port->sock = client_sock->sock;
memcpy(&port->raddr.addr, &client_sock->raddr.addr, client_sock->raddr.salen);
port->raddr.salen = client_sock->raddr.salen;
-
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
/* fill in the server (local) address */
port->laddr.salen = sizeof(port->laddr.addr);
if (getsockname(port->sock,
@@ -273,14 +283,15 @@
(void) pq_setkeepalivescount(tcp_keepalives_count, port);
(void) pq_settcpusertimeout(tcp_user_timeout, port);
}
-
+#endif /* WASM */
+PDEBUG("# 285:" __FILE__);
/* initialize state variables */
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
PqCommReadingMsg = false;
-
+#if !defined(__EMSCRIPTEN__) && !defined(__wasi__)
/* set up process-exit hook to close the socket */
on_proc_exit(socket_close, 0);

@@ -310,7 +321,12 @@
MyLatch, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
NULL, NULL);
-
+#else /* WASM */
+ PDEBUG("# 323: FIXME: socketfile");
+ #pragma message "FIXME: use socketfile when overflowing PqRecvBuffer_static"
+ /* because we fill before starting reading message */
+ PqRecvBuffer = &PqRecvBuffer_static[0];
+#endif /* WASM */
/*
* The event positions match the order we added them, but let's sanity
* check them to be sure.
@@ -730,7 +746,7 @@
Assert(Unix_socket_group);
if (Unix_socket_group[0] != '\0')
{
-#ifdef WIN32
+#if defined(WIN32) || defined(__wasi__)
elog(WARNING, "configuration item \"unix_socket_group\" is not supported on this platform");
#else
char *endptr;
@@ -909,6 +925,20 @@
else
PqRecvLength = PqRecvPointer = 0;
}
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
+ if (queryfp && querylen) {
+ int got = fread( PqRecvBuffer, 1, PQ_RECV_BUFFER_SIZE - PqRecvPointer, queryfp);
+ querylen -= got;
+ PqRecvLength += got;
+ if (querylen<=0) {
+ puts("# 931: could close fp early here " __FILE__);
+ queryfp = NULL;
+ }
+ if (got>0)
+ return 0;
+ }
+ return EOF;
+#endif

/* Ensure that we're in blocking mode */
socket_set_nonblocking(false);
@@ -1011,7 +1041,7 @@
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
+puts("# 1028: pq_getbyte_if_available N/I in " __FILE__ ); abort();
/* Put the socket into non-blocking mode */
socket_set_nonblocking(true);

@@ -1115,6 +1145,7 @@
return 0;
}

+
/* --------------------------------
* pq_buffer_remaining_data - return number of bytes in receive buffer
*
@@ -1136,6 +1167,26 @@
* This must be called before any of the pq_get* functions.
* --------------------------------
*/
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
+EMSCRIPTEN_KEEPALIVE void
+pq_recvbuf_fill(FILE* fp, int packetlen) {
+ if (packetlen>PQ_RECV_BUFFER_SIZE) {
+ int got = fread( PqRecvBuffer, 1, PQ_RECV_BUFFER_SIZE, fp);
+ queryfp = fp;
+ querylen = packetlen - got;
+ PqRecvLength = got;
+puts("# 1160: input overflow");
+ } else {
+ fread( PqRecvBuffer, packetlen, 1, fp);
+ PqRecvLength = packetlen;
+ queryfp = NULL;
+ querylen = 0;
+ }
+ PqRecvPointer = 0;
+}
+#endif
+extern int cma_rsize;
+static char * PqSendBuffer_save;
void
pq_startmsgread(void)
{
@@ -1147,7 +1198,29 @@
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("terminating connection because protocol synchronization was lost")));
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
+ if (!pq_buffer_remaining_data()) {
+ if (cma_rsize) {
+ PqRecvPointer = 0;
+ PqRecvLength = cma_rsize;
+ PqRecvBuffer = (char*)0x1;
+
+ PqSendPointer = 0;
+ PqSendBuffer_save = PqSendBuffer;
+ PqSendBuffer = 2 + (char*)(cma_rsize);
+ PqSendBufferSize = (CMA_MB*1024*1024) - (int)(&PqSendBuffer[0]);
+ } else {
+ PqRecvBuffer = &PqRecvBuffer_static[0];
+ if (PqSendBuffer_save)
+ PqSendBuffer=PqSendBuffer_save;
+ PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+ }
+ }
+#if PDEBUG
+ printf("# 1199: pq_startmsgread cma_rsize=%d PqRecvLength=%d buf=%p reply=%p\n", cma_rsize, PqRecvLength, &PqRecvBuffer[0], &PqSendBuffer[0]);
+#endif

+#endif
PqCommReadingMsg = true;
}

@@ -1270,7 +1343,55 @@

return 0;
}
+#if defined(__EMSCRIPTEN__) || defined(__wasi__)
+extern FILE* SOCKET_FILE;
+extern int SOCKET_DATA;
+static int
+internal_putbytes(const char *s, size_t len) {
+ if (PqSendPointer >= PqSendBufferSize) {
+ fprintf(stderr, "# 1329: overflow %zu >= %d cma_rsize=%d CMA=%d\n", PqSendPointer, PqSendBufferSize,cma_rsize, CMA_MB);
+ }
+
+ if (!cma_rsize) {
+ int wc= fwrite(s, 1, len, SOCKET_FILE);
+ SOCKET_DATA+=wc;
+ } else {
+ size_t amount;
+ while (len > 0) {
+ /* If buffer is full, then flush it out */
+ if (PqSendPointer >= PqSendBufferSize) {
+ socket_set_nonblocking(false);
+ if (internal_flush())
+ return EOF;
+ }
+ amount = PqSendBufferSize - PqSendPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ PqSendPointer += amount;
+ s += amount;
+ len -= amount;
+ SOCKET_DATA+=amount;
+ }
+ }
+ return 0;
+}

+static int
+socket_flush(void) {
+ return internal_flush();
+}
+
+static int
+internal_flush(void) {
+ /* no flush for raw wire */
+ if (!cma_rsize) {
+ PqSendStart = PqSendPointer = 0;
+ }
+ return 0;
+}
+
+#else

static inline int
internal_putbytes(const char *s, size_t len)
@@ -1421,7 +1542,7 @@
*start = *end = 0;
return 0;
}
-
+#endif /* wasm */
/* --------------------------------
* pq_flush_if_writable - flush pending output if writable without blocking
*
Loading