mirror of
https://github.com/aspnet/JavaScriptServices.git
synced 2025-12-23 01:58:29 +00:00
Split out 'socket' hosting model into a separate optional NuGet package, since most developers won't need it
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
using System;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels
|
||||
{
|
||||
/// <summary>
|
||||
@@ -15,16 +13,5 @@ namespace Microsoft.AspNetCore.NodeServices.HostingModels
|
||||
{
|
||||
options.NodeInstanceFactory = () => new HttpNodeInstance(options);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Configures the <see cref="INodeServices"/> service so that it will use out-of-process
|
||||
/// Node.js instances and perform RPC calls over binary sockets (on Windows, this is
|
||||
/// implemented as named pipes; on other platforms it uses domain sockets).
|
||||
/// </summary>
|
||||
public static void UseSocketHosting(this NodeServicesOptions options)
|
||||
{
|
||||
var pipeName = "pni-" + Guid.NewGuid().ToString("D"); // Arbitrary non-clashing string
|
||||
options.NodeInstanceFactory = () => new SocketNodeInstance(options, pipeName);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
|
||||
{
|
||||
internal class NamedPipeConnection : StreamConnection
|
||||
{
|
||||
private bool _disposedValue = false;
|
||||
private NamedPipeClientStream _namedPipeClientStream;
|
||||
|
||||
#pragma warning disable 1998 // Because in the NET451 code path, there's nothing to await
|
||||
public override async Task<Stream> Open(string address)
|
||||
{
|
||||
_namedPipeClientStream = new NamedPipeClientStream(
|
||||
".",
|
||||
address,
|
||||
PipeDirection.InOut,
|
||||
PipeOptions.Asynchronous);
|
||||
|
||||
#if NET451
|
||||
_namedPipeClientStream.Connect();
|
||||
#else
|
||||
await _namedPipeClientStream.ConnectAsync().ConfigureAwait(false);
|
||||
#endif
|
||||
|
||||
return _namedPipeClientStream;
|
||||
}
|
||||
#pragma warning restore 1998
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
if (!_disposedValue)
|
||||
{
|
||||
if (_namedPipeClientStream != null)
|
||||
{
|
||||
_namedPipeClientStream.Dispose();
|
||||
}
|
||||
|
||||
_disposedValue = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
|
||||
{
|
||||
internal abstract class StreamConnection : IDisposable
|
||||
{
|
||||
public abstract Task<Stream> Open(string address);
|
||||
public abstract void Dispose();
|
||||
|
||||
public static StreamConnection Create()
|
||||
{
|
||||
#if NET451
|
||||
return new NamedPipeConnection();
|
||||
#else
|
||||
var useNamedPipes = System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(
|
||||
System.Runtime.InteropServices.OSPlatform.Windows);
|
||||
if (useNamedPipes)
|
||||
{
|
||||
return new NamedPipeConnection();
|
||||
}
|
||||
else
|
||||
{
|
||||
return new UnixDomainSocketConnection();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
using System.IO;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
|
||||
{
|
||||
internal class UnixDomainSocketConnection : StreamConnection
|
||||
{
|
||||
private bool _disposedValue = false;
|
||||
private NetworkStream _networkStream;
|
||||
private Socket _socket;
|
||||
|
||||
#if NET451
|
||||
public override Task<Stream> Open(string address)
|
||||
{
|
||||
// The 'null' assignments avoid the compiler warnings about unassigned fields.
|
||||
// Note that this whole class isn't supported on .NET 4.5.1, since that's not cross-platform.
|
||||
_networkStream = null;
|
||||
_socket = null;
|
||||
throw new System.PlatformNotSupportedException();
|
||||
}
|
||||
#else
|
||||
public override async Task<Stream> Open(string address)
|
||||
{
|
||||
var endPoint = new UnixDomainSocketEndPoint("/tmp/" + address);
|
||||
_socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Unspecified);
|
||||
await _socket.ConnectAsync(endPoint).ConfigureAwait(false);
|
||||
_networkStream = new NetworkStream(_socket);
|
||||
return _networkStream;
|
||||
}
|
||||
#endif
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
if (!_disposedValue)
|
||||
{
|
||||
if (_networkStream != null)
|
||||
{
|
||||
_networkStream.Dispose();
|
||||
}
|
||||
|
||||
if (_socket != null)
|
||||
{
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
_disposedValue = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
|
||||
{
|
||||
// From System.IO.Pipes/src/System/Net/Sockets/UnixDomainSocketEndPoint.cs (an internal class in System.IO.Pipes)
|
||||
internal sealed class UnixDomainSocketEndPoint : EndPoint
|
||||
{
|
||||
private const AddressFamily EndPointAddressFamily = AddressFamily.Unix;
|
||||
|
||||
private static readonly Encoding s_pathEncoding = Encoding.UTF8;
|
||||
private static readonly int s_nativePathOffset = 2; // = offsetof(struct sockaddr_un, sun_path). It's the same on Linux and OSX
|
||||
private static readonly int s_nativePathLength = 91; // sockaddr_un.sun_path at http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_un.h.html, -1 for terminator
|
||||
private static readonly int s_nativeAddressSize = s_nativePathOffset + s_nativePathLength;
|
||||
|
||||
private readonly string _path;
|
||||
private readonly byte[] _encodedPath;
|
||||
|
||||
public UnixDomainSocketEndPoint(string path)
|
||||
{
|
||||
if (path == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(path));
|
||||
}
|
||||
|
||||
_path = path;
|
||||
_encodedPath = s_pathEncoding.GetBytes(_path);
|
||||
|
||||
if (path.Length == 0 || _encodedPath.Length > s_nativePathLength)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(path));
|
||||
}
|
||||
}
|
||||
|
||||
internal UnixDomainSocketEndPoint(SocketAddress socketAddress)
|
||||
{
|
||||
if (socketAddress == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(socketAddress));
|
||||
}
|
||||
|
||||
if (socketAddress.Family != EndPointAddressFamily ||
|
||||
socketAddress.Size > s_nativeAddressSize)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(socketAddress));
|
||||
}
|
||||
|
||||
if (socketAddress.Size > s_nativePathOffset)
|
||||
{
|
||||
_encodedPath = new byte[socketAddress.Size - s_nativePathOffset];
|
||||
for (int i = 0; i < _encodedPath.Length; i++)
|
||||
{
|
||||
_encodedPath[i] = socketAddress[s_nativePathOffset + i];
|
||||
}
|
||||
|
||||
_path = s_pathEncoding.GetString(_encodedPath, 0, _encodedPath.Length);
|
||||
}
|
||||
else
|
||||
{
|
||||
#if NET451
|
||||
_encodedPath = new byte[0];
|
||||
#else
|
||||
_encodedPath = Array.Empty<byte>();
|
||||
#endif
|
||||
_path = string.Empty;
|
||||
}
|
||||
}
|
||||
|
||||
public override SocketAddress Serialize()
|
||||
{
|
||||
var result = new SocketAddress(AddressFamily.Unix, s_nativeAddressSize);
|
||||
|
||||
for (int index = 0; index < _encodedPath.Length; index++)
|
||||
{
|
||||
result[s_nativePathOffset + index] = _encodedPath[index];
|
||||
}
|
||||
result[s_nativePathOffset + _encodedPath.Length] = 0; // path must be null-terminated
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public override EndPoint Create(SocketAddress socketAddress) => new UnixDomainSocketEndPoint(socketAddress);
|
||||
|
||||
public override AddressFamily AddressFamily => EndPointAddressFamily;
|
||||
|
||||
public override string ToString() => _path;
|
||||
}
|
||||
}
|
||||
@@ -1,225 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections;
|
||||
using Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Serialization;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels
|
||||
{
|
||||
/// <summary>
|
||||
/// A specialisation of the OutOfProcessNodeInstance base class that uses a lightweight binary streaming protocol
|
||||
/// to perform RPC invocations. The physical transport is Named Pipes on Windows, or Domain Sockets on Linux/Mac.
|
||||
/// For details on the binary streaming protocol, see
|
||||
/// Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections.VirtualConnectionClient.
|
||||
/// The advantage versus using HTTP for RPC is that this is faster (not surprisingly - there's much less overhead
|
||||
/// because we don't need most of the functionality of HTTP.
|
||||
///
|
||||
/// The address of the pipe/socket is selected randomly here on the .NET side and sent to the child process as a
|
||||
/// command-line argument (the address space is wide enough that there's no real risk of a clash, unlike when
|
||||
/// selecting TCP port numbers).
|
||||
/// </summary>
|
||||
/// <seealso cref="Microsoft.AspNetCore.NodeServices.HostingModels.OutOfProcessNodeInstance" />
|
||||
internal class SocketNodeInstance : OutOfProcessNodeInstance
|
||||
{
|
||||
private readonly static JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings
|
||||
{
|
||||
ContractResolver = new CamelCasePropertyNamesContractResolver(),
|
||||
TypeNameHandling = TypeNameHandling.None
|
||||
};
|
||||
|
||||
private readonly static int streamBufferSize = 16 * 1024;
|
||||
private readonly static UTF8Encoding utf8EncodingWithoutBom = new UTF8Encoding(false);
|
||||
|
||||
private readonly SemaphoreSlim _connectionCreationSemaphore = new SemaphoreSlim(1);
|
||||
private bool _connectionHasFailed;
|
||||
private StreamConnection _physicalConnection;
|
||||
private string _socketAddress;
|
||||
private VirtualConnectionClient _virtualConnectionClient;
|
||||
|
||||
public SocketNodeInstance(NodeServicesOptions options, string socketAddress)
|
||||
: base(
|
||||
EmbeddedResourceReader.Read(
|
||||
typeof(SocketNodeInstance),
|
||||
"/Content/Node/entrypoint-socket.js"),
|
||||
options.ProjectPath,
|
||||
options.WatchFileExtensions,
|
||||
MakeNewCommandLineOptions(socketAddress),
|
||||
options.NodeInstanceOutputLogger,
|
||||
options.EnvironmentVariables,
|
||||
options.InvocationTimeoutMilliseconds,
|
||||
options.LaunchWithDebugging,
|
||||
options.DebuggingPort)
|
||||
{
|
||||
_socketAddress = socketAddress;
|
||||
}
|
||||
|
||||
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connectionHasFailed)
|
||||
{
|
||||
// This special exception type forces NodeServicesImpl to restart the Node instance
|
||||
throw new NodeInvocationException(
|
||||
"The SocketNodeInstance socket connection failed. See logs to identify the reason.",
|
||||
null,
|
||||
nodeInstanceUnavailable: true);
|
||||
}
|
||||
|
||||
if (_virtualConnectionClient == null)
|
||||
{
|
||||
// Although we could pass the cancellationToken into EnsureVirtualConnectionClientCreated and
|
||||
// have it signal cancellations upstream, that would be a bad thing to do, because all callers
|
||||
// wait for the same connection task. There's no reason why the first caller should have the
|
||||
// special ability to cancel the connection process in a way that would affect subsequent
|
||||
// callers. So, each caller just independently stops awaiting connection if that call is cancelled.
|
||||
await EnsureVirtualConnectionClientCreated().OrThrowOnCancellation(cancellationToken);
|
||||
}
|
||||
|
||||
// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
|
||||
// physical connection to the child process, but without the overhead of doing so, because it's really
|
||||
// just multiplexed into the existing physical connection stream.
|
||||
bool shouldDisposeVirtualConnection = true;
|
||||
Stream virtualConnection = null;
|
||||
try
|
||||
{
|
||||
virtualConnection = _virtualConnectionClient.OpenVirtualConnection();
|
||||
|
||||
// Send request
|
||||
WriteJsonLine(virtualConnection, invocationInfo);
|
||||
|
||||
// Determine what kind of response format is expected
|
||||
if (typeof(T) == typeof(Stream))
|
||||
{
|
||||
// Pass through streamed binary response
|
||||
// It is up to the consumer to dispose this stream, so don't do so here
|
||||
shouldDisposeVirtualConnection = false;
|
||||
return (T)(object)virtualConnection;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Parse and return non-streamed JSON response
|
||||
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection, cancellationToken);
|
||||
if (response.ErrorMessage != null)
|
||||
{
|
||||
throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails);
|
||||
}
|
||||
|
||||
return response.Result;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (shouldDisposeVirtualConnection)
|
||||
{
|
||||
virtualConnection.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureVirtualConnectionClientCreated()
|
||||
{
|
||||
// Asynchronous equivalent to a 'lock(...) { ... }'
|
||||
await _connectionCreationSemaphore.WaitAsync();
|
||||
try
|
||||
{
|
||||
if (_virtualConnectionClient == null)
|
||||
{
|
||||
_physicalConnection = StreamConnection.Create();
|
||||
|
||||
var connection = await _physicalConnection.Open(_socketAddress);
|
||||
_virtualConnectionClient = new VirtualConnectionClient(connection);
|
||||
_virtualConnectionClient.OnError += (ex) =>
|
||||
{
|
||||
// This callback is fired only if there's a protocol-level failure (e.g., child process disconnected
|
||||
// unexpectedly). It does *not* fire when RPC calls return errors. Since there's been a protocol-level
|
||||
// failure, this Node instance is no longer usable and should be discarded.
|
||||
_connectionHasFailed = true;
|
||||
|
||||
OutputLogger.LogError(0, ex, ex.Message);
|
||||
};
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionCreationSemaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
if (_virtualConnectionClient != null)
|
||||
{
|
||||
_virtualConnectionClient.Dispose();
|
||||
_virtualConnectionClient = null;
|
||||
}
|
||||
|
||||
if (_physicalConnection != null)
|
||||
{
|
||||
_physicalConnection.Dispose();
|
||||
_physicalConnection = null;
|
||||
}
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
private static void WriteJsonLine(Stream stream, object serializableObject)
|
||||
{
|
||||
using (var streamWriter = new StreamWriter(stream, utf8EncodingWithoutBom, streamBufferSize, true))
|
||||
using (var jsonWriter = new JsonTextWriter(streamWriter))
|
||||
{
|
||||
jsonWriter.CloseOutput = false;
|
||||
|
||||
var serializer = JsonSerializer.Create(jsonSerializerSettings);
|
||||
serializer.Serialize(jsonWriter, serializableObject);
|
||||
jsonWriter.Flush();
|
||||
|
||||
streamWriter.WriteLine();
|
||||
streamWriter.Flush();
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<T> ReadJsonAsync<T>(Stream stream, CancellationToken cancellationToken)
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream, cancellationToken));
|
||||
return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
|
||||
}
|
||||
|
||||
private static async Task<byte[]> ReadAllBytesAsync(Stream input, CancellationToken cancellationToken)
|
||||
{
|
||||
byte[] buffer = new byte[streamBufferSize];
|
||||
|
||||
using (var ms = new MemoryStream())
|
||||
{
|
||||
int read;
|
||||
while ((read = await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
|
||||
{
|
||||
ms.Write(buffer, 0, read);
|
||||
}
|
||||
|
||||
return ms.ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
private static string MakeNewCommandLineOptions(string listenAddress)
|
||||
{
|
||||
return $"--listenAddress {listenAddress}";
|
||||
}
|
||||
|
||||
#pragma warning disable 649 // These properties are populated via JSON deserialization
|
||||
private class RpcJsonResponse<TResult>
|
||||
{
|
||||
public TResult Result { get; set; }
|
||||
public string ErrorMessage { get; set; }
|
||||
public string ErrorDetails { get; set; }
|
||||
}
|
||||
#pragma warning restore 649
|
||||
}
|
||||
}
|
||||
@@ -1,154 +0,0 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading.Tasks.Dataflow;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections
|
||||
{
|
||||
/// <summary>
|
||||
/// A virtual read/write connection, typically to a remote process. Multiple virtual connections can be
|
||||
/// multiplexed over a single physical connection (e.g., a named pipe, domain socket, or TCP socket).
|
||||
/// </summary>
|
||||
internal class VirtualConnection : Stream
|
||||
{
|
||||
#if NET451
|
||||
private readonly static Task CompletedTask = Task.FromResult((object)null);
|
||||
#else
|
||||
private readonly static Task CompletedTask = Task.CompletedTask;
|
||||
#endif
|
||||
private VirtualConnectionClient _host;
|
||||
private readonly BufferBlock<byte[]> _receivedDataQueue = new BufferBlock<byte[]>();
|
||||
private ArraySegment<byte> _receivedDataNotYetUsed;
|
||||
private bool _wasClosedByRemote;
|
||||
private bool _isDisposed;
|
||||
|
||||
public VirtualConnection(long id, VirtualConnectionClient host)
|
||||
{
|
||||
Id = id;
|
||||
_host = host;
|
||||
}
|
||||
|
||||
public long Id { get; }
|
||||
|
||||
public override bool CanRead { get { return true; } }
|
||||
public override bool CanSeek { get { return false; } }
|
||||
public override bool CanWrite { get { return true; } }
|
||||
|
||||
public override long Length
|
||||
{
|
||||
get { throw new NotImplementedException(); }
|
||||
}
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get { throw new NotImplementedException(); }
|
||||
set { throw new NotImplementedException(); }
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
// We're auto-flushing, so this is a no-op.
|
||||
}
|
||||
|
||||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_wasClosedByRemote)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
var bytesRead = 0;
|
||||
while (true)
|
||||
{
|
||||
// Pull as many applicable bytes as we can out of receivedDataNotYetUsed, then update its offset/length
|
||||
int bytesToExtract = Math.Min(count - bytesRead, _receivedDataNotYetUsed.Count);
|
||||
if (bytesToExtract > 0)
|
||||
{
|
||||
Buffer.BlockCopy(_receivedDataNotYetUsed.Array, _receivedDataNotYetUsed.Offset, buffer, bytesRead, bytesToExtract);
|
||||
_receivedDataNotYetUsed = new ArraySegment<byte>(_receivedDataNotYetUsed.Array, _receivedDataNotYetUsed.Offset + bytesToExtract, _receivedDataNotYetUsed.Count - bytesToExtract);
|
||||
bytesRead += bytesToExtract;
|
||||
}
|
||||
|
||||
// If we've completely filled the output buffer, we're done
|
||||
if (bytesRead == count)
|
||||
{
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
// We haven't yet filled the output buffer, so we must have exhausted receivedDataNotYetUsed instead.
|
||||
// We want to get the next block of data from the underlying queue.
|
||||
byte[] nextReceivedBlock;
|
||||
if (bytesRead > 0)
|
||||
{
|
||||
if (!_receivedDataQueue.TryReceive(null, out nextReceivedBlock))
|
||||
{
|
||||
// No more data is available synchronously, and we already have some data, so we can stop now
|
||||
return bytesRead;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Since we don't yet have anything, wait for the underlying source
|
||||
nextReceivedBlock = await _receivedDataQueue.ReceiveAsync(cancellationToken);
|
||||
}
|
||||
|
||||
if (nextReceivedBlock.Length == 0)
|
||||
{
|
||||
// A zero-length block signals that the remote regards this virtual connection as closed
|
||||
_wasClosedByRemote = true;
|
||||
return bytesRead;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We got some more data, so can continue trying to fill the output buffer
|
||||
_receivedDataNotYetUsed = new ArraySegment<byte>(nextReceivedBlock, 0, nextReceivedBlock.Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_wasClosedByRemote)
|
||||
{
|
||||
throw new InvalidOperationException("The connection was already closed by the remote party");
|
||||
}
|
||||
|
||||
return count > 0 ? _host.WriteAsync(Id, buffer, offset, count, cancellationToken) : CompletedTask;
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
return ReadAsync(buffer, offset, count, CancellationToken.None).Result;
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
WriteAsync(buffer, offset, count, CancellationToken.None).Wait();
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing && !_isDisposed)
|
||||
{
|
||||
_isDisposed = true;
|
||||
_host.CloseInnerStream(Id, _wasClosedByRemote);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task AddDataToQueue(byte[] data)
|
||||
{
|
||||
await _receivedDataQueue.SendAsync(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,238 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections
|
||||
{
|
||||
/// <summary>
|
||||
/// A callback that will be invoked if the <see cref="VirtualConnectionClient"/> encounters a read error.
|
||||
/// </summary>
|
||||
/// <param name="ex"></param>
|
||||
public delegate void VirtualConnectionReadErrorHandler(Exception ex);
|
||||
|
||||
/// <summary>
|
||||
/// Wraps an underlying physical read/write stream (e.g., named pipes, domain sockets, or TCP sockets) and
|
||||
/// exposes an API for making 'virtual connections', which act as independent read/write streams.
|
||||
/// Traffic over these virtual connections is multiplexed over the underlying physical stream. This is useful
|
||||
/// for fast stream-based inter-process communication because it avoids the overhead of opening a new physical
|
||||
/// connection each time a new communication channel is needed.
|
||||
/// </summary>
|
||||
internal class VirtualConnectionClient : IDisposable
|
||||
{
|
||||
internal const int MaxFrameBodyLength = 16 * 1024;
|
||||
|
||||
public event VirtualConnectionReadErrorHandler OnError;
|
||||
|
||||
private Stream _underlyingTransport;
|
||||
private Dictionary<long, VirtualConnection> _activeInnerStreams;
|
||||
private long _nextInnerStreamId;
|
||||
private readonly SemaphoreSlim _streamWriterSemaphore = new SemaphoreSlim(1);
|
||||
private readonly object _readControlLock = new object();
|
||||
private Exception _readLoopExitedWithException;
|
||||
private readonly CancellationTokenSource _disposalCancellatonToken = new CancellationTokenSource();
|
||||
private bool _disposedValue = false;
|
||||
|
||||
public VirtualConnectionClient(Stream underlyingTransport)
|
||||
{
|
||||
_underlyingTransport = underlyingTransport;
|
||||
_activeInnerStreams = new Dictionary<long, VirtualConnection>();
|
||||
|
||||
RunReadLoop();
|
||||
}
|
||||
|
||||
public Stream OpenVirtualConnection()
|
||||
{
|
||||
// Improve discoverability of read-loop errors (in case the developer doesn't add an OnError listener)
|
||||
ThrowIfReadLoopFailed();
|
||||
|
||||
var id = Interlocked.Increment(ref _nextInnerStreamId);
|
||||
var newInnerStream = new VirtualConnection(id, this);
|
||||
lock (_activeInnerStreams)
|
||||
{
|
||||
_activeInnerStreams.Add(id, newInnerStream);
|
||||
}
|
||||
|
||||
return newInnerStream;
|
||||
}
|
||||
|
||||
// It's async void because nothing waits for it to finish (it continues indefinitely). It signals any errors via
|
||||
// a separate channel.
|
||||
private async void RunReadLoop()
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!_disposalCancellatonToken.IsCancellationRequested)
|
||||
{
|
||||
var remoteIsStillConnected = await ProcessNextFrameAsync();
|
||||
if (!remoteIsStillConnected)
|
||||
{
|
||||
CloseAllActiveStreams();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Not all underlying transports correctly honor cancellation tokens. For example,
|
||||
// DomainSocketStreamTransport's ReadAsync ignores them, so we only know to stop
|
||||
// the read loop when the underlying stream is disposed and then it throws ObjectDisposedException.
|
||||
if (!(ex is TaskCanceledException || ex is ObjectDisposedException))
|
||||
{
|
||||
_readLoopExitedWithException = ex;
|
||||
|
||||
var evt = OnError;
|
||||
if (evt != null)
|
||||
{
|
||||
evt(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> ProcessNextFrameAsync()
|
||||
{
|
||||
// First read frame header
|
||||
var frameHeaderBuffer = await ReadExactLength(12);
|
||||
if (frameHeaderBuffer == null)
|
||||
{
|
||||
return false; // Underlying stream was closed
|
||||
}
|
||||
|
||||
// Parse frame header, then read the frame body
|
||||
long streamId = BitConverter.ToInt64(frameHeaderBuffer, 0);
|
||||
int frameBodyLength = BitConverter.ToInt32(frameHeaderBuffer, 8);
|
||||
if (frameBodyLength < 0 || frameBodyLength > MaxFrameBodyLength)
|
||||
{
|
||||
throw new InvalidDataException("Illegal frame length: " + frameBodyLength);
|
||||
}
|
||||
|
||||
var frameBody = await ReadExactLength(frameBodyLength);
|
||||
if (frameBody == null)
|
||||
{
|
||||
return false; // Underlying stream was closed
|
||||
}
|
||||
|
||||
// Dispatch the frame to the relevant inner stream
|
||||
VirtualConnection innerStream;
|
||||
lock (_activeInnerStreams)
|
||||
{
|
||||
_activeInnerStreams.TryGetValue(streamId, out innerStream);
|
||||
}
|
||||
|
||||
if (innerStream != null)
|
||||
{
|
||||
await innerStream.AddDataToQueue(frameBody);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task<byte[]> ReadExactLength(int lengthToRead) {
|
||||
byte[] buffer = new byte[lengthToRead];
|
||||
var totalBytesRead = 0;
|
||||
var ct = _disposalCancellatonToken.Token;
|
||||
while (totalBytesRead < lengthToRead)
|
||||
{
|
||||
var chunkLengthRead = await _underlyingTransport.ReadAsync(buffer, totalBytesRead, lengthToRead - totalBytesRead, ct);
|
||||
if (chunkLengthRead == 0)
|
||||
{
|
||||
// Underlying stream was closed
|
||||
return null;
|
||||
}
|
||||
|
||||
totalBytesRead += chunkLengthRead;
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private void CloseAllActiveStreams()
|
||||
{
|
||||
IList<VirtualConnection> innerStreamsCopy;
|
||||
|
||||
// Only hold the lock while cloning the list of inner streams. Release the lock before
|
||||
// actually disposing them, because each 'dispose' call will try to take another lock
|
||||
// so it can remove that inner stream from activeInnerStreams.
|
||||
lock (_activeInnerStreams)
|
||||
{
|
||||
innerStreamsCopy = _activeInnerStreams.Values.ToList();
|
||||
}
|
||||
|
||||
foreach (var stream in innerStreamsCopy)
|
||||
{
|
||||
stream.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!_disposedValue)
|
||||
{
|
||||
_disposedValue = true;
|
||||
|
||||
_disposalCancellatonToken.Cancel(); // Stops the read loop
|
||||
CloseAllActiveStreams();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task WriteAsync(long innerStreamId, byte[] data, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
// In case the amount of data to be sent exceeds the max frame length, split it into separate frames
|
||||
// Note that we always send at least one frame, even if it's empty, because the zero-length frame is the signal to close a virtual connection
|
||||
// (hence 'do..while' instead of just 'while').
|
||||
int bytesWritten = 0;
|
||||
do {
|
||||
// Improve discoverability of read-loop errors (in case the developer doesn't add an OnError listener)
|
||||
ThrowIfReadLoopFailed();
|
||||
|
||||
// Hold the write lock only for the time taken to send a single frame, not all frames, to allow large sends to be proceed in parallel
|
||||
await _streamWriterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Write stream ID, then length prefix, then chunk payload, then flush
|
||||
var nextChunkBodyLength = Math.Min(MaxFrameBodyLength, count - bytesWritten);
|
||||
await _underlyingTransport.WriteAsync(BitConverter.GetBytes(innerStreamId), 0, 8, cancellationToken).ConfigureAwait(false);
|
||||
await _underlyingTransport.WriteAsync(BitConverter.GetBytes(nextChunkBodyLength), 0, 4, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (nextChunkBodyLength > 0)
|
||||
{
|
||||
await _underlyingTransport.WriteAsync(data, offset + bytesWritten, nextChunkBodyLength, cancellationToken).ConfigureAwait(false);
|
||||
bytesWritten += nextChunkBodyLength;
|
||||
}
|
||||
|
||||
await _underlyingTransport.FlushAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_streamWriterSemaphore.Release();
|
||||
}
|
||||
} while (bytesWritten < count);
|
||||
}
|
||||
|
||||
public void CloseInnerStream(long innerStreamId, bool isAlreadyClosedRemotely)
|
||||
{
|
||||
lock (_activeInnerStreams)
|
||||
{
|
||||
if (_activeInnerStreams.ContainsKey(innerStreamId))
|
||||
{
|
||||
_activeInnerStreams.Remove(innerStreamId);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isAlreadyClosedRemotely) {
|
||||
// Also notify the remote that this innerstream is closed
|
||||
WriteAsync(innerStreamId, new byte[0], 0, 0, new CancellationToken()).Wait();
|
||||
}
|
||||
}
|
||||
|
||||
private void ThrowIfReadLoopFailed()
|
||||
{
|
||||
if (_readLoopExitedWithException != null)
|
||||
{
|
||||
throw new AggregateException("The connection failed - see InnerException for details.", _readLoopExitedWithException);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user