diff --git a/consumer.js b/consumer.js index 4b8e37b..fe18be3 100644 --- a/consumer.js +++ b/consumer.js @@ -1,6 +1,6 @@ ( // Module boilerplate to support browser globals, node.js and AMD. (typeof module !== "undefined" && function (m) { module.exports = m(require('stream'), require('events'), require('smith')); }) || - (typeof define === "function" && function (m) { define("vfs-socket/consumer", ["./stream-amd", "./events-amd", "smith"], m); }) || + (typeof define === "function" && function (m) { define(["./stream-amd", "./events-amd", "smith"], m); }) || (function (m) { window.consumer = m(window.stream, window.events, window.smith); }) )(function (stream, events, smith) { "use strict"; @@ -62,6 +62,7 @@ function Consumer() { ping: ping, // Send a simple ping request to the worker resolve: route("resolve"), stat: route("stat"), + metadata: route("metadata"), readfile: route("readfile"), readdir: route("readdir"), mkfile: route("mkfile"), @@ -74,6 +75,7 @@ function Consumer() { watch: route("watch"), connect: route("connect"), spawn: route("spawn"), + pty: route("pty"), execFile: route("execFile"), extend: route("extend"), unextend: route("unextend"), @@ -103,16 +105,16 @@ function Consumer() { err = new Error("EDISCONNECT: vfs socket disconnected"); err.code = "EDISCONNECT"; } - Object.keys(streams).forEach(function (id) { - var stream = streams[id]; - stream.emit("close"); - }); - Object.keys(proxyStreams).forEach(onClose); Object.keys(proxyProcesses).forEach(function (pid) { var proxyProcess = proxyProcesses[pid]; delete proxyProcesses[pid]; proxyProcess.emit("exit", 1); }); + Object.keys(streams).forEach(function (id) { + var stream = streams[id]; + stream.emit("close"); + }); + Object.keys(proxyStreams).forEach(onClose); Object.keys(proxyWatchers).forEach(function (id) { var proxyWatcher = proxyWatchers[id]; delete proxyWatchers[id]; @@ -124,6 +126,11 @@ function Consumer() { proxyApi.emit("error", err); }); }); + + this.on("error", function(err){ + // just adding an empty listener so that events-amd doesn't throw + console.error(err); + }); var nextStreamID = 1; function storeStream(stream) { @@ -138,9 +145,9 @@ function Consumer() { stream.pause && stream.pause(); } }); - stream.on("end", function () { + stream.on("end", function (chunk) { delete streams[id]; - remote.onEnd(id); + remote.onEnd(id, chunk); }); } stream.on("close", function () { @@ -193,6 +200,19 @@ function Consumer() { }; return process; } + function makePtyProxy(token){ + var pty = makeStreamProxy(token); + var pid = token.pid; + pty.pid = pid; + proxyProcesses[pid] = pty; + pty.kill = function (signal) { + remote.kill(pid, signal); + }; + pty.resize = function (cols, rows) { + remote.resize(pid, cols, rows); + }; + return pty; + } function makeWatcherProxy(token) { var watcher = new EventEmitter(); @@ -248,13 +268,13 @@ function Consumer() { if (!stream) return; stream.emit("data", chunk); } - function onEnd(id) { + function onEnd(id, chunk) { var stream = proxyStreams[id]; if (!stream) return; // TODO: not delete proxy if close is going to be called later. // but somehow do delete proxy if close won't be called later. delete proxyStreams[id]; - stream.emit("end"); + stream.emit("end", chunk); } function onClose(id) { var stream = proxyStreams[id]; @@ -289,7 +309,6 @@ function Consumer() { if (!stream) return; stream.destroy(); delete streams[id]; - nextStreamID = id; } function pause(id) { var stream = streams[id]; @@ -307,7 +326,6 @@ function Consumer() { delete streams[id]; if (chunk) stream.end(chunk); else stream.end(); - nextStreamID = id; } function on(name, handler, callback) { @@ -360,6 +378,9 @@ function Consumer() { if (meta.process) { meta.process = makeProcessProxy(meta.process); } + if (meta.pty) { + meta.pty = makePtyProxy(meta.pty); + } if (meta.watcher) { meta.watcher = makeWatcherProxy(meta.watcher); } diff --git a/events-amd.js b/events-amd.js index ca8b051..4b787e1 100644 --- a/events-amd.js +++ b/events-amd.js @@ -261,6 +261,7 @@ EventEmitter.prototype.removeListener = function(type, listener) { return this; }; +EventEmitter.prototype.off = EventEmitter.prototype.removeListener; /** * Removes all listeners, or those of the specified event. diff --git a/worker.js b/worker.js index a18309a..c37adb0 100644 --- a/worker.js +++ b/worker.js @@ -30,6 +30,9 @@ function Worker(vfs) { // Endpoints for processes at meta.process kill: kill, + // Endpoints for processes at meta.pty + resize: resize, + // Endpoint for watchers at meta.watcher close: closeWatcher, @@ -47,6 +50,7 @@ function Worker(vfs) { // Route other calls to the local vfs instance resolve: route("resolve"), stat: route("stat"), + metadata: route("metadata"), readfile: route("readfile"), readdir: route("readdir"), mkfile: route("mkfile"), @@ -59,6 +63,7 @@ function Worker(vfs) { watch: route("watch"), connect: route("connect"), spawn: route("spawn"), + pty: route("pty"), execFile: route("execFile"), extend: route("extend"), unextend: route("unextend"), @@ -107,16 +112,16 @@ function Worker(vfs) { err = new Error("EDISCONNECT: vfs socket disconnected"); err.code = "EDISCONNECT"; } - Object.keys(streams).forEach(function (id) { - var stream = streams[id]; - stream.emit("close", err); - }); - Object.keys(proxyStreams).forEach(onClose); Object.keys(processes).forEach(function (pid) { var process = processes[pid]; process.kill(); delete processes[pid]; }); + Object.keys(streams).forEach(function (id) { + var stream = streams[id]; + stream.emit("close", err); + }); + Object.keys(proxyStreams).forEach(onClose); Object.keys(watchers).forEach(function (id) { var watcher = watchers[id]; delete watchers[id]; @@ -163,9 +168,9 @@ function Worker(vfs) { stream.pause && stream.pause(); } }); - stream.on("end", function () { + stream.on("end", function (chunk) { delete streams[id]; - remote.onEnd(id); + remote.onEnd(id, chunk); }); } stream.on("close", function () { @@ -178,7 +183,7 @@ function Worker(vfs) { return token; } - function storeProcess(process) { + function storeProcess(process, onlyPid) { var pid = process.pid; processes[pid] = process; process.on("exit", function (code, signal) { @@ -187,15 +192,20 @@ function Worker(vfs) { }); process.on("close", function () { delete processes[pid]; - delete streams[process.stdout.id]; - delete streams[process.stderr.id]; - delete streams[process.stdin.id]; + if (!onlyPid) { + delete streams[process.stdout.id]; + delete streams[process.stderr.id]; + delete streams[process.stdin.id]; + } remote.onProcessClose(pid); }); process.kill = function(code) { killtree(pid, code); }; + + if (onlyPid) + return pid; var token = {pid: pid}; token.stdin = storeStream(process.stdin); @@ -203,6 +213,14 @@ function Worker(vfs) { token.stderr = storeStream(process.stderr); return token; } + + function storePty(pty) { + var pid = storeProcess(pty, true); + var token = storeStream(pty); + token.pid = pid; + + return token; + } function killtree(pid, code) { childrenOfPid(pid, function(err, pidlist){ @@ -275,7 +293,6 @@ function Worker(vfs) { if (!stream) return; delete streams[id]; stream.destroy(); - nextStreamID = id; } function end(id, chunk) { var stream = streams[id]; @@ -283,7 +300,6 @@ function Worker(vfs) { delete streams[id]; if (chunk) stream.end(chunk); else stream.end(); - nextStreamID = id; } function kill(pid, code) { @@ -292,6 +308,15 @@ function Worker(vfs) { process.kill(code); } + function resize(pid, cols, rows) { + var process = processes[pid]; + if (!process) return; + + // Resize can throw + try { process.resize(cols, rows); } + catch(e) {}; + } + function closeWatcher(id) { var watcher = watchers[id]; if (!watcher) return; @@ -323,13 +348,13 @@ function Worker(vfs) { if (!stream) return; stream.emit("data", chunk); } - function onEnd(id) { + function onEnd(id, chunk) { var stream = proxyStreams[id]; if (!stream) return; // TODO: not delete proxy if close is going to be called later. // but somehow do delete proxy if close won't be called later. delete proxyStreams[id]; - stream.emit("end"); + stream.emit("end", chunk); } function onClose(id) { var stream = proxyStreams[id]; @@ -360,6 +385,7 @@ function Worker(vfs) { switch (key) { case "stream": token.stream = storeStream(meta.stream); break; case "process": token.process = storeProcess(meta.process); break; + case "pty": token.pty = storePty(meta.pty); break; case "watcher": token.watcher = storeWatcher(meta.watcher); break; case "api": token.api = storeApi(meta.api); break; default: token[key] = meta[key]; break;