From b19d0dff92c4d7b37fc7ed50269d90b358c58074 Mon Sep 17 00:00:00 2001 From: SteveSandersonMS Date: Tue, 7 Jun 2016 17:16:01 +0100 Subject: [PATCH] Support streamed response from SocketNodeInstance --- .../Content/Node/entrypoint-socket.js | 17 ++++++++- .../HostingModels/SocketNodeInstance.cs | 36 +++++++++++++++---- .../SocketNodeInstanceEntryPoint.ts | 20 ++++++++++- 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.AspNetCore.NodeServices/Content/Node/entrypoint-socket.js b/src/Microsoft.AspNetCore.NodeServices/Content/Node/entrypoint-socket.js index b18bf5e..d96453f 100644 --- a/src/Microsoft.AspNetCore.NodeServices/Content/Node/entrypoint-socket.js +++ b/src/Microsoft.AspNetCore.NodeServices/Content/Node/entrypoint-socket.js @@ -78,14 +78,29 @@ var invocation = JSON.parse(line); var invokedModule = dynamicRequire(path.resolve(process.cwd(), invocation.moduleName)); var invokedFunction = invocation.exportedFunctionName ? invokedModule[invocation.exportedFunctionName] : invokedModule; - // Actually invoke it, passing the callback followed by any supplied args + // Prepare a callback for accepting non-streamed JSON responses + var hasInvokedCallback_1 = false; var invocationCallback = function (errorValue, successValue) { + if (hasInvokedCallback_1) { + throw new Error('Cannot supply more than one result. The callback has already been invoked,' + + ' or the result stream has already been accessed'); + } + hasInvokedCallback_1 = true; connection.end(JSON.stringify({ result: successValue, errorMessage: errorValue && (errorValue.message || errorValue), errorDetails: errorValue && (errorValue.stack || null) })); }; + // Also support streamed binary responses + Object.defineProperty(invocationCallback, 'stream', { + enumerable: true, + get: function () { + hasInvokedCallback_1 = true; + return connection; + } + }); + // Actually invoke it, passing through any supplied args invokedFunction.apply(null, [invocationCallback].concat(invocation.args)); } catch (ex) { diff --git a/src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs b/src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs index 6bbce16..abeeac5 100644 --- a/src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs +++ b/src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs @@ -37,19 +37,41 @@ namespace Microsoft.AspNetCore.NodeServices await EnsureReady(); var virtualConnectionClient = await GetOrCreateVirtualConnectionClientAsync(); - using (var virtualConnection = _currentVirtualConnectionClient.OpenVirtualConnection()) + bool shouldDisposeVirtualConnection = true; + Stream virtualConnection = null; + try { + virtualConnection = _currentVirtualConnectionClient.OpenVirtualConnection(); + // Send request await WriteJsonLineAsync(virtualConnection, invocationInfo); - // Receive response - var response = await ReadJsonAsync>(virtualConnection); - if (response.ErrorMessage != null) + // Determine what kind of response format is expected + if (typeof(T) == typeof(Stream)) { - throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails); + // Pass through streamed binary response + // It is up to the consumer to dispose this stream, so don't do so here + shouldDisposeVirtualConnection = false; + return (T)(object)virtualConnection; } + else + { + // Parse and return non-streamed JSON response + var response = await ReadJsonAsync>(virtualConnection); + if (response.ErrorMessage != null) + { + throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails); + } - return response.Result; + return response.Result; + } + } + finally + { + if (shouldDisposeVirtualConnection) + { + virtualConnection.Dispose(); + } } } @@ -180,7 +202,7 @@ namespace Microsoft.AspNetCore.NodeServices } #pragma warning disable 649 // These properties are populated via JSON deserialization - private class RpcResponse + private class RpcJsonResponse { public TResult Result { get; set; } public string ErrorMessage { get; set; } diff --git a/src/Microsoft.AspNetCore.NodeServices/TypeScript/SocketNodeInstanceEntryPoint.ts b/src/Microsoft.AspNetCore.NodeServices/TypeScript/SocketNodeInstanceEntryPoint.ts index 0f762bb..3220a40 100644 --- a/src/Microsoft.AspNetCore.NodeServices/TypeScript/SocketNodeInstanceEntryPoint.ts +++ b/src/Microsoft.AspNetCore.NodeServices/TypeScript/SocketNodeInstanceEntryPoint.ts @@ -29,14 +29,32 @@ virtualConnectionServer.createInterface(server).on('connection', (connection: Du const invokedModule = dynamicRequire(path.resolve(process.cwd(), invocation.moduleName)); const invokedFunction = invocation.exportedFunctionName ? invokedModule[invocation.exportedFunctionName] : invokedModule; - // Actually invoke it, passing the callback followed by any supplied args + // Prepare a callback for accepting non-streamed JSON responses + let hasInvokedCallback = false; const invocationCallback = (errorValue, successValue) => { + if (hasInvokedCallback) { + throw new Error('Cannot supply more than one result. The callback has already been invoked,' + + ' or the result stream has already been accessed'); + } + + hasInvokedCallback = true; connection.end(JSON.stringify({ result: successValue, errorMessage: errorValue && (errorValue.message || errorValue), errorDetails: errorValue && (errorValue.stack || null) })); }; + + // Also support streamed binary responses + Object.defineProperty(invocationCallback, 'stream', { + enumerable: true, + get: (): Duplex => { + hasInvokedCallback = true; + return connection; + } + }); + + // Actually invoke it, passing through any supplied args invokedFunction.apply(null, [invocationCallback].concat(invocation.args)); } catch (ex) { connection.end(JSON.stringify({