How to wait for EntityCommandBuffer ParallelWriter to finish? (read var incremented)

I’m following a tutorial that accumulates points when you pick up (destroy) coin entities. The tutorial uses .WithoutBurst().Run(), which I am trying to replace with .ScheduleParallel(). The following code runs, but no points are ever accumulated.

I think the problem is that the GameManager.instance.AddPoints() code runs before the Entities.ForEach() job does, but maybe it’s something else. For example, maybe Interlocked.Add() doesn’t work with Burst?

Any help would be appreciated.

Special notes:

  • The accumulation variable (pointsToAdd) is on line 4 of the code.
  • In the Entities.ForEach(), when the player runs into a coin, the coin is flagged for kill (line 16) and its points are accumulated (line 19).
  • After the parallel job runs, I take the accumulated points and write them to a MonoBehaviour (line 29).

        protected override void OnUpdate() {
            var ecbSystem = World.GetOrCreateSystem<EndSimulationEntityCommandBufferSystem>();
            var ecb = ecbSystem.CreateCommandBuffer().AsParallelWriter();
            var pointsToAdd = 0;

            Entities
                .WithAll<Player>()
                .ForEach((int entityInQueryIndex, Entity playerEntity, DynamicBuffer<TriggerBuffer> triggerBuffer, in Player player) =>
                {
                    for (var i = 0; i < triggerBuffer.Length; i++)
                    {
                        var _tb = triggerBuffer[i];

                        if (HasComponent<Collectable>(_tb.entity) && !HasComponent<Kill>(_tb.entity))
                        {
                            ecb.AddComponent(entityInQueryIndex, _tb.entity, new Kill() { timer = 0 });
                            var collectable = GetComponent<Collectable>(_tb.entity);
                            //pointsToAdd += collectable.points;
                            System.Threading.Interlocked.Add(ref pointsToAdd, collectable.points);
                        }
                    }
                }).ScheduleParallel();


            ecbSystem.AddJobHandleForProducer(this.Dependency);

            Job.WithCode(() =>
            {
                GameManager.instance.AddPoints(pointsToAdd);
            }).WithoutBurst().Schedule();

        }

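The issue is that you are using an int as the output of your job; you should use a NativeReference instead. A captured local int is copied into the job, so whatever the job writes never reaches the variable in OnUpdate. For any kind of output from jobs, you should always use native collections.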
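
A minimal plain C# sketch (no Unity) of why a plain int cannot carry a job's output, assuming the captured local is copied into a job struct when the job is scheduled. AddPointsJob, SharedPointsJob and CapturedCopyDemo are hypothetical names for illustration only, and the int[] merely stands in for a NativeReference<int>.

```csharp
using System;

// Hypothetical stand-in for a job struct that holds a COPY of a captured int.
struct AddPointsJob
{
    public int PointsToAdd;

    public void Execute(int points)
    {
        PointsToAdd += points; // changes only the job's own copy
    }
}

// Hypothetical stand-in for a job struct that holds a handle to shared storage.
// The int[] plays the role NativeReference<int> plays in the thread.
struct SharedPointsJob
{
    public int[] Storage;

    public void Execute(int points)
    {
        Storage[0] += points; // writes through to memory the caller also sees
    }
}

static class CapturedCopyDemo
{
    public static int RunWithCopiedInt()
    {
        var pointsToAdd = 0;
        var job = new AddPointsJob { PointsToAdd = pointsToAdd };
        job.Execute(5);
        return pointsToAdd; // the local was never touched
    }

    public static int RunWithSharedStorage()
    {
        var storage = new int[1];
        var job = new SharedPointsJob { Storage = storage };
        job.Execute(5);
        return storage[0]; // the caller observes the job's write
    }

    public static void Main()
    {
        Console.WriteLine(RunWithCopiedInt());     // prints 0
        Console.WriteLine(RunWithSharedStorage()); // prints 5
    }
}
```

The struct copy is the whole point of the sketch: the job and the caller only agree on a value when both go through the same underlying storage.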

I ended up doing exactly as you said and it “works”, but only because I introduced a class-level (persistent) accumulator, so I’m still not controlling when the points get recorded (lines 27-30 above).

My code above seems to work, except that I can’t schedule lines 27-30 to run after the Entities.ForEach(). Is there a way to do that?

Oh, I just saw that you are using ScheduleParallel. In that case you will need to change your logic a little to accommodate that. Here is an example:

            private EntityQuery _query;

            protected override void OnUpdate() {
                var ecbSystem = World.GetOrCreateSystem<EndSimulationEntityCommandBufferSystem>();
                var ecb = ecbSystem.CreateCommandBuffer().AsParallelWriter();
                var pointsToAddArray = new NativeArray<int>(_query.CalculateEntityCount(), Allocator.TempJob, NativeArrayOptions.UninitializedMemory);
    
                Entities
                    .WithStoreEntityQueryInField(ref _query)
                    .WithAll<Player>()
                    .ForEach((int entityInQueryIndex, Entity playerEntity, DynamicBuffer<TriggerBuffer> triggerBuffer, in Player player) =>
                    {
                        var pointsToAdd = 0;

                        for (var i = 0; i < triggerBuffer.Length; i++)
                        {
                            var _tb = triggerBuffer[i];
    
                            if (HasComponent<Collectable>(_tb.entity) && !HasComponent<Kill>(_tb.entity))
                            {
                                ecb.AddComponent(entityInQueryIndex, _tb.entity, new Kill() { timer = 0 });
                                var collectable = GetComponent<Collectable>(_tb.entity);
                                pointsToAdd += collectable.points;
                            }
                        }

                        pointsToAddArray[entityInQueryIndex] = pointsToAdd;
                    }).ScheduleParallel();
    
    
                ecbSystem.AddJobHandleForProducer(this.Dependency);
    
                Job.WithReadOnly(pointsToAddArray).WithCode(() =>
                {
                    foreach (var pointsToAdd in pointsToAddArray)
                    {
                        GameManager.instance.AddPoints(pointsToAdd);
                    }
                }).WithoutBurst().Schedule();
    
            }

If ScheduleParallel is not necessary, the other option (my previous comment) would be:

            protected override void OnUpdate() {
                var ecbSystem = World.GetOrCreateSystem<EndSimulationEntityCommandBufferSystem>();
                var ecb = ecbSystem.CreateCommandBuffer().AsParallelWriter();
                var pointsToAdd = new NativeReference<int>(0, Allocator.TempJob);
    
                Entities
                    .WithAll<Player>()
                    .ForEach((int entityInQueryIndex, Entity playerEntity, DynamicBuffer<TriggerBuffer> triggerBuffer, in Player player) =>
                    {
                        for (var i = 0; i < triggerBuffer.Length; i++)
                        {
                            var _tb = triggerBuffer[i];
    
                            if (HasComponent<Collectable>(_tb.entity) && !HasComponent<Kill>(_tb.entity))
                            {
                                ecb.AddComponent(entityInQueryIndex, _tb.entity, new Kill() { timer = 0 });
                                var collectable = GetComponent<Collectable>(_tb.entity);
                                pointsToAdd.Value += collectable.points;
                            }
                        }
                    }).Schedule();
    
    
                ecbSystem.AddJobHandleForProducer(this.Dependency);
    
                Job.WithReadOnly(pointsToAdd).WithCode(() =>
                {
                    GameManager.instance.AddPoints(pointsToAdd.Value);
                }).WithoutBurst().Schedule();
    
            }

Whoa, thanks @brunocoimbra, your way of doing it is completely new to me, which is great because it shows me something different (the array to accumulate into, and the Job depending on the array).

Thank you. I’ll fiddle and post my full code when done.

One question, @brunocoimbra: I’m getting an error because I don’t dispose the native collection that gets allocated for the job. Is there a graceful way of doing this inside a system?

Calling pointsToAddArray.Dispose(Dependency) on line 40 of the first sample (after the last job is scheduled) will solve it for you.
For the second sample, on line 30:
pointsToAdd.Dispose(Dependency)


Thanks @eizenhorn, I got it working with your help.

Here is the final code, mostly following @brunocoimbra's first example, but using Interlocked.Add() instead of a temp array.

        //unsafe parallel using only job temp variable
        {
            var ecbSystem = World.GetOrCreateSystem<EndSimulationEntityCommandBufferSystem>();
            var ecb = ecbSystem.CreateCommandBuffer().AsParallelWriter();

            var pointsToAdd = new NativeReference<int>(Allocator.TempJob);
            var p_pointsToAdd = (int*)pointsToAdd.GetUnsafePtr();

            Entities
                    .WithAll<Player>()
                    .ForEach((int entityInQueryIndex, Entity playerEntity, DynamicBuffer<TriggerBuffer> triggerBuffer, in Player player) =>
                    {
                        for (var i = 0; i < triggerBuffer.Length; i++)
                        {
                            var _tb = triggerBuffer[i];

                            if (HasComponent<Collectable>(_tb.entity) && !HasComponent<Kill>(_tb.entity))
                            {
                                ecb.AddComponent(entityInQueryIndex, _tb.entity, new Kill() { timer = 0 });
                                var collectable = GetComponent<Collectable>(_tb.entity);
                                System.Threading.Interlocked.Add(ref *p_pointsToAdd, collectable.points);
                            }
                        }
                    })
                    .WithNativeDisableUnsafePtrRestriction(p_pointsToAdd)
                    .ScheduleParallel();


            ecbSystem.AddJobHandleForProducer(this.Dependency);

          
            Job.WithReadOnly(pointsToAdd).WithCode(() => {
                GameManager.instance.AddPoints(pointsToAdd.Value);
              
            }).WithoutBurst().Run();

            pointsToAdd.Dispose(this.Dependency);
        }
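
The two parallel accumulation strategies in this thread (one output slot per entity versus a single atomically updated counter) can be compared outside Unity. This is a minimal sketch in plain C#, with Parallel.For standing in for ScheduleParallel; ParallelSumDemo and its method names are hypothetical and not part of any Unity API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelSumDemo
{
    // Every worker adds into the same counter atomically,
    // like Interlocked.Add through the unsafe pointer above.
    public static int SumWithInterlocked(int[] points)
    {
        var total = 0;
        Parallel.For(0, points.Length, i =>
        {
            Interlocked.Add(ref total, points[i]);
        });
        return total;
    }

    // Every worker writes only its own slot; the slots are summed
    // afterwards on one thread, like the NativeArray example earlier.
    public static int SumWithPerIndexSlots(int[] points)
    {
        var slots = new int[points.Length];
        Parallel.For(0, points.Length, i =>
        {
            slots[i] = points[i];
        });

        var total = 0;
        foreach (var slot in slots)
        {
            total += slot;
        }
        return total;
    }

    public static void Main()
    {
        var points = new[] { 10, 25, 5, 60 };
        Console.WriteLine(SumWithInterlocked(points));   // prints 100
        Console.WriteLine(SumWithPerIndexSlots(points)); // prints 100
    }
}
```

Both versions give the same total. The difference is that the atomic version makes every worker touch the same memory location, while the slot version keeps the writes separate and pays for a small sequential pass at the end.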

https://forum.unity.com/threads/taking-resource-contention-into-consideration-when-choosing-between-ijob-and-ijobparallelfor.632854

Edit: sorry, I’m on mobile. The link points to a post of mine; I meant to point to the one before it, from Joachim.


Thanks for bringing this up, @sngdan. Yeah, I was also thinking about this; cache locality is super important. It is much better to have many systems running in parallel than to have many systems with parallel internals.