Move logic for restarting Node child process into NodeServicesImpl. Tidy up lots.

This commit is contained in:
SteveSandersonMS
2016-07-07 11:35:25 +01:00
parent 4fb3b18868
commit a19e37f3c0
8 changed files with 245 additions and 206 deletions

View File

@@ -48,7 +48,8 @@ namespace Microsoft.AspNetCore.NodeServices
case NodeHostingModel.Http:
return new HttpNodeInstance(options.ProjectPath, /* port */ 0, options.WatchFileExtensions);
case NodeHostingModel.Socket:
return new SocketNodeInstance(options.ProjectPath, options.WatchFileExtensions);
var pipeName = "pni-" + Guid.NewGuid().ToString("D"); // Arbitrary non-clashing string
return new SocketNodeInstance(options.ProjectPath, options.WatchFileExtensions, pipeName);
default:
throw new ArgumentException("Unknown hosting model: " + options.HostingModel);
}

View File

@@ -176,7 +176,7 @@
// Begin listening now. The underlying transport varies according to the runtime platform.
// On Windows it's Named Pipes; on Linux/OSX it's Domain Sockets.
var useWindowsNamedPipes = /^win/.test(process.platform);
var listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.pipename;
var listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.listenAddress;
server.listen(listenAddress);

View File

@@ -9,6 +9,18 @@ using Newtonsoft.Json.Serialization;
namespace Microsoft.AspNetCore.NodeServices.HostingModels
{
/// <summary>
/// A specialisation of the OutOfProcessNodeInstance base class that uses HTTP to perform RPC invocations.
///
/// The Node child process starts an HTTP listener on an arbitrary available port (except where a nonzero
/// port number is specified as a constructor parameter), and signals which port was selected using the same
/// input/output-based mechanism that the base class uses to determine when the child process is ready to
/// accept RPC invocations.
///
/// TODO: Remove the file-watching logic from here and centralise it in OutOfProcessNodeInstance, implementing
/// the actual watching in .NET code (not Node), for consistency across platforms.
/// </summary>
/// <seealso cref="Microsoft.AspNetCore.NodeServices.HostingModels.OutOfProcessNodeInstance" />
internal class HttpNodeInstance : OutOfProcessNodeInstance
{
private static readonly Regex PortMessageRegex =
@@ -19,7 +31,7 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
ContractResolver = new CamelCasePropertyNamesContractResolver()
};
private HttpClient _client;
private readonly HttpClient _client;
private bool _disposed;
private int _portNumber;
@@ -47,9 +59,6 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
{
await EnsureReady();
// TODO: Use System.Net.Http.Formatting (PostAsJsonAsync etc.)
var payloadJson = JsonConvert.SerializeObject(invocationInfo, JsonSerializerSettings);
var payload = new StringContent(payloadJson, Encoding.UTF8, "application/json");
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload);
@@ -97,6 +106,9 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
protected override void OnOutputDataReceived(string outputData)
{
// Watch for "port selected" messages, and when observed, store the port number
// so we can use it when making HTTP requests. The child process will always send
// one of these messages before it sends a "ready for connections" message.
var match = _portNumber != 0 ? null : PortMessageRegex.Match(outputData);
if (match != null && match.Success)
{
@@ -108,12 +120,6 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
}
}
protected override void OnBeforeLaunchProcess()
{
// Prepare to receive a new port number
_portNumber = 0;
}
protected override void Dispose(bool disposing) {
base.Dispose(disposing);

View File

@@ -4,9 +4,17 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
{
public class NodeInvocationException : Exception
{
public bool NodeInstanceUnavailable { get; private set; }
public NodeInvocationException(string message, string details)
: base(message + Environment.NewLine + details)
{
}
public NodeInvocationException(string message, string details, bool nodeInstanceUnavailable)
: this(message, details)
{
NodeInstanceUnavailable = nodeInstanceUnavailable;
}
}
}

View File

@@ -6,37 +6,43 @@ using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels
{
/// <summary>
/// Class responsible for launching the Node child process, determining when it is ready to accept invocations,
/// and finally killing it when the parent process exits. Also it restarts the child process if it dies.
/// Class responsible for launching a Node child process on the local machine, determining when it is ready to
/// accept invocations, detecting if it dies on its own, and finally terminating it on disposal.
///
/// This abstract base class uses the input/output streams of the child process to perform a simple handshake
/// to determine when the child process is ready to accept invocations. This is agnostic to the mechanism that
/// derived classes use to actually perform the invocations (e.g., they could use HTTP-RPC, or a binary TCP
/// protocol, or any other RPC-type mechanism).
/// </summary>
/// <seealso cref="Microsoft.AspNetCore.NodeServices.INodeInstance" />
/// <seealso cref="Microsoft.AspNetCore.NodeServices.HostingModels.INodeInstance" />
public abstract class OutOfProcessNodeInstance : INodeInstance
{
private readonly object _childProcessLauncherLock;
private string _commandLineArguments;
private readonly StringAsTempFile _entryPointScript;
private Process _nodeProcess;
private TaskCompletionSource<bool> _nodeProcessIsReadySource;
private readonly string _projectPath;
private const string ConnectionEstablishedMessage = "[Microsoft.AspNetCore.NodeServices:Listening]";
private readonly TaskCompletionSource<object> _connectionIsReadySource = new TaskCompletionSource<object>();
private bool _disposed;
private readonly StringAsTempFile _entryPointScript;
private readonly Process _nodeProcess;
public OutOfProcessNodeInstance(string entryPointScript, string projectPath, string commandLineArguments = null)
{
_childProcessLauncherLock = new object();
_entryPointScript = new StringAsTempFile(entryPointScript);
_projectPath = projectPath;
_commandLineArguments = commandLineArguments ?? string.Empty;
_nodeProcess = LaunchNodeProcess(_entryPointScript.FileName, projectPath, commandLineArguments);
ConnectToInputOutputStreams();
}
public string CommandLineArguments
public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args)
{
get { return _commandLineArguments; }
set { _commandLineArguments = value; }
}
// Wait until the connection is established. This will throw if the connection fails to initialize.
await _connectionIsReadySource.Task;
public Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args)
{
return InvokeExportAsync<T>(new NodeInvocationInfo
if (_nodeProcess.HasExited)
{
// This special kind of exception triggers a transparent retry - NodeServicesImpl will launch
// a new Node instance and pass the invocation to that one instead.
throw new NodeInvocationException("The Node process has exited", null, nodeInstanceUnavailable: true);
}
return await InvokeExportAsync<T>(new NodeInvocationInfo
{
ModuleName = moduleName,
ExportedFunctionName = exportNameOrNull,
@@ -52,100 +58,6 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
protected abstract Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo);
protected void ExitNodeProcess()
{
if (_nodeProcess != null && !_nodeProcess.HasExited)
{
// TODO: Is there a more graceful way to end it? Or does this still let it perform any cleanup?
_nodeProcess.Kill();
}
}
protected async Task EnsureReady()
{
lock (_childProcessLauncherLock)
{
if (_nodeProcess == null || _nodeProcess.HasExited)
{
this.OnBeforeLaunchProcess();
var startInfo = new ProcessStartInfo("node")
{
Arguments = "\"" + _entryPointScript.FileName + "\" " + _commandLineArguments,
UseShellExecute = false,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
WorkingDirectory = _projectPath
};
// Append projectPath to NODE_PATH so it can locate node_modules
var existingNodePath = Environment.GetEnvironmentVariable("NODE_PATH") ?? string.Empty;
if (existingNodePath != string.Empty)
{
existingNodePath += ":";
}
var nodePathValue = existingNodePath + Path.Combine(_projectPath, "node_modules");
#if NET451
startInfo.EnvironmentVariables["NODE_PATH"] = nodePathValue;
#else
startInfo.Environment["NODE_PATH"] = nodePathValue;
#endif
_nodeProcess = Process.Start(startInfo);
ConnectToInputOutputStreams();
}
}
var task = _nodeProcessIsReadySource.Task;
var initializationSucceeded = await task;
if (!initializationSucceeded)
{
throw new InvalidOperationException("The Node.js process failed to initialize", task.Exception);
}
}
private void ConnectToInputOutputStreams()
{
var initializationIsCompleted = false; // TODO: Make this thread-safe? (Interlocked.Exchange etc.)
_nodeProcessIsReadySource = new TaskCompletionSource<bool>();
_nodeProcess.OutputDataReceived += (sender, evt) =>
{
if (evt.Data == "[Microsoft.AspNetCore.NodeServices:Listening]" && !initializationIsCompleted)
{
_nodeProcessIsReadySource.SetResult(true);
initializationIsCompleted = true;
}
else if (evt.Data != null)
{
OnOutputDataReceived(evt.Data);
}
};
_nodeProcess.ErrorDataReceived += (sender, evt) =>
{
if (evt.Data != null)
{
OnErrorDataReceived(evt.Data);
if (!initializationIsCompleted)
{
_nodeProcessIsReadySource.SetResult(false);
initializationIsCompleted = true;
}
}
};
_nodeProcess.BeginOutputReadLine();
_nodeProcess.BeginErrorReadLine();
}
protected virtual void OnBeforeLaunchProcess()
{
}
protected virtual void OnOutputDataReceived(string outputData)
{
Console.WriteLine("[Node] " + outputData);
@@ -165,12 +77,84 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
_entryPointScript.Dispose();
}
ExitNodeProcess();
// Make sure the Node process is finished
// TODO: Is there a more graceful way to end it? Or does this still let it perform any cleanup?
if (!_nodeProcess.HasExited)
{
_nodeProcess.Kill();
}
_disposed = true;
}
}
private static Process LaunchNodeProcess(string entryPointFilename, string projectPath, string commandLineArguments)
{
var startInfo = new ProcessStartInfo("node")
{
Arguments = "\"" + entryPointFilename + "\" " + (commandLineArguments ?? string.Empty),
UseShellExecute = false,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
WorkingDirectory = projectPath
};
// Append projectPath to NODE_PATH so it can locate node_modules
var existingNodePath = Environment.GetEnvironmentVariable("NODE_PATH") ?? string.Empty;
if (existingNodePath != string.Empty)
{
existingNodePath += ":";
}
var nodePathValue = existingNodePath + Path.Combine(projectPath, "node_modules");
#if NET451
startInfo.EnvironmentVariables["NODE_PATH"] = nodePathValue;
#else
startInfo.Environment["NODE_PATH"] = nodePathValue;
#endif
return Process.Start(startInfo);
}
private void ConnectToInputOutputStreams()
{
var initializationIsCompleted = false;
_nodeProcess.OutputDataReceived += (sender, evt) =>
{
if (evt.Data == ConnectionEstablishedMessage && !initializationIsCompleted)
{
_connectionIsReadySource.SetResult(null);
initializationIsCompleted = true;
}
else if (evt.Data != null)
{
OnOutputDataReceived(evt.Data);
}
};
_nodeProcess.ErrorDataReceived += (sender, evt) =>
{
if (evt.Data != null)
{
if (!initializationIsCompleted)
{
_connectionIsReadySource.SetException(
new InvalidOperationException("The Node.js process failed to initialize: " + evt.Data));
initializationIsCompleted = true;
}
else
{
OnErrorDataReceived(evt.Data);
}
}
};
_nodeProcess.BeginOutputReadLine();
_nodeProcess.BeginErrorReadLine();
}
~OutOfProcessNodeInstance()
{
Dispose(false);

View File

@@ -10,6 +10,22 @@ using Newtonsoft.Json.Serialization;
namespace Microsoft.AspNetCore.NodeServices.HostingModels
{
/// <summary>
/// A specialisation of the OutOfProcessNodeInstance base class that uses a lightweight binary streaming protocol
/// to perform RPC invocations. The physical transport is Named Pipes on Windows, or Domain Sockets on Linux/Mac.
/// For details on the binary streaming protocol, see
/// Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections.VirtualConnectionClient.
/// The advantage versus using HTTP for RPC is that this is faster (not surprisingly - there's much less overhead
/// because we don't need most of the functionality of HTTP.
///
/// The address of the pipe/socket is selected randomly here on the .NET side and sent to the child process as a
/// command-line argument (the address space is wide enough that there's no real risk of a clash, unlike when
/// selecting TCP port numbers).
///
/// TODO: Remove the file-watching logic from here and centralise it in OutOfProcessNodeInstance, implementing
/// the actual watching in .NET code (not Node), for consistency across platforms.
/// </summary>
/// <seealso cref="Microsoft.AspNetCore.NodeServices.HostingModels.OutOfProcessNodeInstance" />
internal class SocketNodeInstance : OutOfProcessNodeInstance
{
private readonly static JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings
@@ -17,31 +33,48 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
ContractResolver = new CamelCasePropertyNamesContractResolver()
};
private string _addressForNextConnection;
private readonly SemaphoreSlim _clientModificationSemaphore = new SemaphoreSlim(1);
private StreamConnection _currentPhysicalConnection;
private VirtualConnectionClient _currentVirtualConnectionClient;
private readonly SemaphoreSlim _connectionCreationSemaphore = new SemaphoreSlim(1);
private bool _connectionHasFailed;
private StreamConnection _physicalConnection;
private string _socketAddress;
private VirtualConnectionClient _virtualConnectionClient;
private readonly string[] _watchFileExtensions;
public SocketNodeInstance(string projectPath, string[] watchFileExtensions = null): base(
public SocketNodeInstance(string projectPath, string[] watchFileExtensions, string socketAddress): base(
EmbeddedResourceReader.Read(
typeof(SocketNodeInstance),
"/Content/Node/entrypoint-socket.js"),
projectPath)
projectPath,
MakeNewCommandLineOptions(socketAddress, watchFileExtensions))
{
_watchFileExtensions = watchFileExtensions;
_socketAddress = socketAddress;
}
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
{
await EnsureReady();
var virtualConnectionClient = await GetOrCreateVirtualConnectionClientAsync();
if (_connectionHasFailed)
{
// This special exception type forces NodeServicesImpl to restart the Node instance
throw new NodeInvocationException(
"The SocketNodeInstance socket connection failed. See logs to identify the reason.",
null,
nodeInstanceUnavailable: true);
}
if (_virtualConnectionClient == null)
{
await EnsureVirtualConnectionClientCreated();
}
// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
// physical connection to the child process, but without the overhead of doing so, because it's really
// just multiplexed into the existing physical connection stream.
bool shouldDisposeVirtualConnection = true;
Stream virtualConnection = null;
try
{
virtualConnection = _currentVirtualConnectionClient.OpenVirtualConnection();
virtualConnection = _virtualConnectionClient.OpenVirtualConnection();
// Send request
await WriteJsonLineAsync(virtualConnection, invocationInfo);
@@ -75,46 +108,34 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
}
}
private async Task<VirtualConnectionClient> GetOrCreateVirtualConnectionClientAsync()
private async Task EnsureVirtualConnectionClientCreated()
{
var client = _currentVirtualConnectionClient;
if (client == null)
// Asynchronous equivalent to a 'lock(...) { ... }'
await _connectionCreationSemaphore.WaitAsync();
try
{
await _clientModificationSemaphore.WaitAsync();
try
if (_virtualConnectionClient == null)
{
if (_currentVirtualConnectionClient == null)
_physicalConnection = StreamConnection.Create();
var connection = await _physicalConnection.Open(_socketAddress);
_virtualConnectionClient = new VirtualConnectionClient(connection);
_virtualConnectionClient.OnError += (ex) =>
{
var address = _addressForNextConnection;
if (string.IsNullOrEmpty(address))
{
// This shouldn't happen, because we always await 'EnsureReady' before getting here.
throw new InvalidOperationException("Cannot open connection to Node process until it has signalled that it is ready");
}
// This callback is fired only if there's a protocol-level failure (e.g., child process disconnected
// unexpectedly). It does *not* fire when RPC calls return errors. Since there's been a protocol-level
// failure, this Node instance is no longer usable and should be discarded.
_connectionHasFailed = true;
_currentPhysicalConnection = StreamConnection.Create();
var connection = await _currentPhysicalConnection.Open(address);
_currentVirtualConnectionClient = new VirtualConnectionClient(connection);
_currentVirtualConnectionClient.OnError += (ex) =>
{
// TODO: Log the exception properly. Need to change the chain of calls up to this point to supply
// an ILogger or IServiceProvider etc.
Console.WriteLine(ex.Message);
ExitNodeProcess(); // We'll restart it next time there's a request to it
};
}
return _currentVirtualConnectionClient;
}
finally
{
_clientModificationSemaphore.Release();
// TODO: Log the exception properly. Need to change the chain of calls up to this point to supply
// an ILogger or IServiceProvider etc.
Console.WriteLine(ex.Message);
};
}
}
else
finally
{
return client;
_connectionCreationSemaphore.Release();
}
}
@@ -122,21 +143,22 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
{
if (disposing)
{
EnsurePipeRpcClientDisposed();
if (_virtualConnectionClient != null)
{
_virtualConnectionClient.Dispose();
_virtualConnectionClient = null;
}
if (_physicalConnection != null)
{
_physicalConnection.Dispose();
_physicalConnection = null;
}
}
base.Dispose(disposing);
}
protected override void OnBeforeLaunchProcess()
{
// Either we've never yet launched the Node process, or we did but the old one died.
// Stop waiting for any outstanding requests and prepare to launch the new process.
EnsurePipeRpcClientDisposed();
_addressForNextConnection = "pni-" + Guid.NewGuid().ToString("D"); // Arbitrary non-clashing string
CommandLineArguments = MakeNewCommandLineOptions(_addressForNextConnection, _watchFileExtensions);
}
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject)
{
var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings);
@@ -166,9 +188,9 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
}
}
private static string MakeNewCommandLineOptions(string pipeName, string[] watchFileExtensions)
private static string MakeNewCommandLineOptions(string listenAddress, string[] watchFileExtensions)
{
var result = "--pipename " + pipeName;
var result = "--listenAddress " + listenAddress;
if (watchFileExtensions != null && watchFileExtensions.Length > 0)
{
result += " --watch " + string.Join(",", watchFileExtensions);
@@ -177,30 +199,6 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
return result;
}
private void EnsurePipeRpcClientDisposed()
{
_clientModificationSemaphore.Wait();
try
{
if (_currentVirtualConnectionClient != null)
{
_currentVirtualConnectionClient.Dispose();
_currentVirtualConnectionClient = null;
}
if (_currentPhysicalConnection != null)
{
_currentPhysicalConnection.Dispose();
_currentPhysicalConnection = null;
}
}
finally
{
_clientModificationSemaphore.Release();
}
}
#pragma warning disable 649 // These properties are populated via JSON deserialization
private class RpcJsonResponse<TResult>
{

View File

@@ -37,9 +37,44 @@ namespace Microsoft.AspNetCore.NodeServices
}
public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args)
{
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: true);
}
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry)
{
var nodeInstance = GetOrCreateCurrentNodeInstance();
return nodeInstance.InvokeExportAsync<T>(moduleName, exportedFunctionName, args);
try
{
return await nodeInstance.InvokeExportAsync<T>(moduleName, exportedFunctionName, args);
}
catch (NodeInvocationException ex)
{
// If the Node instance can't complete the invocation because it needs to restart (e.g., because the underlying
// Node process has exited, or a file it depends on has changed), then we make one attempt to restart transparently.
if (allowRetry && ex.NodeInstanceUnavailable)
{
// Perform the retry after clearing away the old instance
lock (_currentNodeInstanceAccessLock)
{
if (_currentNodeInstance == nodeInstance)
{
DisposeNodeInstance(_currentNodeInstance);
_currentNodeInstance = null;
}
}
// One the next call, don't allow retries, because we could get into an infinite retry loop, or a long retry
// loop that masks an underlying problem. A newly-created Node instance should be able to accept invocations,
// or something more serious must be wrong.
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: false);
}
else
{
throw;
}
}
}
public void Dispose()
@@ -48,12 +83,19 @@ namespace Microsoft.AspNetCore.NodeServices
{
if (_currentNodeInstance != null)
{
_currentNodeInstance.Dispose();
DisposeNodeInstance(_currentNodeInstance);
_currentNodeInstance = null;
}
}
}
private static void DisposeNodeInstance(INodeInstance nodeInstance)
{
// TODO: Implement delayed disposal for connection draining
// Or consider having the delayedness of it being a responsibility of the INodeInstance
nodeInstance.Dispose();
}
private INodeInstance GetOrCreateCurrentNodeInstance()
{
var instance = _currentNodeInstance;

View File

@@ -69,7 +69,7 @@ virtualConnectionServer.createInterface(server).on('connection', (connection: Du
// Begin listening now. The underlying transport varies according to the runtime platform.
// On Windows it's Named Pipes; on Linux/OSX it's Domain Sockets.
const useWindowsNamedPipes = /^win/.test(process.platform);
const listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.pipename;
const listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.listenAddress;
server.listen(listenAddress);
interface RpcInvocation {