I’ve running two servers using Unity Transport in the same application, each uses an instance of the same NetworkTransportManager class for the network handling. Network events are checked for in a job and when there’s Data the DataStreamReader is passed out of the job to be processed in the main thread.
The problem I’m having is quite often the reader is already disposed when it reaches the main thread. However if I only run a single server I’m not seeing this issue.
This is the NetworkTransportManager class:
public class NetworkTransportManager : MonoBehaviour
{
NativeArray<int> updateCount;
NetworkDriver networkDriver;
NetworkPipeline reliablePipeline;
NativeList<NetworkConnection> networkConnections;
NativeQueue<NetworkEventDetail> eventDetails;
JobHandle jobHandle;
NetworkUpdateConnectionsJob connectionsJob;
NetworkUpdateJob updateJob;
NetworkState networkState;
public event Action OnServerStarted = default;
public event Action<NetworkConnection> OnClientConnected = default;
public event Action<NetworkConnection> OnClientDisconnected = default;
public event Action<NetworkConnection, DataStreamReader> OnReceiveMessage = default;
private void OnEnable()
{
updateCount = new NativeArray<int>(1, Allocator.Persistent);
updateCount[0] = 1;
networkConnections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
eventDetails = new NativeQueue<NetworkEventDetail>(Allocator.Persistent);
networkState = NetworkState.Shutdown;
}
private void Update()
{
if (networkDriver.IsCreated)
{
jobHandle.Complete();
CheckForNetworkEvents();
updateCount[0]++;
ScheduleByState();
}
}
private void CheckForNetworkEvents()
{
while (eventDetails.TryDequeue(out var eventDetail))
{
Debug.Log("NetworkTransportManager CheckForNetworkEvents: " + eventDetail);
switch (eventDetail.EventType)
{
case NetworkEventType.Connected:
OnClientConnected?.Invoke(eventDetail.Connection);
break;
case NetworkEventType.Disconnect:
OnClientDisconnected?.Invoke(eventDetail.Connection);
break;
case NetworkEventType.Data:
Debug.Log($"CheckForNetworkEvents Data updateCount: {eventDetail.UpdateCount} current updateCount: {updateCount[0]}");
Debug.Log($"CheckForNetworkEvents Data length: {eventDetail.Reader.Length}");
OnReceiveMessage?.Invoke(eventDetail.Connection, eventDetail.Reader);
break;
}
}
}
private void ScheduleByState()
{
switch (networkState)
{
case NetworkState.Started:
jobHandle = networkDriver.ScheduleUpdate();
jobHandle = connectionsJob.Schedule(jobHandle);
jobHandle = updateJob.Schedule(networkConnections, 1, jobHandle);
break;
case NetworkState.ShuttingdownStart:
Debug.Log("NetworkTransportManager ScheduleByState ShuttingdownStart");
jobHandle = networkDriver.ScheduleUpdate();
networkState = NetworkState.ShuttingdownEnd;
break;
case NetworkState.ShuttingdownEnd:
Debug.Log("NetworkTransportManager ScheduleByState ShuttingdownEnd");
networkDriver.Dispose();
networkState = NetworkState.Shutdown;
break;
}
}
internal void DisconnectClient(NetworkConnection connection)
{
if (networkDriver.IsCreated)
{
jobHandle.Complete();
for (int i = 0; i < networkConnections.Length; i++)
{
if (connection == networkConnections[i])
{
Debug.Log("DisconnectClient found client: " + networkConnections[i].InternalId);
connection.Disconnect(networkDriver);
networkConnections[i] = default;
}
}
}
}
internal void SendMessage(NetworkConnection connection, IMessage message)
{
Debug.Log($"NetworkTransportManager SendMessage connection: {connection} message: {message}");
jobHandle.Complete();
if (networkDriver.IsCreated && networkDriver.GetConnectionState(connection) == NetworkConnection.State.Connected)
{
Debug.Log($"NetworkTransportManager connectionState: {networkDriver.GetConnectionState(connection)}");
if (networkDriver.BeginSend(reliablePipeline, connection, out DataStreamWriter writer) == 0)
{
message.Serialize(ref writer);
int written = networkDriver.EndSend(writer);
Debug.Log($"NetworkTransportManager SendMessage written: " + written);
}
}
}
internal void StopServer()
{
Debug.Log("NetworkTransportManager StopServer");
jobHandle.Complete();
foreach (var networkConnection in networkConnections)
{
if (networkConnection.GetState(networkDriver) == NetworkConnection.State.Connected)
{
networkConnection.Disconnect(networkDriver);
}
}
networkState = NetworkState.ShuttingdownStart;
}
internal void StartServer(ushort serverPort)
{
var networkEndPoint = NetworkEndPoint.AnyIpv4;
networkEndPoint.Port = serverPort;
NetworkSettings networkSettings = new NetworkSettings();
networkSettings.WithReliableStageParameters(32);
try
{
networkDriver = NetworkDriver.Create();
reliablePipeline = networkDriver.CreatePipeline(typeof(ReliableSequencedPipelineStage));
if ((networkDriver.Bind(networkEndPoint) == 0))
{
connectionsJob = new NetworkUpdateConnectionsJob(networkDriver, networkConnections, eventDetails.AsParallelWriter());
updateJob = new NetworkUpdateJob(networkDriver.ToConcurrent(), networkConnections.AsDeferredJobArray(),
eventDetails.AsParallelWriter(), updateCount);
if (networkDriver.Listen() == 0)
{
networkState = NetworkState.Started;
OnServerStarted?.Invoke();
}
}
}
catch (InvalidOperationException exception)
{
Debug.LogError($"ServerNetworkManager StartServer exception: {exception.Message}\n{exception.StackTrace}");
}
}
public void OnDestroy()
{
ReleaseNetworkResources();
}
private void ReleaseNetworkResources()
{
jobHandle.Complete();
if (updateCount.IsCreated) updateCount.Dispose();
if (networkConnections.IsCreated) networkConnections.Dispose();
if (eventDetails.IsCreated) eventDetails.Dispose();
if (networkDriver.IsCreated) networkDriver.Dispose();
}
}
And the job class that picks up network events:
struct NetworkUpdateJob : IJobParallelForDefer
{
public NetworkDriver.Concurrent driver;
public NativeArray<NetworkConnection> connections;
public NativeQueue<NetworkEventDetail>.ParallelWriter connectionEvents;
NativeArray<int> updateCount;
public NetworkUpdateJob(NetworkDriver.Concurrent driver, NativeArray<NetworkConnection> connections,
NativeQueue<NetworkEventDetail>.ParallelWriter connectionEvents, NativeArray<int> updateCount)
{
this.driver = driver;
this.connections = connections;
this.connectionEvents = connectionEvents;
this.updateCount = updateCount;
}
public void Execute(int index)
{
if (connections[index].IsCreated)
{
DataStreamReader reader;
NetworkEvent.Type eventType;
while ((eventType = driver.PopEventForConnection(connections[index], out reader)) != NetworkEvent.Type.Empty)
{
switch (eventType)
{
case NetworkEvent.Type.Data:
Debug.Log("NetworkUpdateJob Execute Data updateCount: " + updateCount[0]);
Debug.Log("NetworkUpdateJob Execute Data isCreated: " + reader.IsCreated);
Debug.Log("NetworkUpdateJob Execute Data length: " + reader.Length);
if (reader.IsCreated)
{
connectionEvents.Enqueue(new NetworkEventDetail(connections[index],
NetworkEventType.Data, ref reader, updateCount[0]));
}
break;
case NetworkEvent.Type.Disconnect:
connectionEvents.Enqueue(new NetworkEventDetail(connections[index], NetworkEventType.Disconnect));
connections[index] = default;
break;
}
}
}
}
}
The above code was passing out the reader by value but I changed to by reference to test it but it didn’t make a difference. The reader is not disposed within the job itself. updateCount is just a sanity check on when the Data event is received and when it’s actioned.
This is the error I’m getting:
ObjectDisposedException: The Unity.Collections.NativeList`1[System.Byte] has been deallocated, it is not allowed to access it
I’m also seeing this twice preceding it, but I don’t recall seeing it until I added updateCount.
AtomicSafetyNode has either been corrupted or is being accessed on a job which is not allowed.
Clients do something similar with passing out the reader to the main thread and there are no errors. Is there something in the code or some cross-over between NetworkTransportManager instances that’s causing the issue?