Implement SocketNodeInstance

This commit is contained in:
SteveSandersonMS
2016-06-01 16:05:29 +01:00
parent 32ebaecdd8
commit 341cd4f1cb
23 changed files with 3982 additions and 8 deletions

View File

@@ -0,0 +1,12 @@
using System;
namespace Microsoft.AspNetCore.NodeServices
{
public class NodeInvocationException : Exception
{
public NodeInvocationException(string message, string details)
: base(message + Environment.NewLine + details)
{
}
}
}

View File

@@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.NodeServices
public abstract class OutOfProcessNodeInstance : INodeServices
{
private readonly object _childProcessLauncherLock;
private readonly string _commandLineArguments;
private string _commandLineArguments;
private readonly StringAsTempFile _entryPointScript;
private Process _nodeProcess;
private TaskCompletionSource<bool> _nodeProcessIsReadySource;
@@ -27,6 +27,12 @@ namespace Microsoft.AspNetCore.NodeServices
_projectPath = projectPath;
_commandLineArguments = commandLineArguments ?? string.Empty;
}
public string CommandLineArguments
{
get { return _commandLineArguments; }
set { _commandLineArguments = value; }
}
protected Process NodeProcess
{
@@ -59,12 +65,23 @@ namespace Microsoft.AspNetCore.NodeServices
public abstract Task<T> Invoke<T>(NodeInvocationInfo invocationInfo);
protected void ExitNodeProcess()
{
if (_nodeProcess != null && !_nodeProcess.HasExited)
{
// TODO: Is there a more graceful way to end it? Or does this still let it perform any cleanup?
_nodeProcess.Kill();
}
}
protected async Task EnsureReady()
{
lock (_childProcessLauncherLock)
{
if (_nodeProcess == null || _nodeProcess.HasExited)
{
this.OnBeforeLaunchProcess();
var startInfo = new ProcessStartInfo("node")
{
Arguments = "\"" + _entryPointScript.FileName + "\" " + _commandLineArguments,
@@ -89,7 +106,6 @@ namespace Microsoft.AspNetCore.NodeServices
startInfo.Environment.Add("NODE_PATH", nodePathValue);
#endif
OnBeforeLaunchProcess();
_nodeProcess = Process.Start(startInfo);
ConnectToInputOutputStreams();
}
@@ -162,11 +178,7 @@ namespace Microsoft.AspNetCore.NodeServices
_entryPointScript.Dispose();
}
if (_nodeProcess != null && !_nodeProcess.HasExited)
{
_nodeProcess.Kill();
// TODO: Is there a more graceful way to end it? Or does this still let it perform any cleanup?
}
ExitNodeProcess();
_disposed = true;
}

View File

@@ -0,0 +1,32 @@
using System.IO;
using System.IO.Pipes;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
{
internal class NamedPipeConnection : StreamConnection
{
private bool _disposedValue = false;
private NamedPipeClientStream _namedPipeClientStream;
public override async Task<Stream> Open(string address)
{
_namedPipeClientStream = new NamedPipeClientStream(".", address, PipeDirection.InOut);
await _namedPipeClientStream.ConnectAsync().ConfigureAwait(false);
return _namedPipeClientStream;
}
public override void Dispose()
{
if (!_disposedValue)
{
if (_namedPipeClientStream != null)
{
_namedPipeClientStream.Dispose();
}
_disposedValue = true;
}
}
}
}

View File

@@ -0,0 +1,26 @@
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
{
internal abstract class StreamConnection : IDisposable
{
public abstract Task<Stream> Open(string address);
public abstract void Dispose();
public static StreamConnection Create()
{
var useNamedPipes = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
if (useNamedPipes)
{
return new NamedPipeConnection();
}
else
{
return new UnixDomainSocketConnection();
}
}
}
}

View File

@@ -0,0 +1,40 @@
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
{
internal class UnixDomainSocketConnection : StreamConnection
{
private bool _disposedValue = false;
private NetworkStream _networkStream;
private Socket _socket;
public override async Task<Stream> Open(string address)
{
var endPoint = new UnixDomainSocketEndPoint("/tmp/" + address);
_socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Unspecified);
await _socket.ConnectAsync(endPoint).ConfigureAwait(false);
_networkStream = new NetworkStream(_socket);
return _networkStream;
}
public override void Dispose()
{
if (!_disposedValue)
{
if (_networkStream != null)
{
_networkStream.Dispose();
}
if (_socket != null)
{
_socket.Dispose();
}
_disposedValue = true;
}
}
}
}

View File

@@ -0,0 +1,86 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
{
// From System.IO.Pipes/src/System/Net/Sockets/UnixDomainSocketEndPoint.cs (an internal class in System.IO.Pipes)
internal sealed class UnixDomainSocketEndPoint : EndPoint
{
private const AddressFamily EndPointAddressFamily = AddressFamily.Unix;
private static readonly Encoding s_pathEncoding = Encoding.UTF8;
private static readonly int s_nativePathOffset = 2; // = offsetof(struct sockaddr_un, sun_path). It's the same on Linux and OSX
private static readonly int s_nativePathLength = 91; // sockaddr_un.sun_path at http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_un.h.html, -1 for terminator
private static readonly int s_nativeAddressSize = s_nativePathOffset + s_nativePathLength;
private readonly string _path;
private readonly byte[] _encodedPath;
public UnixDomainSocketEndPoint(string path)
{
if (path == null)
{
throw new ArgumentNullException(nameof(path));
}
_path = path;
_encodedPath = s_pathEncoding.GetBytes(_path);
if (path.Length == 0 || _encodedPath.Length > s_nativePathLength)
{
throw new ArgumentOutOfRangeException(nameof(path));
}
}
internal UnixDomainSocketEndPoint(SocketAddress socketAddress)
{
if (socketAddress == null)
{
throw new ArgumentNullException(nameof(socketAddress));
}
if (socketAddress.Family != EndPointAddressFamily ||
socketAddress.Size > s_nativeAddressSize)
{
throw new ArgumentOutOfRangeException(nameof(socketAddress));
}
if (socketAddress.Size > s_nativePathOffset)
{
_encodedPath = new byte[socketAddress.Size - s_nativePathOffset];
for (int i = 0; i < _encodedPath.Length; i++)
{
_encodedPath[i] = socketAddress[s_nativePathOffset + i];
}
_path = s_pathEncoding.GetString(_encodedPath, 0, _encodedPath.Length);
}
else
{
_encodedPath = Array.Empty<byte>();
_path = string.Empty;
}
}
public override SocketAddress Serialize()
{
var result = new SocketAddress(AddressFamily.Unix, s_nativeAddressSize);
for (int index = 0; index < _encodedPath.Length; index++)
{
result[s_nativePathOffset + index] = _encodedPath[index];
}
result[s_nativePathOffset + _encodedPath.Length] = 0; // path must be null-terminated
return result;
}
public override EndPoint Create(SocketAddress socketAddress) => new UnixDomainSocketEndPoint(socketAddress);
public override AddressFamily AddressFamily => EndPointAddressFamily;
public override string ToString() => _path;
}
}

View File

@@ -0,0 +1,191 @@
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections;
using Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
namespace Microsoft.AspNetCore.NodeServices
{
internal class SocketNodeInstance : OutOfProcessNodeInstance
{
private readonly static JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
};
private string _addressForNextConnection;
private readonly SemaphoreSlim _clientModificationSemaphore = new SemaphoreSlim(1);
private StreamConnection _currentPhysicalConnection;
private VirtualConnectionClient _currentVirtualConnectionClient;
private readonly string[] _watchFileExtensions;
public SocketNodeInstance(string projectPath, string[] watchFileExtensions = null): base(
EmbeddedResourceReader.Read(
typeof(SocketNodeInstance),
"/Content/Node/entrypoint-socket.js"),
projectPath)
{
_watchFileExtensions = watchFileExtensions;
}
public override async Task<T> Invoke<T>(NodeInvocationInfo invocationInfo)
{
await EnsureReady();
var virtualConnectionClient = await GetOrCreateVirtualConnectionClientAsync();
using (var virtualConnection = _currentVirtualConnectionClient.OpenVirtualConnection())
{
// Send request
await WriteJsonLineAsync(virtualConnection, invocationInfo);
// Receive response
var response = await ReadJsonAsync<RpcResponse<T>>(virtualConnection);
if (response.ErrorMessage != null)
{
throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails);
}
return response.Result;
}
}
private async Task<VirtualConnectionClient> GetOrCreateVirtualConnectionClientAsync()
{
var client = _currentVirtualConnectionClient;
if (client == null)
{
await _clientModificationSemaphore.WaitAsync();
try
{
if (_currentVirtualConnectionClient == null)
{
var address = _addressForNextConnection;
if (string.IsNullOrEmpty(address))
{
// This shouldn't happen, because we always await 'EnsureReady' before getting here.
throw new InvalidOperationException("Cannot open connection to Node process until it has signalled that it is ready");
}
_currentPhysicalConnection = StreamConnection.Create();
var connection = await _currentPhysicalConnection.Open(address);
_currentVirtualConnectionClient = new VirtualConnectionClient(connection);
_currentVirtualConnectionClient.OnError += (ex) =>
{
// TODO: Log the exception properly. Need to change the chain of calls up to this point to supply
// an ILogger or IServiceProvider etc.
Console.WriteLine(ex.Message);
ExitNodeProcess(); // We'll restart it next time there's a request to it
};
}
return _currentVirtualConnectionClient;
}
finally
{
_clientModificationSemaphore.Release();
}
}
else
{
return client;
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
EnsurePipeRpcClientDisposed();
}
base.Dispose(disposing);
}
protected override void OnBeforeLaunchProcess()
{
// Either we've never yet launched the Node process, or we did but the old one died.
// Stop waiting for any outstanding requests and prepare to launch the new process.
EnsurePipeRpcClientDisposed();
_addressForNextConnection = "pni-" + Guid.NewGuid().ToString("D"); // Arbitrary non-clashing string
CommandLineArguments = MakeNewCommandLineOptions(_addressForNextConnection, _watchFileExtensions);
}
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject)
{
var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings);
var bytes = Encoding.UTF8.GetBytes(json + '\n');
await stream.WriteAsync(bytes, 0, bytes.Length);
}
private static async Task<T> ReadJsonAsync<T>(Stream stream)
{
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream));
return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
}
private static async Task<byte[]> ReadAllBytesAsync(Stream input)
{
byte[] buffer = new byte[16*1024];
using (var ms = new MemoryStream())
{
int read;
while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
ms.Write(buffer, 0, read);
}
return ms.ToArray();
}
}
private static string MakeNewCommandLineOptions(string pipeName, string[] watchFileExtensions)
{
var result = "--pipename " + pipeName;
if (watchFileExtensions != null && watchFileExtensions.Length > 0)
{
result += " --watch " + string.Join(",", watchFileExtensions);
}
return result;
}
private void EnsurePipeRpcClientDisposed()
{
_clientModificationSemaphore.Wait();
try
{
if (_currentVirtualConnectionClient != null)
{
_currentVirtualConnectionClient.Dispose();
_currentVirtualConnectionClient = null;
}
if (_currentPhysicalConnection != null)
{
_currentPhysicalConnection.Dispose();
_currentPhysicalConnection = null;
}
}
finally
{
_clientModificationSemaphore.Release();
}
}
#pragma warning disable 649 // These properties are populated via JSON deserialization
private class RpcResponse<TResult>
{
public TResult Result { get; set; }
public string ErrorMessage { get; set; }
public string ErrorDetails { get; set; }
}
#pragma warning restore 649
}
}

View File

@@ -0,0 +1,149 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections
{
/// <summary>
/// A virtual read/write connection, typically to a remote process. Multiple virtual connections can be
/// multiplexed over a single physical connection (e.g., a named pipe, domain socket, or TCP socket).
/// </summary>
internal class VirtualConnection : Stream
{
private VirtualConnectionClient _host;
private readonly BufferBlock<byte[]> _receivedDataQueue = new BufferBlock<byte[]>();
private ArraySegment<byte> _receivedDataNotYetUsed;
private bool _wasClosedByRemote;
private bool _isDisposed;
public VirtualConnection(long id, VirtualConnectionClient host)
{
Id = id;
_host = host;
}
public long Id { get; }
public override bool CanRead { get { return true; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return true; } }
public override long Length
{
get { throw new NotImplementedException(); }
}
public override long Position
{
get { throw new NotImplementedException(); }
set { throw new NotImplementedException(); }
}
public override void Flush()
{
// We're auto-flushing, so this is a no-op.
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (_wasClosedByRemote)
{
return 0;
}
var bytesRead = 0;
while (true)
{
// Pull as many applicable bytes as we can out of receivedDataNotYetUsed, then update its offset/length
int bytesToExtract = Math.Min(count - bytesRead, _receivedDataNotYetUsed.Count);
if (bytesToExtract > 0)
{
Buffer.BlockCopy(_receivedDataNotYetUsed.Array, _receivedDataNotYetUsed.Offset, buffer, bytesRead, bytesToExtract);
_receivedDataNotYetUsed = new ArraySegment<byte>(_receivedDataNotYetUsed.Array, _receivedDataNotYetUsed.Offset + bytesToExtract, _receivedDataNotYetUsed.Count - bytesToExtract);
bytesRead += bytesToExtract;
}
// If we've completely filled the output buffer, we're done
if (bytesRead == count)
{
return bytesRead;
}
// We haven't yet filled the output buffer, so we must have exhausted receivedDataNotYetUsed instead.
// We want to get the next block of data from the underlying queue.
byte[] nextReceivedBlock;
if (bytesRead > 0)
{
if (!_receivedDataQueue.TryReceive(null, out nextReceivedBlock))
{
// No more data is available synchronously, and we already have some data, so we can stop now
return bytesRead;
}
}
else
{
// Since we don't yet have anything, wait for the underlying source
nextReceivedBlock = await _receivedDataQueue.ReceiveAsync(cancellationToken);
}
if (nextReceivedBlock.Length == 0)
{
// A zero-length block signals that the remote regards this virtual connection as closed
_wasClosedByRemote = true;
return bytesRead;
}
else
{
// We got some more data, so can continue trying to fill the output buffer
_receivedDataNotYetUsed = new ArraySegment<byte>(nextReceivedBlock, 0, nextReceivedBlock.Length);
}
}
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (_wasClosedByRemote)
{
throw new InvalidOperationException("The connection was already closed by the remote party");
}
return count > 0 ? _host.WriteAsync(Id, buffer, offset, count, cancellationToken) : Task.CompletedTask;
}
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None).Result;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count, CancellationToken.None).Wait();
}
protected override void Dispose(bool disposing)
{
if (disposing && !_isDisposed)
{
_isDisposed = true;
_host.CloseInnerStream(Id, _wasClosedByRemote);
}
}
public async Task AddDataToQueue(byte[] data)
{
await _receivedDataQueue.SendAsync(data);
}
}
}

View File

@@ -0,0 +1,230 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections
{
public delegate void VirtualConnectionReadErrorHandler(Exception ex);
/// <summary>
/// Wraps an underlying physical read/write stream (e.g., named pipes, domain sockets, or TCP sockets) and
/// exposes an API for making 'virtual connections', which act as independent read/write streams.
/// Traffic over these virtual connections is multiplexed over the underlying physical stream. This is useful
/// for fast stream-based inter-process communication because it avoids the overhead of opening a new physical
/// connection each time a new communication channel is needed.
/// </summary>
internal class VirtualConnectionClient : IDisposable
{
internal const int MaxFrameBodyLength = 16 * 1024;
public event VirtualConnectionReadErrorHandler OnError;
private Stream _underlyingTransport;
private Dictionary<long, VirtualConnection> _activeInnerStreams;
private long _nextInnerStreamId;
private readonly SemaphoreSlim _streamWriterSemaphore = new SemaphoreSlim(1);
private readonly object _readControlLock = new object();
private Exception _readLoopExitedWithException;
private readonly CancellationTokenSource _disposalCancellatonToken = new CancellationTokenSource();
private bool _disposedValue = false;
public VirtualConnectionClient(Stream underlyingTransport)
{
_underlyingTransport = underlyingTransport;
_activeInnerStreams = new Dictionary<long, VirtualConnection>();
RunReadLoop();
}
public Stream OpenVirtualConnection()
{
// Improve discoverability of read-loop errors (in case the developer doesn't add an OnError listener)
ThrowIfReadLoopFailed();
var id = Interlocked.Increment(ref _nextInnerStreamId);
var newInnerStream = new VirtualConnection(id, this);
_activeInnerStreams.Add(id, newInnerStream);
return newInnerStream;
}
// It's async void because nothing waits for it to finish (it continues indefinitely). It signals any errors via
// a separate channel.
private async void RunReadLoop()
{
try
{
while (!_disposalCancellatonToken.IsCancellationRequested)
{
var remoteIsStillConnected = await ProcessNextFrameAsync();
if (!remoteIsStillConnected)
{
CloseAllActiveStreams();
}
}
}
catch (Exception ex)
{
// Not all underlying transports correctly honor cancellation tokens. For example,
// DomainSocketStreamTransport's ReadAsync ignores them, so we only know to stop
// the read loop when the underlying stream is disposed and then it throws ObjectDisposedException.
if (!(ex is TaskCanceledException || ex is ObjectDisposedException))
{
_readLoopExitedWithException = ex;
var evt = OnError;
if (evt != null)
{
evt(ex);
}
}
}
}
private async Task<bool> ProcessNextFrameAsync()
{
// First read frame header
var frameHeaderBuffer = await ReadExactLength(12);
if (frameHeaderBuffer == null)
{
return false; // Underlying stream was closed
}
// Parse frame header, then read the frame body
long streamId = BitConverter.ToInt64(frameHeaderBuffer, 0);
int frameBodyLength = BitConverter.ToInt32(frameHeaderBuffer, 8);
if (frameBodyLength < 0 || frameBodyLength > MaxFrameBodyLength)
{
throw new InvalidDataException("Illegal frame length: " + frameBodyLength);
}
var frameBody = await ReadExactLength(frameBodyLength);
if (frameBody == null)
{
return false; // Underlying stream was closed
}
// Dispatch the frame to the relevant inner stream
VirtualConnection innerStream;
lock (_activeInnerStreams)
{
_activeInnerStreams.TryGetValue(streamId, out innerStream);
}
if (innerStream != null)
{
await innerStream.AddDataToQueue(frameBody);
}
return true;
}
private async Task<byte[]> ReadExactLength(int lengthToRead) {
byte[] buffer = new byte[lengthToRead];
var totalBytesRead = 0;
var ct = _disposalCancellatonToken.Token;
while (totalBytesRead < lengthToRead)
{
var chunkLengthRead = await _underlyingTransport.ReadAsync(buffer, totalBytesRead, lengthToRead - totalBytesRead, ct);
if (chunkLengthRead == 0)
{
// Underlying stream was closed
return null;
}
totalBytesRead += chunkLengthRead;
}
return buffer;
}
private void CloseAllActiveStreams()
{
IList<VirtualConnection> innerStreamsCopy;
// Only hold the lock while cloning the list of inner streams. Release the lock before
// actually disposing them, because each 'dispose' call will try to take another lock
// so it can remove that inner stream from activeInnerStreams.
lock (_activeInnerStreams)
{
innerStreamsCopy = _activeInnerStreams.Values.ToList();
}
foreach (var stream in innerStreamsCopy)
{
stream.Dispose();
}
}
public void Dispose()
{
if (!_disposedValue)
{
_disposedValue = true;
_disposalCancellatonToken.Cancel(); // Stops the read loop
CloseAllActiveStreams();
}
}
public async Task WriteAsync(long innerStreamId, byte[] data, int offset, int count, CancellationToken cancellationToken)
{
// In case the amount of data to be sent exceeds the max frame length, split it into separate frames
// Note that we always send at least one frame, even if it's empty, because the zero-length frame is the signal to close a virtual connection
// (hence 'do..while' instead of just 'while').
int bytesWritten = 0;
do {
// Improve discoverability of read-loop errors (in case the developer doesn't add an OnError listener)
ThrowIfReadLoopFailed();
// Hold the write lock only for the time taken to send a single frame, not all frames, to allow large sends to be proceed in parallel
await _streamWriterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Write stream ID, then length prefix, then chunk payload, then flush
var nextChunkBodyLength = Math.Min(MaxFrameBodyLength, count - bytesWritten);
await _underlyingTransport.WriteAsync(BitConverter.GetBytes(innerStreamId), 0, 8, cancellationToken).ConfigureAwait(false);
await _underlyingTransport.WriteAsync(BitConverter.GetBytes(nextChunkBodyLength), 0, 4, cancellationToken).ConfigureAwait(false);
if (nextChunkBodyLength > 0)
{
await _underlyingTransport.WriteAsync(data, offset + bytesWritten, nextChunkBodyLength, cancellationToken).ConfigureAwait(false);
bytesWritten += nextChunkBodyLength;
}
await _underlyingTransport.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_streamWriterSemaphore.Release();
}
} while (bytesWritten < count);
}
public void CloseInnerStream(long innerStreamId, bool isAlreadyClosedRemotely)
{
lock (_activeInnerStreams)
{
if (_activeInnerStreams.ContainsKey(innerStreamId))
{
_activeInnerStreams.Remove(innerStreamId);
}
}
if (!isAlreadyClosedRemotely) {
// Also notify the remote that this innerstream is closed
WriteAsync(innerStreamId, new byte[0], 0, 0, new CancellationToken()).Wait();
}
}
private void ThrowIfReadLoopFailed()
{
if (_readLoopExitedWithException != null)
{
throw new AggregateException("The connection failed - see InnerException for details.", _readLoopExitedWithException);
}
}
}
}