How to implement an Event-driven pattern on ECS?

Hello.

I’ve heard a few people talking about using short-lived entities like events on an event-driven system. You could have a ‘dispatcher’ system which would create an entity and attach a component to pass data around. Your ‘listener’ systems would query for those components, and ‘react’ or ‘consume’ those ‘events’.

This type of mechanism would work well as a solution to a problem I’m trying to solve, but I can’t think of a good way to register and ‘unregister’ my systems as ‘listeners’ at runtime. EntityQuery and Jobs need to be explicitly declared and using ComponentSystem.Enabled is no good as a single system might be ‘listening’ to multiple ‘events.’

I thought about using reflection, but it won’t work inside jobs.

Has anybody here implemented anything similar?

The best approach - is to not implement it at all.

Process only your current data. Define a system that runs on a specific set of data.
Submit an entity with that data. In case these entities fit system requirements via entity query - it will run.

“Listen” to data (Components) from the specific systems requiring those data.

TL;DR: You don’t need event-driven aproach in ECS. Because systems will only run if there’s data for them.

Use your data as events. Fire and forget the entity, process them. And discard entities when done with them via EntityManager.DestroyEntity(entityQueryOverload) (its even faster than winding up a job + ECB).

Doing .GetEntityQuery(yourQuery) inside the system will add dependency for the query internally.
So if there’s no fitting entity archetype, system will not run at all.

2 Likes

You can look at a system I wrote a while ago to do ecs like events.

You wouldn’t subscribe so much as just query a component.

An alternative to Events is Triggered Systems. In control systems, Events imply asynchronous actions - they invoke when the event occurs, preempting execution. ECS uses a discrete step approach, so events are really fighting the system - you want to work within the steps. A Triggered System is a system that is gated by a control port. When the signal on the port is high, the system runs, when it is low the system doesn’t run. An event can be a pulse with a width of one step.

Here’s a crude way you can do it. I wouldn’t recommend this exactly, but it might get you started.

In your triggered system add a RequireForUpdate in the OnUpdate method.

RequireForUpdate(EntityManager.CreateEntityQuery(typeof(EventTypeTrigger)));

This serves as the trigger port or “event subscriber”.

Your EventTypeTrigger can be something like

 struct EventTypeTrigger : IComponentData {
   public int eventSource;
   public int eventType;
   public int eventDataReference;
}

On the main thread, you can use the LateUpdate() to clear the events.

I used something like this for UI Buttons. I modified the UnityUI Button code to hold an identifier (eventSource, set in the inspector), and to produce a ButtonTrigger entity. The system ran when the component was present. I checked the eventType to see if I want to respond, and got cached event data using eventDataReference if needed. If the system was a dedicated handler I destroyed the entity there. Otherwise, in LateUpdate() I would just delete all Trigger Entities (brute-force, not really recommended), to ensure the event lifetime / pulse width was only one step.

What I am doing now is streaming entity definitions from the UI into a queue instead of creating then right away. On each update cycle I grab the queue data, flush the queue, and bulk create (or destroy) the Trigger Entities. This way I can debounce, etc. For a mouse click I create a 1-step pulse entity for the leading edge of mouse-down, and a long-lived “high” pulse to indicate the button is down, and the when the button is released I create a 1-step pulse for the trailing edge, and destroy the long-lived entity. My triggered systems run once on down/up, and continuously while the button is held down.

I also use this approach for system timings. I have a world-wide Timing Bus that generates pulse trains of 1-step entities on things like Second, Hour, Day, Year, etc. RequireForUpdate “listens” for the pulses and the system is triggered to run once every period.

3 Likes

I’ve been experimenting with a few different approaches, based on top of @tertle 's system. And making job and component system abstractions.

As Randy talked about, the idea is to just create Entities with a single component that houses your event data. It’s a pretty cheap operation because you can create and destroy them in bulk, and you can also assign the data to them in burst jobs from a queue.

So whenever you want to ‘fire’ an event you just queue it to be created (which can be done in jobs/concurrently), and it shows up in the system after the event system runs to process the queue. You can then have systems ‘react’ and detect the presence of the event entity with RequireForUpdate and a query.

Here’s one of my current ideas (Intended for structural changes/outside of a job).

[UpdateInGroup(typeof(EventRespondersGroup))]
public abstract unsafe class EntityEventResponderSystem<T1,T2> : ComponentSystem
    where T1 : struct, IComponentData
    where T2 : struct, IComponentData
{
    public class SystemConfig
    {
        public int StartingBufferSize = 50;
        public Action<Entity> EntityHandler;
        public Action<Entity, T1> Component1Handler;
        public Action<Entity, T2> Component2Handler;
    }

    protected EntityEventSystem EventSystem { get; private set; }
    protected EntityQuery Query { get; private set; }
    protected SystemConfig Config { get; private set; } = new SystemConfig();

    private NativeList<Entity> _buffer;
    private EntityQueryDesc _desc;

    public void Configure(Action<SystemConfig> options)
    {
        _desc = new EntityQueryDesc
        {
            Any = new[]
            {
                ComponentType.ReadOnly<T1>(),
                ComponentType.ReadOnly<T2>()
            }
        };

        Query = GetEntityQuery(_desc);

        RequireForUpdate(Query);

        EventSystem = World.GetOrCreateSystem<EntityEventSystem>();

        options?.Invoke(Config);

        _buffer = new NativeList<Entity>(Config.StartingBufferSize, Allocator.Persistent);
    }

    protected override unsafe void OnUpdate()
    {
        var entityCount = Query.CalculateLength();
        if (entityCount == 0)
        {
            return;
        }

        _buffer.ResizeUninitialized(entityCount);

        var entityType = EntityManager.GetArchetypeChunkEntityType();

        var job = new PublicGatherEntitiesJob
        {
            EntityType = entityType,
            Entities = _buffer,
        };
        job.Run(Query);

        var ptr = _buffer.GetUnsafePtr();

        for (int i = 0; i < _buffer.Length; i++)
        {
            ref Entity entity = ref UnsafeUtilityEx.ArrayElementAsRef<Entity>(ptr, i);

            Config.EntityHandler?.Invoke(entity);

            if (Config.Component1Handler != null && EntityManager.HasComponent<T1>(entity))
            {
                Config.Component1Handler(entity, EntityManager.GetComponentData<T1>(entity));
            }
            if (Config.Component2Handler != null && EntityManager.HasComponent<T2>(entity))
            {
                Config.Component2Handler(entity, EntityManager.GetComponentData<T2>(entity));
            }
        }
    }
}

[BurstCompile]
unsafe struct PublicGatherEntitiesJob : IJobChunk
{
    [ReadOnly]
    public ArchetypeChunkEntityType EntityType;
    public NativeArray<Entity> Entities;

    public void Execute(ArchetypeChunk chunk, int chunkIndex, int entityOffset)
    {
        var destinationPtr = (Entity*)Entities.GetUnsafePtr() + entityOffset;
        var sourcePtr = chunk.GetNativeArray(EntityType).GetUnsafeReadOnlyPtr();
        var copySizeInBytes = sizeof(Entity) * chunk.Count;
        UnsafeUtility.MemCpy(destinationPtr, sourcePtr, copySizeInBytes);
    }
}

A few things to note:

  • This is the same as what Query.ToEntityArray() does internally except with an early out on Length and not allocating a new array. Although maybe RequireForUpdate makes entityCount == 0 impossible.
  • By using a Core 2.0 style configuration I think the layer can be a bit more transparent because derived classes are free to override the component system methods OnCreate etc without having to remember to call base.
  • It would be better if this all could be done inside burst/jobs but its fine for structural changes which are currently forced to happen outside of all of that anyway.
  • I’ve modified the event system queues to be persistent rather than creating a new NativeQueue every time which is why it can be stored as a field in the example below.
using Assets.Game.BufferData;
using Unity.Collections;
using Unity.Entities;
using UnityEngine;

public class ScoreSystem : EntityEventResponderSystem<MatchedEvent, ExplosionEvent>
{
    private NativeQueue<ScoreAddedEvent> _scoreEventQueue;

    protected override void OnCreate()
    {
        Configure(c =>
        {
            c.Component1Handler = AddScoreFromMatches;
            c.Component2Handler = AddScoreFromExplosions;
        });

        _scoreEventQueue = EventSystem.GetEventQueue<ScoreAddedEvent>();
    }

    public const int BaseScoreMultiplier = 10;

    private void AddScoreFromMatches(Entity entity, MatchedEvent matchedEvent)
    {
        var matches = EntityManager.GetBuffer<SelectionBufferData>(entity);

        _scoreEventQueue.Enqueue(new ScoreAddedEvent
        {
            Source = ScoreSource.Match,
            Value = CalculateScore(matches),
            Origin = matches.Last().Position,
            ChainSize = matches.Length
        });
    }

    private void AddScoreFromExplosions(Entity entity, ExplosionEvent explosionEvent)
    {
        var matches = EntityManager.GetBuffer<ExplosionHitBufferData>(entity);

        _scoreEventQueue.Enqueue(new ScoreAddedEvent
        {
            Source = ScoreSource.Explosion,
            Value = CalculateScore(matches),
            Origin = explosionEvent.Origin,
            ChainSize = matches.Length,
        });
    }

    private static float CalculateScore<T>(DynamicBuffer<T> matches) where T : struct, IBlockDefinition
    {
        var score = 0f;
        for (int i = 0; i < matches.Length; i++)
        {
            var m = i * matches[i].BlockDefinition.ValueMultiplier;
            score += m * (1 + BaseScoreMultiplier) * m;
        }
        return score;
    }
}

public class ScoreDebugSystem : EventResponderSystem<ScoreAddedEvent>
{
    protected override void OnEvent(ref ScoreAddedEvent scoreEvent)
    {
        Debug.Log($"{scoreEvent}");
    }
}

I don’t make any claims as to the awesomeness of this setup but hopefully it gives others some ideas.

1 Like