NativeStream usage.

Hello.

I was checking out the NativeStream collection, which looked like it could make my event processing system much simpler. I am processing events in FIFO fashion and I don't need parallel processing. Events are received continuously, every frame, from a simulated game, but they are processed one at a time, in order, and processing a single event can take multiple frames before the next event can start. To be specific, this system should control the animations and visual effects of actions in a turn-based game.
The events are of different types, which is the main reason I am considering NativeStream.
I am having trouble writing data to a NativeStream in one job and reading from it in a second job.
I am getting this exception: ArgumentException: BeginForEachIndex can only be called once for the same index (0).
The code is at the bottom.
Is this the correct way to use NativeStream? Is NativeStream appropriate for my use case?
NativeStream code

struct PresentationQueueJob : IJob
{
    public NativeStream.Writer Writer;
    public NativeArray<UnitMovedEvent> UnitMovedEvents;
    public NativeArray<UnitAttackedEvent> UnitAttackedEvents;

    public void Execute()
    {
        // All events go into the single buffer at index 0.
        Writer.BeginForEachIndex(0);
        for (int i = 0; i < UnitMovedEvents.Length; i++)
        {
            // Each event is preceded by an int header identifying its type.
            Writer.Write(UnitMovedHeader);
            Writer.Write(UnitMovedEvents[i]);
        }
        // same for other events

        Writer.EndForEachIndex();
    }
}

struct PresentationJob : IJob
{
    public NativeStream.Reader Reader;
    public EntityCommandBuffer Buffer;
    public NativeArray<PendingAction> PendingActions;

    public void Execute()
    {
        // Start the next event only once the current action has completed.
        if (PendingActions.Length > 0 && PendingActions[0].IsCompleted)
        {
            Reader.BeginForEachIndex(0);
            var header = Reader.Read<int>();
            switch (header)
            {
                case UnitMovedHeader:
                {
                    var movedEvent = Reader.Read<UnitMovedEvent>();
                    var actionEntity = Buffer.CreateEntity();
                    Buffer.AddComponent(actionEntity, new PlayAnimation { Value = Move });
                    Buffer.AddComponent(actionEntity, new PendingAction { IsCompleted = false });
                    break;
                }
            }
            Reader.EndForEachIndex();
        }
    }
}

Can you show the scheduling code? The error message may be misleading, and I’m guessing it is a job chain issue.

I am scheduling the jobs like this:
Code

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            inputDeps = new PresentationQueueJob
            {
                Writer = _presentationStream.AsWriter(),
                UnitMovedEvents = _unitMovedEventsQuery.ToComponentDataArray<UnitMovedEvent>(Allocator.TempJob),
                UnitAttackedEvents = _unitAttackedEventsQuery.ToComponentDataArray<UnitAttackedEvent>(Allocator.TempJob)
            }.Schedule(inputDeps);

            var pendingActions = _pendingActionQuery.ToEntityArray(Allocator.TempJob);
            inputDeps = new PresentationJob
            {
                Buffer = _bufferSystem.CreateCommandBuffer(),
                PendingActions = pendingActions,
                Reader = _presentationStream.AsReader(),
                ProgressFromEntity = GetComponentDataFromEntity<Progress>(true)
            }.Schedule(inputDeps);

            _bufferSystem.AddJobHandleForProducer(inputDeps);

            return inputDeps;
        }

I am creating the NativeStream in OnCreate() like this:
_presentationStream = new NativeStream(1, Allocator.Persistent);

If this is the only system that touches that NativeStream, then I don’t see anything wrong. The next thing I would look at is the stack trace. That should tell you which job is the offender. You can also change Schedule to Run on either or both jobs and see if that helps.

Here is the test which throws ArgumentException: BeginForEachIndex can only be called once for the same index (0).
Test

        [Test]
        public void TestReadWrite()
        {
            var stream = new NativeStream(1, Allocator.TempJob);
            var writer = stream.AsWriter();
            writer.BeginForEachIndex(0);
            writer.Write(1);
            writer.Write(1.5f);
            writer.EndForEachIndex();
            var reader = stream.AsReader();
           
            reader.BeginForEachIndex(0);
            var intVal1 = reader.Read<int>();
            var floatVal1 = reader.Read<float>();
            reader.EndForEachIndex();

            writer.BeginForEachIndex(0);
            writer.Write(2);
            writer.Write(2.5f);
            writer.EndForEachIndex();

            reader.BeginForEachIndex(0);
            var intVal2 = reader.Read<int>();
            var floatVal2 = reader.Read<float>();
            reader.EndForEachIndex();
           
            Assert.True(1 == intVal1);
            Assert.True(1.5f == floatVal1);
           
            Assert.True(2 == intVal2);
            Assert.True(2.5f == floatVal2);

            stream.Dispose();
        }

Looks like NativeStream can't do what I was assuming. The test shows that I cannot read from or write to the same index multiple times. I thought the index was there just to enable parallel reads and writes.
As an alternative, I will use multiple queues: one for each event type, plus one extra queue of headers.
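
For anyone finding this later, here is a rough sketch of that multi-queue idea, not tested inside a real project. The event fields, the EventKind enum and the PresentationQueues wrapper are all made up for illustration, since the real structs are not shown in this thread. The header queue records the order in which events arrived, so they can still be popped in FIFO order across types.

```csharp
using System;
using Unity.Collections;

// Hypothetical payloads; the real event structs are not shown in this thread.
public struct UnitMovedEvent { public int UnitId; }
public struct UnitAttackedEvent { public int AttackerId; public int TargetId; }

// Hypothetical tag stored in the header queue to preserve arrival order.
public enum EventKind : byte { UnitMoved, UnitAttacked }

public struct PresentationQueues : IDisposable
{
    public NativeQueue<EventKind> Headers;
    public NativeQueue<UnitMovedEvent> Moved;
    public NativeQueue<UnitAttackedEvent> Attacked;

    public PresentationQueues(Allocator allocator)
    {
        Headers = new NativeQueue<EventKind>(allocator);
        Moved = new NativeQueue<UnitMovedEvent>(allocator);
        Attacked = new NativeQueue<UnitAttackedEvent>(allocator);
    }

    public void Enqueue(UnitMovedEvent e)
    {
        Headers.Enqueue(EventKind.UnitMoved);
        Moved.Enqueue(e);
    }

    public void Enqueue(UnitAttackedEvent e)
    {
        Headers.Enqueue(EventKind.UnitAttacked);
        Attacked.Enqueue(e);
    }

    // Pops the oldest event of any type. Returns false when nothing is queued.
    // Only the out parameter matching 'kind' holds meaningful data.
    public bool TryDequeue(out EventKind kind, out UnitMovedEvent moved, out UnitAttackedEvent attacked)
    {
        moved = default;
        attacked = default;
        if (!Headers.TryDequeue(out kind))
            return false;

        switch (kind)
        {
            case EventKind.UnitMoved: moved = Moved.Dequeue(); break;
            case EventKind.UnitAttacked: attacked = Attacked.Dequeue(); break;
        }
        return true;
    }

    public void Dispose()
    {
        Headers.Dispose();
        Moved.Dispose();
        Attacked.Dispose();
    }
}
```

Unlike the stream, the queues can be allocated once with Allocator.Persistent and keep unprocessed events across frames, which seems to fit the "one event may take several frames" requirement.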

Is it throwing on the second writer.BeginForEachIndex(0)? If so, that makes sense. You can’t write to the same index multiple times.
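
If that is the cause, a variant of the test that puts every write for index 0 inside a single Begin/End pair should avoid the exception. This is only a sketch under that assumption (the class and method names are invented), and I have not run it against every Collections package version.

```csharp
using NUnit.Framework;
using Unity.Collections;

public class NativeStreamSinglePassTest
{
    [Test]
    public void WriteOnceThenRead()
    {
        var stream = new NativeStream(1, Allocator.TempJob);

        // All values for index 0 are written inside one Begin/End pair.
        var writer = stream.AsWriter();
        writer.BeginForEachIndex(0);
        writer.Write(1);
        writer.Write(1.5f);
        writer.Write(2);
        writer.Write(2.5f);
        writer.EndForEachIndex();

        // Values are read back in the same order and with the same types.
        var reader = stream.AsReader();
        reader.BeginForEachIndex(0);
        int intVal1 = reader.Read<int>();
        float floatVal1 = reader.Read<float>();
        int intVal2 = reader.Read<int>();
        float floatVal2 = reader.Read<float>();
        reader.EndForEachIndex();

        Assert.AreEqual(1, intVal1);
        Assert.AreEqual(1.5f, floatVal1);
        Assert.AreEqual(2, intVal2);
        Assert.AreEqual(2.5f, floatVal2);

        stream.Dispose();
    }
}
```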

I guess I also never noticed that NativeStream doesn’t have a Clear() method. So if you want to write to the stream every frame, you have to allocate the stream with Allocator.TempJob and create a new one every frame.
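
Something along these lines is what I mean by a per-frame stream. It is a minimal sketch, not a drop-in replacement for the system above: WriteJob, ReadJob and PerFrameStream are placeholder names, and the written value is a dummy. Note that events left unread at the end of the frame are lost when the stream is disposed.

```csharp
using Unity.Collections;
using Unity.Jobs;

struct WriteJob : IJob
{
    public NativeStream.Writer Writer;

    public void Execute()
    {
        Writer.BeginForEachIndex(0);
        Writer.Write(42); // dummy payload for illustration
        Writer.EndForEachIndex();
    }
}

struct ReadJob : IJob
{
    public NativeStream.Reader Reader;

    public void Execute()
    {
        Reader.BeginForEachIndex(0);
        // Drain everything that was written to index 0 this frame.
        while (Reader.RemainingItemCount > 0)
        {
            int value = Reader.Read<int>();
            // handle value here
        }
        Reader.EndForEachIndex();
    }
}

static class PerFrameStream
{
    // Call once per frame, e.g. from a system's OnUpdate.
    public static JobHandle Schedule(JobHandle inputDeps)
    {
        // A fresh stream each frame, since there is no Clear().
        var stream = new NativeStream(1, Allocator.TempJob);

        var handle = new WriteJob { Writer = stream.AsWriter() }.Schedule(inputDeps);
        handle = new ReadJob { Reader = stream.AsReader() }.Schedule(handle);

        // Free the stream once both jobs have finished.
        return stream.Dispose(handle);
    }
}
```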