Skip to content
5 changes: 3 additions & 2 deletions src/bigbuffer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

const PAGE_SIZE = 1<<30;
const PAGE_SIZE = ( typeof Buffer !== "undefined" && Buffer.constants && Buffer.constants.MAX_LENGTH ) ? Buffer.constants.MAX_LENGTH : (1 << 30);

export default class BigBuffer {

Expand All @@ -8,7 +8,8 @@ export default class BigBuffer {
this.byteLength = size;
for (let i=0; i<size; i+= PAGE_SIZE) {
const n = Math.min(size-i, PAGE_SIZE);
this.buffers.push(new Uint8Array(n));
//this.buffers.push(new Uint8Array(n));
this.buffers.push(new Uint8Array(new SharedArrayBuffer(n)));
}

}
Expand Down
6 changes: 4 additions & 2 deletions src/engine_applykey.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ export default function buildBatchApplyKey(curve, groupName) {

const task = [];

const b = buff.slice(i*pointsPerChunk*sGin, i*pointsPerChunk*sGin + n*sGin);

task.push({
cmd: "ALLOCSET",
var: 0,
buff: buff.slice(i*pointsPerChunk*sGin, i*pointsPerChunk*sGin + n*sGin)
buff: b
});
task.push({cmd: "ALLOCSET", var: 1, buff: t});
task.push({cmd: "ALLOCSET", var: 2, buff: inc});
Expand Down Expand Up @@ -96,7 +98,7 @@ export default function buildBatchApplyKey(curve, groupName) {
}
task.push({cmd: "GET", out: 0, var: 3, len: n*sGout});

opPromises.push(tm.queueAction(task));
opPromises.push(tm.queueAction(task, [b.buffer]));
t = Fr.mul(t, Fr.exp(inc, n));
}

Expand Down
2 changes: 1 addition & 1 deletion src/engine_batchconvert.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default function buildBatchConvert(tm, fnName, sIn, sOut) {
{cmd: "GET", out: 0, var: 1, len:sOut * n},
];
opPromises.push(
tm.queueAction(task)
tm.queueAction(task, [buffChunk.buffer])
);
}

Expand Down
14 changes: 7 additions & 7 deletions src/engine_fft.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export default function buildFFT(curve, groupName) {
} else {
task.push({cmd: "GET", out:0, var: 0, len: sMid*pointsInChunk});
}
promises.push(tm.queueAction(task).then( (r) => {
promises.push(tm.queueAction(task, [buffChunk.buffer]).then( (r) => {
if (logger) logger.debug(`${loggerTxt}: fft ${bits} mix end: ${i}/${nChunks}`);
return r;
}));
Expand Down Expand Up @@ -203,7 +203,7 @@ export default function buildFFT(curve, groupName) {
task.push({cmd: "GET", out: 0, var: 0, len: pointsInChunk*sMid});
task.push({cmd: "GET", out: 1, var: 1, len: pointsInChunk*sMid});
}
opPromises.push(tm.queueAction(task).then( (r) => {
opPromises.push(tm.queueAction(task, [chunks[o1].buffer, chunks[o2].buffer, first.buffer ]).then( (r) => {
if (logger) logger.debug(`${loggerTxt}: fft ${bits} join ${i}/${bits} ${j+1}/${nGroups} ${k}/${nChunksPerGroup/2}`);
return r;
}));
Expand Down Expand Up @@ -402,7 +402,7 @@ export default function buildFFT(curve, groupName) {
task.push({cmd: "GET", out: 0, var: 0, len: n*sOut});
task.push({cmd: "GET", out: 1, var: 1, len: n*sOut});
opPromises.push(
tm.queueAction(task).then( (r) => {
tm.queueAction(task, [b1.buffer, b2.buffer, firstChunk.buffer]).then((r) => {
if (logger) logger.debug(`${loggerTxt}: fftJoinExt End: ${i}/${nPoints}`);
return r;
})
Expand Down Expand Up @@ -550,7 +550,7 @@ export default function buildFFT(curve, groupName) {
}
task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG});
opPromises.push(
tm.queueAction(task)
tm.queueAction(task, [b.buffer])
);
}

Expand Down Expand Up @@ -585,7 +585,7 @@ export default function buildFFT(curve, groupName) {
]});
task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG});
task.push({cmd: "GET", out: 1, var: 1, len: pointsPerChunk*sG});
opPromises.push(tm.queueAction(task));
opPromises.push(tm.queueAction(task, [chunks[o1].buffer, chunks[o2].buffer, first.buffer]));
}
}

Expand Down Expand Up @@ -664,7 +664,7 @@ export default function buildFFT(curve, groupName) {
task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG});
task.push({cmd: "GET", out: 1, var: 1, len: pointsPerChunk*sG});
opPromises.push(
tm.queueAction(task)
tm.queueAction(task, [b1.buffer, b2.buffer, firstChunk.buffer])
);

}
Expand Down Expand Up @@ -740,7 +740,7 @@ export default function buildFFT(curve, groupName) {
]});
task.push({cmd: "GET", out: 0, var: 0, len: n*sGout});
opPromises.push(
tm.queueAction(task)
tm.queueAction(task, [b.buffer])
);

}
Expand Down
100 changes: 57 additions & 43 deletions src/engine_multiexp.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const pTSizes = [
export default function buildMultiexp(curve, groupName) {
const G = curve[groupName];
const tm = G.tm;

async function _multiExpChunk(buffBases, buffScalars, inType, logger, logText) {
if ( ! (buffBases instanceof Uint8Array) ) {
if (logger) logger.error(`${logText} _multiExpChunk buffBases is not Uint8Array`);
Expand All @@ -23,57 +24,54 @@ export default function buildMultiexp(curve, groupName) {

let sGIn;
let fnName;
if (groupName == "G1") {
if (inType == "affine") {
fnName = "g1m_multiexpAffine_chunk";
if (groupName === "G1") {
if (inType === "affine") {
fnName = "g1m_multiexpAffine";
sGIn = G.F.n8*2;
} else {
fnName = "g1m_multiexp_chunk";
fnName = "g1m_multiexp";
sGIn = G.F.n8*3;
}
} else if (groupName == "G2") {
if (inType == "affine") {
fnName = "g2m_multiexpAffine_chunk";
} else if (groupName === "G2") {
if (inType === "affine") {
fnName = "g2m_multiexpAffine";
sGIn = G.F.n8*2;
} else {
fnName = "g2m_multiexp_chunk";
fnName = "g2m_multiexp";
sGIn = G.F.n8*3;
}
} else {
throw new Error("Invalid group");
}
const nPoints = Math.floor(buffBases.byteLength / sGIn);

if (nPoints == 0) return G.zero;
if (nPoints === 0) return G.zero;
const sScalar = Math.floor(buffScalars.byteLength / nPoints);
if( sScalar * nPoints != buffScalars.byteLength) {
if( sScalar * nPoints !== buffScalars.byteLength) {
throw new Error("Scalar size does not match");
}

const bitChunkSize = pTSizes[log2(nPoints)];
const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1;

const opPromises = [];
for (let i=0; i<nChunks; i++) {
const task = [
{cmd: "ALLOCSET", var: 0, buff: buffBases},
{cmd: "ALLOCSET", var: 1, buff: buffScalars},
{cmd: "ALLOC", var: 2, len: G.F.n8*3},
{cmd: "CALL", fnName: fnName, params: [
{var: 0},
{var: 1},
{val: sScalar},
{val: nPoints},
{val: i*bitChunkSize},
{val: Math.min(sScalar*8 - i*bitChunkSize, bitChunkSize)},
{var: 2}
]},
{cmd: "GET", out: 0, var: 2, len: G.F.n8*3}
];
opPromises.push(
G.tm.queueAction(task)
);
}

const task = [
{cmd: "ALLOCSET", var: 0, buff: buffBases},
{cmd: "ALLOCSET", var: 1, buff: buffScalars},
{cmd: "ALLOC", var: 2, len: G.F.n8*3},
{cmd: "CALL", fnName: fnName, params: [
{var: 0}, //pBases
{var: 1}, // pScalars
{val: sScalar}, // scalarSize
{val: nPoints}, // nPoints
{var: 2} // pr
]},
{cmd: "GET", out: 0, var: 2, len: G.F.n8*3}
];
opPromises.push(
// transfer ownership of the buffers to the worker thread
G.tm.queueAction(task, [buffBases.buffer, buffScalars.buffer])
);

const result = await Promise.all(opPromises);

Expand All @@ -93,14 +91,14 @@ export default function buildMultiexp(curve, groupName) {
const MIN_CHUNK_SIZE = 1 << 10;
let sGIn;

if (groupName == "G1") {
if (inType == "affine") {
if (groupName === "G1") {
if (inType === "affine") {
sGIn = G.F.n8*2;
} else {
sGIn = G.F.n8*3;
}
} else if (groupName == "G2") {
if (inType == "affine") {
} else if (groupName === "G2") {
if (inType === "affine") {
sGIn = G.F.n8*2;
} else {
sGIn = G.F.n8*3;
Expand All @@ -110,33 +108,49 @@ export default function buildMultiexp(curve, groupName) {
}

const nPoints = Math.floor(buffBases.byteLength / sGIn);
if (nPoints == 0) return G.zero;
if (nPoints === 0) return G.zero;
const sScalar = Math.floor(buffScalars.byteLength / nPoints);
if( sScalar * nPoints != buffScalars.byteLength) {
if( sScalar * nPoints !== buffScalars.byteLength) {
throw new Error("Scalar size does not match");
}

console.log("buffBases.buffer instanceof SharedArrayBuffer", buffBases.buffer instanceof SharedArrayBuffer);
console.log("buffScalars.buffer instanceof SharedArrayBuffer", buffScalars.buffer instanceof SharedArrayBuffer);

let result = [];
const opPromises = [];
const bitChunkSize = pTSizes[log2(nPoints)];
const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1;
let nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1;

if (groupName === "G2") {
// G2 has bigger points, so we reduce chunk size to optimize memory usage
nChunks *= 2;
}

let chunkSize;
chunkSize = Math.floor(nPoints / (tm.concurrency /nChunks));
//chunkSize = Math.floor(nPoints / tm.concurrency) + 1;

// make nChunks multiple of tm.concurrency for optimal load balancing
nChunks = (Math.floor((nChunks-1) / tm.concurrency) + 1) * tm.concurrency;
chunkSize = Math.floor(nPoints / nChunks) + 1;

if (chunkSize>MAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE;
if (chunkSize<MIN_CHUNK_SIZE) chunkSize = MIN_CHUNK_SIZE;

const opPromises = [];
for (let i=0; i<nPoints; i += chunkSize) {
if (logger) logger.debug(`Multiexp start: ${logText}: ${i}/${nPoints}`);
const n= Math.min(nPoints - i, chunkSize);
const n = Math.min(nPoints - i, chunkSize);

const buffBasesChunk = buffBases.slice(i*sGIn, (i+n)*sGIn);
const buffScalarsChunk = buffScalars.slice(i*sScalar, (i+n)*sScalar);
opPromises.push(_multiExpChunk(buffBasesChunk, buffScalarsChunk, inType, logger, logText).then( (r) => {

opPromises.push(_multiExpChunk(buffBasesChunk, buffScalarsChunk, inType, logger, logText).then((r) => {
if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`);
return r;
}));
}

const result = await Promise.all(opPromises);
result = await Promise.all(opPromises);

let res = G.zero;
for (let i=result.length-1; i>=0; i--) {
Expand Down
2 changes: 1 addition & 1 deletion src/engine_pairing.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export default function buildPairing(curve) {
task.push({cmd: "GET", out: 0, var: 4, len: curve.Gt.n8});

opPromises.push(
tm.queueAction(task)
tm.queueAction(task, [g1Buff.buffer, g2Buff.buffer])
);
}

Expand Down
20 changes: 15 additions & 5 deletions src/threadman.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export default async function buildThreadManager(wasm, singleThread) {
concurrency = os.cpus().length;
}

if(concurrency == 0){
if(concurrency === 0){
concurrency = 2;
}

Expand Down Expand Up @@ -151,6 +151,13 @@ export default async function buildThreadManager(wasm, singleThread) {
data = e;
}

// handle errors
if (data.error) {
tm.working[i]=false;
tm.pendingDeferreds[i].reject("Worker error: " + data.error);
throw new Error("Worker error: " + data.error);
}

tm.working[i]=false;
tm.pendingDeferreds[i].resolve(data);
tm.processWorks();
Expand All @@ -166,19 +173,19 @@ export class ThreadManager {
}

startSyncOp() {
if (this.oldPFree != 0) throw new Error("Sync operation in progress");
if (this.oldPFree !== 0) throw new Error("Sync operation in progress");
this.oldPFree = this.u32[0];
}

endSyncOp() {
if (this.oldPFree == 0) throw new Error("No sync operation in progress");
if (this.oldPFree === 0) throw new Error("No sync operation in progress");
this.u32[0] = this.oldPFree;
this.oldPFree = 0;
}

postAction(workerId, e, transfers, _deferred) {
if (this.working[workerId]) {
throw new Error("Posting a job t a working worker");
throw new Error("Posting a job to a working worker");
}
this.working[workerId] = true;

Expand All @@ -189,8 +196,11 @@ export class ThreadManager {
}

processWorks() {
if (this.workers.length === 0 && this.actionQueue.length > 0) {
throw new Error("No workers initialized");
}
for (let i=0; (i<this.workers.length)&&(this.actionQueue.length > 0); i++) {
if (this.working[i] == false) {
if (this.working[i] === false) {
const work = this.actionQueue.shift();
this.postAction(i, work.data, work.transfers, work.deferred);
}
Expand Down
25 changes: 15 additions & 10 deletions src/threadman_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ export default function thread(self) {
data = e;
}

if (data[0].cmd == "INIT") {
init(data[0]).then(function() {
self.postMessage(data.result);
});
} else if (data[0].cmd == "TERMINATE") {
self.close();
} else {
const res = runTask(data);
self.postMessage(res);
try {
if (data[0].cmd === "INIT") {
init(data[0]).then(function() {
self.postMessage(data.result);
});
} else if (data[0].cmd === "TERMINATE") {
self.close();
} else {
const res = runTask(data);
self.postMessage(res);
}
} catch (err) {
// Catch any error and send it back to main thread
self.postMessage({error: err.message});
}
};
}
Expand Down Expand Up @@ -72,7 +77,7 @@ export default function thread(self) {
}

function runTask(task) {
if (task[0].cmd == "INIT") {
if (task[0].cmd === "INIT") {
return init(task[0]);
}
const ctx = {
Expand Down
2 changes: 1 addition & 1 deletion src/wasm_field1.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export default class WasmField1 {
{cmd: "GET", out: 0, var: 1, len:sOut * n},
];
opPromises.push(
this.tm.queueAction(task)
this.tm.queueAction(task, [buffChunk.buffer])
);
}

Expand Down
Loading