How do you run IJobs in parallel when each job holds a DynamicBuffer?

Say I have this entity setup:

Entity entity = this.entityManager.CreateEntity();
this.entityManager.AddComponentData(entity, new Request());
this.entityManager.AddBuffer<IntBufferElement>(entity);

For each of these entities, I want to run a single IJob:

struct Job : IJob {
    public int index;
    public BufferAccessor<IntBufferElement> buffers;

    public void Execute() {
        DynamicBuffer<IntBufferElement> list = this.buffers[this.index];

        // Do stuff with list
    }
}

Here’s my code so far (using chunk iteration):

private JobHandle Process(ArchetypeChunk chunk, JobHandle inputDeps) {
    BufferAccessor<IntBufferElement> buffers = chunk.GetBufferAccessor(this.bufferType);
    JobHandle handle = inputDeps;

    for(int i = 0; i < chunk.Count; ++i) {
        Job job = new Job {
            index = i,
            buffers = buffers
        };

        handle = job.Schedule(handle);
    }

    return handle;
}

This doesn’t even run the IJobs in parallel; they run one after the other. Worse, I’m also getting this error:

InvalidOperationException: The previously scheduled job Job writes to the NativeArray Job.buffers. You must call JobHandle.Complete() on the job Job, before you can read from the NativeArray safely.

I tried the NativeDisable* attributes, but they don’t work. How would you do this?

From looking at your code, was inputDeps.Complete() called before you scheduled your job? I haven’t done much pure chunk iteration on my own, since the majority of my stuff can be handled through IJobForEach<>.

And as an alternative, how about an IJobForEachWithEntity instead?

You can structure your job like so:

struct Job : IJobForEachWithEntity<ComponentData> {
    // Each entity only touches its own buffer, so it's safe to relax
    // the parallel-for restriction on the lookup.
    [NativeDisableParallelForRestriction]
    public BufferFromEntity<IntBufferElement> buffers;

    public void Execute(Entity e, int index, ref ComponentData data) {
        var list = buffers[e];
        // Do stuff with list
    }
}

Then in your actual JobComponentSystem:

protected override JobHandle OnUpdate(JobHandle inputDeps) {
    return new Job {
        buffers = GetBufferFromEntity<IntBufferElement>()
    }.Schedule(this, inputDeps);
}

The reason your jobs are not running in parallel is that each job you schedule is declared with a dependency on the previous one: you create this dependency by passing the handle returned by the previous Schedule call into the next. A job only starts its work once every handle up its dependency chain is complete.
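To illustrate the difference, here’s a minimal sketch of fan-out scheduling, where every job depends only on inputDeps and the handles are merged with JobHandle.CombineDependencies. Note this only removes the artificial chain; the safety system will still flag the aliased buffers field, which is a separate problem:

private JobHandle Process(ArchetypeChunk chunk, JobHandle inputDeps) {
    BufferAccessor<IntBufferElement> buffers = chunk.GetBufferAccessor(this.bufferType);

    // Every job depends only on inputDeps, not on its predecessor.
    NativeArray<JobHandle> handles = new NativeArray<JobHandle>(chunk.Count, Allocator.Temp);
    for (int i = 0; i < chunk.Count; ++i) {
        handles[i] = new Job {
            index = i,
            buffers = buffers
        }.Schedule(inputDeps);
    }

    // Callers wait on the combined handle, which completes when all jobs do.
    JobHandle combined = JobHandle.CombineDependencies(handles);
    handles.Dispose();
    return combined;
}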

I’m not sure why you are getting that exception, as your job dependencies should prevent multiple writes from happening at the same time. My guess is that it’s happening somewhere else in your system, but it’s difficult to tell from the example.

Anyway, with all that said, your approach to chunk iteration is a little off. Simple chunk iteration is best performed with the IJobChunk interface. A chunk-based job and its scheduling would look something like this:

public sealed class ChunkIterationSystem : JobComponentSystem
{
    private EntityQuery _chunkQuery;

    private ArchetypeChunkBufferType<IntBufferElement> _buffersTypeRW;

    protected override void OnCreate()
    {
        _chunkQuery = GetEntityQuery(new EntityQueryDesc
        {
            All = new ComponentType[] { typeof(IntBufferElement) }
        });
    }

    private void GatherTypes()
    {
        _buffersTypeRW = GetArchetypeChunkBufferType<IntBufferElement>(false);
    }

    protected override JobHandle OnUpdate(JobHandle inputDeps)
    {
        GatherTypes();

        return new ChunkIterationJob
        {
            BuffersTypeRW = _buffersTypeRW
        }.Schedule(_chunkQuery, inputDeps);
    }

    private struct ChunkIterationJob : IJobChunk
    {
        public ArchetypeChunkBufferType<IntBufferElement> BuffersTypeRW;

        public void Execute(ArchetypeChunk chunk, int chunkIndex, int firstEntityIndex)
        {
            var buffers = chunk.GetBufferAccessor(BuffersTypeRW);

            for (int i = 0; i < chunk.Count; i++)
            {
                var buffer = buffers[i];
                // Do stuff with list
            }
        }
    }
}

The job automatically distributes the chunks in the query across worker threads, and because each thread only works on the buffers within its own chunk, no NativeDisable* attributes are needed for parallel access.


My example is simplified from the actual code. The real job is more complex and needs more containers per run. I want to run each request in its own job so the requests are distributed among the threads. There’s no sense in running an IJobChunk, since the request entities probably won’t span more than one chunk. The processing per request is heavy, so the single thread running the IJobChunk might run too long, with each request processed one after the other.

I’m trying to use IJobParallelFor now since I’ve discovered that I can allocate native containers inside jobs.
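For what it’s worth, here’s a rough sketch of that direction, assuming a hypothetical requestSizes input; Allocator.Temp allocations are legal inside jobs:

// Sketch only: one iteration per request; scratch containers are
// allocated inside the job with Allocator.Temp.
struct ProcessRequestsJob : IJobParallelFor {
    [ReadOnly] public NativeArray<int> requestSizes; // hypothetical per-request input

    public void Execute(int index) {
        // Temp containers must be created, used, and disposed within Execute.
        NativeList<int> scratch = new NativeList<int>(requestSizes[index], Allocator.Temp);

        // ... heavy per-request processing using scratch ...

        scratch.Dispose();
    }
}

// Scheduling with batch size 1 so each request can land on its own worker thread:
// JobHandle handle = new ProcessRequestsJob { requestSizes = sizes }.Schedule(requestCount, 1, inputDeps);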

You can use the method above and use an ISharedComponentData to split your entities across multiple chunks. This is one of the intended uses for shared components: give the shared component an integer value from 0 to n-1, where n is the number of threads you want to split the workload across. This way you’re still playing nicely with the ECS system. If you are worried about unused chunk space (I wouldn’t be at the scale you’re describing), Unity will be adding functionality to customize chunk capacity in the future.
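For example, a minimal sketch of that idea (WorkerGroup and workerCount are hypothetical names, not part of the API):

// Hypothetical shared component whose only job is to push request
// entities into separate chunks, one chunk per distinct Value.
public struct WorkerGroup : ISharedComponentData {
    public int Value; // 0 .. workerCount-1
}

// Assign groups round-robin when creating each request entity:
Entity entity = entityManager.CreateEntity();
entityManager.AddComponentData(entity, new Request());
entityManager.AddBuffer<IntBufferElement>(entity);
entityManager.AddSharedComponentData(entity, new WorkerGroup { Value = requestIndex % workerCount });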
