Docs says NativeQueue is a FIFO queue, however I can see that it is not allowing to write from multiple different jobs, but only from a specific type of job and worker threads. My goal is to allow to write from different type of jobs.
Is there any workaround to include dependencies or write a NativeContainer that will support Multi producer / Single consumer pattern?
For different jobs, build a pool of NativeQueues that each job can draw from. Then write your consumer job to take in a whole bunch of NativeQueues and call an iteration function with each one. I’m finding out T4 can be useful for generating this kind of code for a large number of native containers in a job.
Obviously I am using a concurrent version of it. A system that writes to it from a different job simply throws a dependency exception. So the queue is not supporting writing from different job types, but from only unique one.
public class ClientTestSendSystem : NetworkJobComponentSystem, IClientSystem
{
protected override JobHandle OnUpdate(JobHandle inputDeps)
{
var handle = new SendJob
{
JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
}
.Schedule(this, inputDeps);
networkEndFrameBarrier.AddJobHandleForProducer(handle);
return handle;
}
}
public class ClientTestSendSystem2 : NetworkJobComponentSystem, IClientSystem
{
protected override JobHandle OnUpdate(JobHandle inputDeps)
{
var handle = new SendJob
{
JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
}
.Schedule(this, inputDeps);
networkEndFrameBarrier.AddJobHandleForProducer(handle);
return handle;
}
}
[NativeDisableContainerSafetyRestriction]
public NativeQueue<IntPtr>.Concurrent JobCommandQueue;
[NativeDisableContainerSafetyRestriction] attribute in a Job prevents Editor from throwing an exception about safety writing. However, race condition may occur.
I skimmed over it and I don’t think this is the case. NativeQuery.Concurrent uses the job systems ThreadIndex for TLS which is independent of which job is run by the thread. It should work cross jobs but you lose all other safety checks too of course.
Btw the NativeQuery implemention should be alot faster than ConcurrentBuffer as it prevents same cacheline writes by multiple threads which can be expensive.
I am not aware of this restriction at all, and I remembered it works fine in my game being queued from multiple places. What is the actual condition to make the Concurrent version fail? What did the dependency exception looks like?
Thank you for the reply.
That basically means that a race condition may occur? I also need a order of enqueued commands. So somehow I need to have a job dependency on the jobs that write to the same NativeQueue. Right now with an attribute it means that Job1 and Job2 do not work in any order.
I get what you meant now. What’s happening here is that the “Concurrent” mechanism should work in your case, but safety system is getting in the way since the inputDeps is not bringing the information about your native container and could not auto complete the prior job. Input deps auto complete based on type, but this is a whole container. (As it is a common problem with any native containers yanked from somewhere and throw in the job, the inputDeps system can only work on dependencies known from EntityQuery) However the safety system do know which container is currently being worked on by which jobs, so it is able to thow this error. Just that it couldn’t complete for you.
However, Concurrent variant’s trait is that it could handle multiple jobs working on it at the same time. But, (I think) it was designed for IJobForEach/IJobChunk/IJobParallelFor to be that “multiple jobs”. (Jobs that could split itself to do the same task)
In your case they are “completely different” jobs, what you want to do is to tell the safety system that it is fine not to complete the other job. [NativeDisableContainerSafetyRestriction] is for this purpose.
No race condition should occur because this is what Concurrent containers are made for. It uses a unique thread index to block on write properly with a mutex. No 2 jobs could start with a same thread index, unless you use non Concurrent version where it all receive thread index 0. (Valid thread index from [NativeSetThreadIndex] starts at 1)
You can just confirm by yourself with something like this. Try changing the Concurrent version to normal version and the exception will eventually throw with dequeued value 0.
public class QueueWrite1 : JobComponentSystem
{
public NativeQueue<int> q;
protected override void OnCreateManager()
{
q = new NativeQueue<int>(Allocator.Persistent);
}
protected override void OnDestroyManager()
{
q.Dispose();
}
protected override JobHandle OnUpdate(JobHandle inputDeps)
{
var handle = new WriteJob
{
JobCommandQueue = q.ToConcurrent(),
}
.Schedule(inputDeps);
return handle;
}
}
public class QueueWrite2 : JobComponentSystem
{
QueueWrite1 s;
protected override void OnCreateManager()
{
s = World.GetOrCreateSystem<QueueWrite1>();
}
protected override JobHandle OnUpdate(JobHandle inputDeps)
{
var handle = new WriteJob
{
JobCommandQueue = s.q.ToConcurrent(),
}
.Schedule(inputDeps);
return handle;
}
}
public struct WriteJob : IJob
{
[NativeDisableContainerSafetyRestriction] public NativeQueue<int>.Concurrent JobCommandQueue;
public void Execute()
{
for (int i = 0; i < 10000; i++)
{
JobCommandQueue.Enqueue(123456789);
}
}
}
public class CheckIntegrity : ComponentSystem
{
QueueWrite1 s;
protected override void OnCreateManager()
{
s = World.GetOrCreateSystem<QueueWrite1>();
}
protected override void OnUpdate()
{
while(s.q.TryDequeue(out int item))
{
if(item != 123456789)
{
throw new Exception($"Race condition occurred (Dequeued {item})");
}
}
}
}
What’s confusing is that the scope for concurrent is a single job handle. But a job handle can represent both parallel and non parallel jobs, so it’s not consistent behavior in terms of the amount of parallelism, which makes it a bit non intuitive. The safety system should really special case these IMO rather then end users having to manually disable the safety system for the container.
@5argon last question. Would it be possible to access the NativeQueue in a regular C# thread? Because accessing it that way throws an exception that a NativeContainer can be accessed only from job thread or the main one?