Hello,
I’m working on a project where I need to populate a list of unknown size and then read from it, and of course both the writes and the reads must happen concurrently.
I read this thread, but it did not solve my issue: Request: allow parallel writing to NativeList when [WriteOnly] is used, or add .Concurrent
It seems that only NativeQueue meets the criteria for a concurrently growing list.
The problem is that once I have that NativeQueue, I have no way to read from it concurrently.
I’m stuck with a non-concurrent Dequeue.
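For reference, here is roughly the pattern I’m limited to right now (just a sketch with placeholder names; the concurrent writer is obtained with ToConcurrent() in my version of the Collections package, AsParallelWriter() in newer ones). The write side can run in parallel, but the read side is a main-thread Dequeue loop that also consumes the queue:

// Sketch only: parallel enqueue works, parallel read does not.
struct FillQueueJob : IJobParallelFor
{
    public NativeQueue<int>.Concurrent Output; // ToConcurrent() / AsParallelWriter() depending on the package version

    public void Execute(int index)
    {
        Output.Enqueue(index); // concurrent growth is fine
    }
}

void FillAndReadBack()
{
    var queue = new NativeQueue<int>(Allocator.TempJob);
    new FillQueueJob { Output = queue.ToConcurrent() }.Schedule(100, 8).Complete();

    // Reading back: Dequeue cannot be called from parallel jobs, and it empties the queue.
    var results = new NativeArray<int>(queue.Count, Allocator.TempJob);
    for (int i = 0; i < results.Length; i++)
        results[i] = queue.Dequeue();

    results.Dispose();
    queue.Dispose();
}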
I managed to “fix” this by adding a PeekAt(index) method to the NativeQueue file in the Collections package.
From my tests it works fine in every situation, with concurrent or non-concurrent reads, but I can’t extract that method from that file into an extension method because of the protection level of the members it uses.
So my questions are:
1) Is there a structure that can handle concurrent reads and writes to an unknown-sized “list”, meaning it can grow on write (I don’t care about removal operations in my case)?
2) Given the method I wrote (see below), is there a way to extract it from the Unity file through an extension method or something similar?
3) If neither of the previous questions can be solved, would anyone else be interested in this functionality, and if so, would the Unity team be willing to review my method and integrate it into NativeQueue?
The method I wrote:
public unsafe T PeekAt(int index)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
    AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
#endif
    NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
    if (block == null)
        throw new InvalidOperationException("Trying to peek from an empty queue");

    // Walk the block list to the block that contains the requested index.
    // Note: indices are counted from the start of the first block; in my use case nothing is ever dequeued.
    int targetBlock = index / m_Buffer->m_ItemsPerBlock;
    int currentBlock = 0;
    while (targetBlock > currentBlock)
    {
        block = (NativeQueueBlockHeader*)block->nextBlock;
        currentBlock++;
    }

    // Position of the element inside its block; it must be a valid, already-written slot.
    int itemIndexInBlock = index % m_Buffer->m_ItemsPerBlock;
    if (itemIndexInBlock >= block->itemsInBlock)
        throw new InvalidOperationException($"Out of bounds: {itemIndexInBlock} is outside the {block->itemsInBlock} items in this block");

    return UnsafeUtility.ReadArrayElement<T>((byte*)block + UnsafeUtility.SizeOf<NativeQueueBlockHeader>(), itemIndexInBlock);
}
The class I used to test it:
using System.Collections;
using System.Collections.Generic;
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;
public class TestNativeQueue : MonoBehaviour
{
    public NativeArray<int> NativeArray;
    public NativeArray<int> NativeArrayOut;
    public NativeQueue<int> NativeQueue;
    public int TestSize = 10;
    public bool ConcurentTest = true;

    void Awake()
    {
        // Allocate memory
        NativeArray = new NativeArray<int>(TestSize, Allocator.TempJob);
        NativeQueue = new NativeQueue<int>(Allocator.TempJob);
        NativeArrayOut = new NativeArray<int>(TestSize, Allocator.TempJob);

        // Populate the reference array and the queue with the same values
        for (int i = 0; i < NativeArray.Length; i++)
        {
            NativeArray[i] = i;
            NativeQueue.Enqueue(i);
        }

        if (ConcurentTest)
        {
            // Concurrent read test
            var job = new PopulateOutJob()
            {
                NativeArray = NativeArrayOut,
                NativeQueue = NativeQueue
            }.Schedule(NativeQueue.Count, 1);
            job.Complete();
        }
        else
        {
            // Sequential read test
            for (int i = 0; i < NativeArray.Length; i++)
            {
                NativeArrayOut[i] = NativeQueue.PeekAt(i);
            }
        }

        // Compare the output with the reference array
        for (int i = 0; i < NativeArray.Length; i++)
        {
            if (NativeArray[i] != NativeArrayOut[i])
            {
                Debug.Log($"NativeArray {NativeArray[i]} NativeArrayOut {NativeArrayOut[i]}");
            }
        }

        // Free memory
        NativeArray.Dispose();
        NativeArrayOut.Dispose();
        NativeQueue.Dispose();
    }

    struct PopulateOutJob : IJobParallelFor
    {
        public NativeArray<int> NativeArray;
        [ReadOnly] public NativeQueue<int> NativeQueue;

        public void Execute(int index)
        {
            // Each iteration reads its own index from the queue without consuming it.
            NativeArray[index] = NativeQueue.PeekAt(index);
        }
    }
}