Designing an event system for ecs?

So I’m about 30 hours into Unity+DOTS now, and thinking about how I’d like to do my prototype’s architecture.

I like the idea of event systems. I think the way entities are queried naturally lends towards this pattern, but I’m curious what “experienced” DOTS developers think?

Specifically, I wonder about performance pitfalls, such as if adding/removing tag components often have a big performance impact? Is there a better way?

1 Like

Welcome! People do use event systems in DOTS. Using tag components can work, but it can have some notable drawbacks. Mainly - in order to add or remove the ‘event’ components, you’ll probably need to use an Entity Command Buffer. Those only execute on the main thread, so all System updates will temporarily stop while these components are added. That can have a notable impact on perf, depending on the situation.

There are a number of other approaches devs have taken. Here are two options that don’t involve tag components or sync points:

  1. Store all events of a type in a DynamicBuffer. That buffer is preallocated to the largest needed size. Each event producer is given an assigned index range which they can safely write to. This allows multiple producers to write to the same DynamicBuffer in separate, parallel jobs. All event “listener” jobs would be scheduled to be dependent on all event “producer” jobs completing first. Clear the event buffer after all listener jobs have completed.

  2. There are a lot of solutions out there which use Native collections in various ways to store events. NativeArray, NativeQueue, and NativeStream are all popular. Some of these are quite advanced, like the one tertle has made public.

2 Likes

ECB is thread safe and burstable from ~5 versions ago, you can add commands from jobs but it will create sync points when the ECB playsback.

  1. you can use the designed System groups that unity already use to playback and safely do sync points.
  2. @tertle created a good event system here which is based on nativestreams (no sync points).
  3. You can also build reactive systems based on components versions. when a System runs and is supposed to write to some CD, the chunk containing those entities increases their CD versions.

Thank you so much for that reference and details. I had better crawl that git repo… but I can’t stop myself from writing a toy implementation to play with :slight_smile:

From spending a few hrs writing a (still broken) toy implementation, I’m getting the feeling that ECS is already kind of an event system. Though, I think a true Event system is important if there is going to be network play, and maybe useful for serializing.

I’ll post my toy code here when I get it working!

2 Likes

Best of luck! Thank you for sharing.

So I finished writing up a toy message system. Not an event system, as events imply multiple consumers.

My code is based on NativeQueues, and is a pretty great learning experience into DOTS/ECS so I don’t regret doing it, but I think it’s more instructional on the kinds of patterns ecs uses, not really useful as is.

The main part of the code is here:

using UnityEngine;
using Unity.Burst;
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;
using Unity.Mathematics;
using Unity.Transforms;
using System.Runtime.InteropServices;
using System;
using Unity.Collections.LowLevel.Unsafe;


public enum SystemMessageType : int
{
    Audio_Sfx,
    Audio_Music,

}
//[StructLayout(LayoutKind.Explicit, Size = 132)]
//public struct SystemMessage132
//{
//    [FieldOffset(0)]
//    public SystemMessageType type;
//    [FieldOffset(4)]
//    public FixedString128 val_str128;
//}


public enum EventMsgType : int
{
    Spawn,
    Hit,
    Kill,
    Move,
}
[StructLayout(LayoutKind.Sequential)]
public struct EventMsg
{
    public EventMsgType type;
    public double gt;
    public Entity sender;
    public Entity target;
    public FixedListInt4096 data; //TODO: huge.  would want multiple sized message queues to prevent too much wasted mem.
}

public unsafe class MessageSystem : SystemBase
{

    private struct EventMsgParseJob : IJobParallelFor
    {

        [Unity.Collections.LowLevel.Unsafe.NativeSetThreadIndex]
        public int nativeThreadIndex;
        /// <summary>
        /// don't use this reference, but it's included so we tell burst that we do read-write with it so other jobs touching it don't run at the same time.
        /// other jobs using it should use the [ReadOnly] attribute.
        /// </summary>
        public NativeArray<UnsafeList<EventMsg>> threadQueue;
        /// <summary>
        /// Thread local storge.  ptr to NativeArray<UnsafeList<EventMsg>>
        /// </summary>
        [NativeDisableUnsafePtrRestriction]
        public UnsafeList<EventMsg>* p_threadQueue;
        [NativeDisableParallelForRestriction, ReadOnly]
        public ComponentDataFromEntity<OnKill> onKillData;
        [NativeDisableParallelForRestriction]
        public NativeQueue<AudioSystem.AudioMessage>.ParallelWriter audioIn;
        [NativeDisableParallelForRestriction, ReadOnly]
        public EntityManager em;

        public unsafe void Execute(int index)
        {
            //Debug.Log($"EventTest Thread=${nativeThreadIndex}, index={index}");
            var p_list = p_threadQueue[index].Ptr;
            var count = p_threadQueue[index].length;
            for (var i = 0; i < count; i++)
            {
                _processMsg(ref p_list[i]);
            }
            if (count != p_threadQueue[index].length)
            {
                throw new Exception("race, shouldn't happen!");
            }
            p_threadQueue[index].Clear();
        }

        private void _processMsg(ref EventMsg msg)
        {
            switch (msg.type)
            {
                case EventMsgType.Kill:
                    //if (em.HasComponent<OnKill>(msg.target)) //dots bug: https://discussions.unity.com/t/825201
                    if (onKillData.HasComponent(msg.target))
                    {
                        var onKill = onKillData[msg.target];
                        //var onKill = em.GetComponentData<OnKill>(msg.target);  //dots bug: https://discussions.unity.com/t/825201
                        audioIn.Enqueue(new AudioSystem.AudioMessage() { type = SystemMessageType.Audio_Sfx, audioFile = onKill.sfxName });                                                                                                                                     //onKill.
                    }
                    break;
            }
        }
    }

    //    public NativeArray<NativeQueue<EventMsg>> threadQueue;
    //public NativeQueue<EventMsg>[] threadQueue;
    public static NativeArray<UnsafeList<EventMsg>> threadQueue;

    protected override void OnCreate()
    {


        base.OnCreate();
        //threadQueue = new NativeArray<NativeQueue<EventMsg>>(Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount, Allocator.Persistent, NativeArrayOptions.UninitializedMemory);
        //threadQueue = new NativeQueue<EventMsg>[Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount];
        threadQueue = new NativeArray<UnsafeList<EventMsg>>(Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount, Allocator.Persistent, NativeArrayOptions.UninitializedMemory);
        for (int i = 0; i < threadQueue.Length; i++)
        {
            //threadQueue[i] = new NativeQueue<EventMsg>(Allocator.Persistent);
            threadQueue[i] = new UnsafeList<EventMsg>(100, Allocator.Persistent, NativeArrayOptions.ClearMemory);
            //threadQueue[i].
        }
    }
    protected override void OnDestroy()
    {
        base.OnDestroy();
        for (int i = 0; i < threadQueue.Length; i++)
        {
            threadQueue[i].Dispose();
        }
        threadQueue.Dispose();
    }
    protected override void OnUpdate()
    {

        var job = new EventMsgParseJob()
        {

            audioIn = AudioSystem.messageIn,
            em = this.EntityManager,
            threadQueue = threadQueue,
            p_threadQueue = (UnsafeList<EventMsg>*)threadQueue.GetUnsafePtr(),
            onKillData = GetComponentDataFromEntity<OnKill>(),
        };



        var handle = job
            .Schedule(threadQueue.Length, 1); //each item is a queue, so batches of 1
        handle.Complete();
    }
}

Missing is the publishing and consuming systems. Publisher would enqueue to the MessageSystem.threadQueue static. consumers are currently manually hardcoded into the messageSystem loop (line 91).

After getting that working, I went out to see how other DOTS event systems were designed.

A great primer on using c# events (meaning no burst) can be found here:

(video by @CodeMonkeyYT ) He does it two ways: NativeQueue and message Entities. both ways seem unsatisfactory to me due to sync points and the no-burst requirement but it’s still a good primer.

I started reading through @tertle 's event system: Event System and found the idea of NativeStreams very interesting. Unfortunately the lack of documention is super frustrating. Via search I found SimpleUIDemo/Tiny3D/Library/PackageCache/com.unity.collections@0.3.0-preview.0/Unity.Collections.Tests/NativeStreamTests.cs at master · Unity-Technologies/SimpleUIDemo · GitHub
which shows some usage of NativeStreams. NativeStreams seem to allow multiple producers and consumers, basically what events are all about.

Given that’ I’m new to Unity, I think I better figure out how to setup unit tests, so I can validate some of my assumptions (like NativeStreams: If a Reader is added after a write, that reader won’t get the prior written data?) and also will look further into tertle’s event system after that.

I’ll add more to this thread afterwards.

1 Like

Native stream is the data container. the reader and writer are ways to access and modify that data container.

My summarized explanation of it is has follow.

You create a native stream with the foreachCount.
That foreachCount is basically the number of thread that can write to the native stream. So in case of IJobChunk and EntitiesForeach it would be the number of chunks.

That will basically create a container that is sort of an array of native queues (the difference being that you can write anything to it not just a single type of data).

When you want to write to the native stream you request a Writer instance of it.
This gives you access to the beginforeach and endforeach method.
Specifying the index in the beginforeach method will tell the native stream that you want to write to the “native queue” referenced at that index in the array of native queues.
You can only write to that index in a single thread.
Once you are done writing, you invoke the endforeach method.
Once it’s done, you can’t ever write to that ever again.

Then to read the native stream, you do the same thing but requesting a Reader instead of a Writer.
You can read the same index multiple times and I think you can read the same index form multiple thread at the same time (as each reader instance will have it’s own tracking of at what point it is in the reading process).
The reader will have access to everything that was written up to that point.
You can not read and write at the same time.

Hope this help anyone reading this, it took me a while to understand how that container worked and I hope I’m not completely wrong about it. I’have been using it for some time now for my ability/skill system and it works great. (I only ran into one issue but found a work around, more detail here Native stream Bug in IJobChunk with entities 0.16.0-preview.21 and 0.17.0-preview.41 )

6 Likes

Really, thank you so much for that explanation. Without it I think I would have given up, as if you don’t use NativeStream exactly as you described, you will get obtuse runtime exceptions thrown.

If anyone wants to see basic NativeStream usage, here is a test I wrote that is passing successfully

    [Test]
    [Category("ecs lowlevel")]
    public void NativeStreamBasic()
    {
        var stream = new NativeStream(1,Allocator.Persistent);

       
        var writer = stream.AsWriter();
        var reader = stream.AsReader();

        writer.BeginForEachIndex(0);
        for (var i = 0; i < 100; i++)
        {
            if (i % 2 == 0)
            {
                //unsafe{
                //    var pInt = (int*)writer.Allocate(sizeof(int));
                //    *pInt = i;
                //}
                ref int x = ref writer.Allocate<int>();
                x = i;
            }
            else
            {
                writer.Write(i);
            }
        }
        writer.EndForEachIndex();

        reader.BeginForEachIndex(0);
        for (var i = 0; i < 100; i++)
        {
            var result = reader.Read<int>();
            Assert.AreEqual(i, result);
        }
        reader.EndForEachIndex();

        stream.Dispose();
    }
1 Like

I having trouble understanding this and would really like too, is there a small simple example I can look at. Useing dynamic buffers