Support cancellation of NodeServices invocations

This commit is contained in:
SteveSandersonMS
2016-09-08 10:56:50 +01:00
parent f358d8e2b2
commit 2799861296
7 changed files with 87 additions and 27 deletions

View File

@@ -4,6 +4,7 @@ using System.IO;
using System.Net.Http; using System.Net.Http;
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
@@ -57,15 +58,17 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
return $"--port {port}"; return $"--port {port}";
} }
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo) protected override async Task<T> InvokeExportAsync<T>(
NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
{ {
var payloadJson = JsonConvert.SerializeObject(invocationInfo, jsonSerializerSettings); var payloadJson = JsonConvert.SerializeObject(invocationInfo, jsonSerializerSettings);
var payload = new StringContent(payloadJson, Encoding.UTF8, "application/json"); var payload = new StringContent(payloadJson, Encoding.UTF8, "application/json");
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload); var response = await _client.PostAsync("http://localhost:" + _portNumber, payload, cancellationToken);
if (!response.IsSuccessStatusCode) if (!response.IsSuccessStatusCode)
{ {
var responseErrorString = await response.Content.ReadAsStringAsync(); // Unfortunately there's no true way to cancel ReadAsStringAsync calls, hence AbandonIfCancelled
var responseErrorString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
throw new Exception("Call to Node module failed with error: " + responseErrorString); throw new Exception("Call to Node module failed with error: " + responseErrorString);
} }
@@ -81,11 +84,11 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
typeof(T).FullName); typeof(T).FullName);
} }
var responseString = await response.Content.ReadAsStringAsync(); var responseString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
return (T)(object)responseString; return (T)(object)responseString;
case "application/json": case "application/json":
var responseJson = await response.Content.ReadAsStringAsync(); var responseJson = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
return JsonConvert.DeserializeObject<T>(responseJson, jsonSerializerSettings); return JsonConvert.DeserializeObject<T>(responseJson, jsonSerializerSettings);
case "application/octet-stream": case "application/octet-stream":
@@ -97,7 +100,7 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
typeof(T).FullName + ". Instead you must use the generic type System.IO.Stream."); typeof(T).FullName + ". Instead you must use the generic type System.IO.Stream.");
} }
return (T)(object)(await response.Content.ReadAsStreamAsync()); return (T)(object)(await response.Content.ReadAsStreamAsync().OrThrowOnCancellation(cancellationToken));
default: default:
throw new InvalidOperationException("Unexpected response content type: " + responseContentType.MediaType); throw new InvalidOperationException("Unexpected response content type: " + responseContentType.MediaType);

View File

@@ -1,10 +1,11 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels namespace Microsoft.AspNetCore.NodeServices.HostingModels
{ {
public interface INodeInstance : IDisposable public interface INodeInstance : IDisposable
{ {
Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args); Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args);
} }
} }

View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -67,7 +68,8 @@ If you haven't yet installed node-inspector, you can do so as follows:
ConnectToInputOutputStreams(); ConnectToInputOutputStreams();
} }
public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args) public async Task<T> InvokeExportAsync<T>(
CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args)
{ {
if (_nodeProcess.HasExited || _nodeProcessNeedsRestart) if (_nodeProcess.HasExited || _nodeProcessNeedsRestart)
{ {
@@ -79,15 +81,17 @@ If you haven't yet installed node-inspector, you can do so as follows:
throw new NodeInvocationException(message, null, nodeInstanceUnavailable: true); throw new NodeInvocationException(message, null, nodeInstanceUnavailable: true);
} }
// Wait until the connection is established. This will throw if the connection fails to initialize. // Wait until the connection is established. This will throw if the connection fails to initialize,
await _connectionIsReadySource.Task; // or if cancellation is requested first. Note that we can't really cancel the "establishing connection"
// task because that's shared with all callers, but we can stop waiting for it if this call is cancelled.
await _connectionIsReadySource.Task.OrThrowOnCancellation(cancellationToken);
return await InvokeExportAsync<T>(new NodeInvocationInfo return await InvokeExportAsync<T>(new NodeInvocationInfo
{ {
ModuleName = moduleName, ModuleName = moduleName,
ExportedFunctionName = exportNameOrNull, ExportedFunctionName = exportNameOrNull,
Args = args Args = args
}); }, cancellationToken);
} }
public void Dispose() public void Dispose()
@@ -96,7 +100,9 @@ If you haven't yet installed node-inspector, you can do so as follows:
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
} }
protected abstract Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo); protected abstract Task<T> InvokeExportAsync<T>(
NodeInvocationInfo invocationInfo,
CancellationToken cancellationToken);
// This method is virtual, as it provides a way to override the NODE_PATH or the path to node.exe // This method is virtual, as it provides a way to override the NODE_PATH or the path to node.exe
protected virtual ProcessStartInfo PrepareNodeProcessStartInfo( protected virtual ProcessStartInfo PrepareNodeProcessStartInfo(

View File

@@ -57,7 +57,7 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
_socketAddress = socketAddress; _socketAddress = socketAddress;
} }
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo) protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
{ {
if (_connectionHasFailed) if (_connectionHasFailed)
{ {
@@ -70,7 +70,12 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
if (_virtualConnectionClient == null) if (_virtualConnectionClient == null)
{ {
await EnsureVirtualConnectionClientCreated(); // Although we could pass the cancellationToken into EnsureVirtualConnectionClientCreated and
// have it signal cancellations upstream, that would be a bad thing to do, because all callers
// wait for the same connection task. There's no reason why the first caller should have the
// special ability to cancel the connection process in a way that would affect subsequent
// callers. So, each caller just independently stops awaiting connection if that call is cancelled.
await EnsureVirtualConnectionClientCreated().OrThrowOnCancellation(cancellationToken);
} }
// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new // For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
@@ -83,7 +88,7 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
virtualConnection = _virtualConnectionClient.OpenVirtualConnection(); virtualConnection = _virtualConnectionClient.OpenVirtualConnection();
// Send request // Send request
await WriteJsonLineAsync(virtualConnection, invocationInfo); await WriteJsonLineAsync(virtualConnection, invocationInfo, cancellationToken);
// Determine what kind of response format is expected // Determine what kind of response format is expected
if (typeof(T) == typeof(Stream)) if (typeof(T) == typeof(Stream))
@@ -96,7 +101,7 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
else else
{ {
// Parse and return non-streamed JSON response // Parse and return non-streamed JSON response
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection); var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection, cancellationToken);
if (response.ErrorMessage != null) if (response.ErrorMessage != null)
{ {
throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails); throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails);
@@ -163,27 +168,27 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
base.Dispose(disposing); base.Dispose(disposing);
} }
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject) private static async Task WriteJsonLineAsync(Stream stream, object serializableObject, CancellationToken cancellationToken)
{ {
var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings); var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings);
var bytes = Encoding.UTF8.GetBytes(json + '\n'); var bytes = Encoding.UTF8.GetBytes(json + '\n');
await stream.WriteAsync(bytes, 0, bytes.Length); await stream.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
} }
private static async Task<T> ReadJsonAsync<T>(Stream stream) private static async Task<T> ReadJsonAsync<T>(Stream stream, CancellationToken cancellationToken)
{ {
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream)); var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream, cancellationToken));
return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings); return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
} }
private static async Task<byte[]> ReadAllBytesAsync(Stream input) private static async Task<byte[]> ReadAllBytesAsync(Stream input, CancellationToken cancellationToken)
{ {
byte[] buffer = new byte[16 * 1024]; byte[] buffer = new byte[16 * 1024];
using (var ms = new MemoryStream()) using (var ms = new MemoryStream())
{ {
int read; int read;
while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0) while ((read = await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{ {
ms.Write(buffer, 0, read); ms.Write(buffer, 0, read);
} }

View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices namespace Microsoft.AspNetCore.NodeServices
@@ -6,8 +7,11 @@ namespace Microsoft.AspNetCore.NodeServices
public interface INodeServices : IDisposable public interface INodeServices : IDisposable
{ {
Task<T> InvokeAsync<T>(string moduleName, params object[] args); Task<T> InvokeAsync<T>(string moduleName, params object[] args);
Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args);
Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args); Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args);
Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args);
[Obsolete("Use InvokeAsync instead")] [Obsolete("Use InvokeAsync instead")]
Task<T> Invoke<T>(string moduleName, params object[] args); Task<T> Invoke<T>(string moduleName, params object[] args);

View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.NodeServices.HostingModels; using Microsoft.AspNetCore.NodeServices.HostingModels;
@@ -34,19 +35,29 @@ namespace Microsoft.AspNetCore.NodeServices
return InvokeExportAsync<T>(moduleName, null, args); return InvokeExportAsync<T>(moduleName, null, args);
} }
public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args) public Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args)
{ {
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: true); return InvokeExportAsync<T>(cancellationToken, moduleName, null, args);
} }
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry) public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args)
{
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, CancellationToken.None);
}
public Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args)
{
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, cancellationToken);
}
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry, CancellationToken cancellationToken)
{ {
ThrowAnyOutstandingDelayedDisposalException(); ThrowAnyOutstandingDelayedDisposalException();
var nodeInstance = GetOrCreateCurrentNodeInstance(); var nodeInstance = GetOrCreateCurrentNodeInstance();
try try
{ {
return await nodeInstance.InvokeExportAsync<T>(moduleName, exportedFunctionName, args); return await nodeInstance.InvokeExportAsync<T>(cancellationToken, moduleName, exportedFunctionName, args);
} }
catch (NodeInvocationException ex) catch (NodeInvocationException ex)
{ {
@@ -69,7 +80,7 @@ namespace Microsoft.AspNetCore.NodeServices
// One the next call, don't allow retries, because we could get into an infinite retry loop, or a long retry // One the next call, don't allow retries, because we could get into an infinite retry loop, or a long retry
// loop that masks an underlying problem. A newly-created Node instance should be able to accept invocations, // loop that masks an underlying problem. A newly-created Node instance should be able to accept invocations,
// or something more serious must be wrong. // or something more serious must be wrong.
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: false); return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ false, cancellationToken);
} }
else else
{ {

View File

@@ -0,0 +1,30 @@
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices
{
internal static class TaskExtensions
{
public static Task OrThrowOnCancellation(this Task task, CancellationToken cancellationToken)
{
return task.IsCompleted
? task // If the task is already completed, no need to wrap it in a further layer of task
: task.ContinueWith(
_ => {}, // If the task completes, allow execution to continue
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
public static Task<T> OrThrowOnCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
return task.IsCompleted
? task // If the task is already completed, no need to wrap it in a further layer of task
: task.ContinueWith(
t => t.Result, // If the task completes, pass through its result
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}
}