Parallel READ/WRITE to resizable "list"

Hello,

I’m working on a project where I need to populate a list of unknown size and then read from it. Of course, both the reads and the writes must happen concurrently.
I read this thread, but it did not solve my issue: Request: allow parallel writing to NativeList when [WriteOnly] is used, or add .Concurrent

It seems that only NativeQueue meets the criteria for a concurrently growing list.
The problem is that once I have that native queue, I have no way to read from it concurrently.
I’m stuck with a non-concurrent dequeue.

I managed to “fix” this by adding a PeekAt(index) method to the NativeQueue file in the Collections package.
In my tests it works fine in every situation, with concurrent or non-concurrent reads, but I can’t move that method out of the file into an extension method because of the protection level of the members it uses.

So the questions are:
1 ) Is there a structure that can handle concurrent reads and writes to a “list” of unknown size, meaning it can grow on write (I don’t care about removal in my case)?
2 ) Given the method I wrote (see below), is there a way to extract it from the Unity file through an extension method or something similar?
3 ) If neither of the previous questions can be solved, would anyone else be interested in this functionality, and if so, would the Unity team be willing to review my method and integrate it into the native queue?

The method I wrote :

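        public unsafe T PeekAt(int index)
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
#endif
            NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
            if (block == null)
                throw new InvalidOperationException("Trying to peek from an empty queue");
            if (index < 0)
                throw new InvalidOperationException($"Index {index} must not be negative");

            // Walk the linked list of blocks until we reach the one holding the item.
            // Assumes every block before the target is full and nothing has been dequeued.
            int targetBlock = index / m_Buffer->m_ItemsPerBlock;
            for (int currentBlock = 0; currentBlock < targetBlock; currentBlock++)
            {
                block = (NativeQueueBlockHeader*)block->nextBlock;
                if (block == null)
                    throw new InvalidOperationException($"Index {index} is past the last block");
            }

            int itemIndexInBlock = index % m_Buffer->m_ItemsPerBlock;

            // Valid slots are 0 .. itemsInBlock - 1, so the check must be >=, not >.
            if (itemIndexInBlock >= block->itemsInBlock)
                throw new InvalidOperationException($"Out of bounds: item index {itemIndexInBlock}, but the block holds only {block->itemsInBlock} items");

            // Items are stored right after the block header.
            return UnsafeUtility.ReadArrayElement<T>((byte*)block + UnsafeUtility.SizeOf<NativeQueueBlockHeader>(), itemIndexInBlock);
        }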

The class used to test it :

using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;

public class TestNativeQueue : MonoBehaviour
{
    public NativeArray<int> NativeArray;
    public NativeArray<int> NativeArrayOut;
    public NativeQueue<int> NativeQueue;
    public int TestSize = 10;
    public bool ConcurentTest = true;

    void Awake()
    {
        //Allocate memory
        NativeArray = new NativeArray<int>(TestSize, Allocator.TempJob);
        NativeQueue = new NativeQueue<int>(Allocator.TempJob);
        NativeArrayOut = new NativeArray<int>(TestSize, Allocator.TempJob);

        // populate Array
        for (int i = 0; i < NativeArray.Length; i++)
        {
            NativeArray[i] = i;
            NativeQueue.Enqueue(i);
        }

        if (ConcurentTest) {
            // concurrent read test
            var job = new PopulateOutJob()
            {
                NativeArray = NativeArrayOut,
                NativeQueue = NativeQueue
            }.Schedule(NativeQueue.Count, 1);
            job.Complete();
        }
        else {
            // sequential read test
            for (int i = 0; i < NativeArray.Length; i++)
            {
                NativeArrayOut[i] = NativeQueue.PeekAt(i);
            }
        }

        // test Array
        for (int i = 0; i < NativeArray.Length; i++)
        {
            if(NativeArray[i] != NativeArrayOut[i])
            {
                Debug.Log($"NativeArray  {NativeArray[i]}    NativeArrayOut {NativeArrayOut[i]}");
            }
        }

        // Free memory
        NativeArray.Dispose();
        NativeArrayOut.Dispose();
        NativeQueue.Dispose();
    }

    struct PopulateOutJob : IJobParallelFor
    {
        public NativeArray<int> NativeArray;
        [ReadOnly]public NativeQueue<int> NativeQueue;

        public void Execute(int index)
        {
            NativeArray[index] = NativeQueue.PeekAt(index);
        }

    }
   
}

Just look up MPMC (Multiple Producer/Multiple Consumer) queues; there are tons of them in various languages on GitHub. Port one to C#/unsafe.

This is one of those things where almost always there is another approach, but sometimes it’s just not worth the additional complexity. You really don’t want to use this for high volume work where performance is critical.

NativeIntPtr is from Jackson Dunstan’s blog; it’s easy to google.

Hello, thanks for your answers.

Just to be clear, I used the queue because it is the closest thing to what I needed, but I in fact use it as a list.

The native list can’t grow concurrently but can be read at multiple indices concurrently; the native queue is the opposite.
It was simpler to implement the read than the write, so I went with the queue…

I have one job that populates it concurrently, then another job that reads from it concurrently; each iteration of that job reads from multiple indices of the queue. I don’t want to remove anything from the queue while reading it.

Use NativeStream. It offers:

  1. Performance
  2. Determinism
  3. Parallel Write
  4. Parallel Read

Sounds too good to be true, but it’s not…

Unity.Physics uses it for all temporary “events”: contact points, collision events, Jacobians, etc.
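
For anyone landing here, below is a minimal sketch of the parallel write followed by parallel read pattern with NativeStream, modeled on the approach used in NativeStreamTests.cs. The class name NativeStreamSketch, the job names, and BufferCount are hypothetical, and the exact API surface may differ between Collections package versions, so treat it as an illustration rather than a reference. Note that the number of buffers (the "foreach count") is fixed when the stream is created, while each buffer can receive any number of items.

```csharp
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;

// Hypothetical example component; names are illustrative only.
public class NativeStreamSketch : MonoBehaviour
{
    // Number of independent buffers in the stream, NOT the number of items.
    const int BufferCount = 8;

    struct WriteJob : IJobParallelFor
    {
        public NativeStream.Writer Writer;

        public void Execute(int index)
        {
            // Each parallel iteration owns exactly one buffer.
            Writer.BeginForEachIndex(index);
            // The buffer grows as needed; here buffer `index` receives `index` items.
            for (int i = 0; i < index; i++)
                Writer.Write(i);
            Writer.EndForEachIndex();
        }
    }

    struct SumJob : IJobParallelFor
    {
        public NativeStream.Reader Reader;
        public NativeArray<int> Sums;

        public void Execute(int index)
        {
            // BeginForEachIndex returns how many items were written to this buffer.
            int count = Reader.BeginForEachIndex(index);
            int sum = 0;
            for (int i = 0; i < count; i++)
                sum += Reader.Read<int>();
            Reader.EndForEachIndex();
            Sums[index] = sum;
        }
    }

    void Start()
    {
        var stream = new NativeStream(BufferCount, Allocator.TempJob);
        var sums = new NativeArray<int>(BufferCount, Allocator.TempJob);

        // The read job depends on the write job, so reads only start once all writes are done.
        var write = new WriteJob { Writer = stream.AsWriter() }.Schedule(BufferCount, 1);
        var read = new SumJob { Reader = stream.AsReader(), Sums = sums }.Schedule(BufferCount, 1, write);
        read.Complete();

        for (int i = 0; i < sums.Length; i++)
            Debug.Log($"buffer {i}: sum {sums[i]}");

        sums.Dispose();
        stream.Dispose();
    }
}
```

Within a buffer the reader walks the items sequentially, so this fits best when each iteration of the reading job consumes a whole buffer rather than jumping to arbitrary indices.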

Great! I’ll dig into that. Thanks.

Happen to have any public examples on GitHub, perchance? :slight_smile: The Physics samples don’t seem to have any in the repository, and the latest Physics package, I believe, is using preview-33 rather than Entities 0.1.1.

The best approach is to look at NativeStreamTests.cs in the Collections package.

Physics still uses BlockStream which is the same code. Just moved to Unity.Collections, cleaned up, renamed.
Physics hasn’t been upgraded to use NativeStream yet.

Hello @Joachim_Ante_1 ,
Thanks for the answers. I might be too demanding, but it seems to me that it still requires knowing the size of the list first. It’s not a “growing” concurrent list like the queue is.

Oh, it is, until we get better documentation.

Gottem

I also found the lack of docs frustrating. But knowing the location of NativeStreamTests.cs is almost enough to learn to use it properly. You can find it under YOUR_UNITY_PROJECT\Library\PackageCache\com.unity.collections@0.15.0-preview.21\Unity.Collections.Tests\NativeStreamTests.cs

This explanation (by the poster of this thread? :eyes:) gives a good summary too: https://forum.unity.com/threads/designing-an-event-system-for-ecs.1041238/#post-6747910
