Schedule SystemGroup every second (and give it the whole second to perform a calculation)

Hi, I want to write a custom SystemGroup that executes its members every second.
(e.g. to execute pathfinding in that group without blocking the movement updates)

Edit:
The pathfinding (or another expensive calculation such as A* or MCTS) should be able to run over multiple frames and then apply its results.

So at 120 fps it could look like this:
frame 1 → expensive calculation collects all needed data and starts execution
frame 1-56 → system is calculating
frame 120 → system applies changes
End Edit.

I’m sorry if this was already asked but I have no idea how to search for this.

I thought I could use IFixedRateManager for this, but Unity just freezes.
Here’s the code:

public class UpdateIntervalFixedRateManager : IFixedRateManager
{
    public float Timestep { get; set; }
    private float _updateIntervalSeconds;
    private float _lastUpdateTime;
    private float _timeSinceLastUpdate;
    public UpdateIntervalFixedRateManager(float updateIntervalSeconds)
    {
        _updateIntervalSeconds = updateIntervalSeconds;
    }
    public bool ShouldGroupUpdate(ComponentSystemGroup group)
    {
        _timeSinceLastUpdate += Time.realtimeSinceStartup - _lastUpdateTime;
        if(_timeSinceLastUpdate > _updateIntervalSeconds)
        {
            _timeSinceLastUpdate = 0;
            Debug.Log("UpdateIntervalFixedRateManager: time since last update: " + _timeSinceLastUpdate);
            return true;
        }
        _lastUpdateTime = Time.realtimeSinceStartup;
        return false;
    }
}

[UpdateInGroup(typeof(SimulationSystemGroup))]
public class ExecuteOnceASecondSystemGroup : ComponentSystemGroup
{
    public ExecuteOnceASecondSystemGroup() => FixedRateManager = new UpdateIntervalFixedRateManager(1);
    protected override void OnCreate()
    {
        base.OnCreate();
    }
    protected override void OnUpdate()
    {
        base.OnUpdate();
        Debug.Log("updated system group");
    }
}

[UpdateInGroup(typeof(ExecuteOnceASecondSystemGroup))]
public class ExecuteOnceASecondSystem : SystemBase
{
    protected override void OnUpdate()
    {
        Debug.Log("ExecuteOnceASecondSystem");
        Entities.ForEach((ref Entity entity) =>
        {
            ExpensiveCalculation(1000);
        }).Schedule();
    }
    private static long ExpensiveCalculation(int n)
    {
        //do stuff...
        return n;
    }
}

Am I approaching this entirely wrong?

It looks like you are hitting an infinite loop in your ShouldGroupUpdate() function. You aren’t updating _lastUpdateTime when you return true.

I would consider simplifying it a bit to something like this:

public bool ShouldGroupUpdate(ComponentSystemGroup group)
{
    if(Time.realtimeSinceStartup - _lastUpdateTime <= _updateIntervalSeconds)
    {
        return false;
    }
    _lastUpdateTime = Time.realtimeSinceStartup;
    return true;
}

Also, I don’t know if it matters but in Unity’s own use of IFixedRateManager, they pull the elapsed time using group.World.Time instead of Time. You can see their code here: https://github.com/needle-mirror/com.unity.entities/blob/f88c75830dfb9746c1fcb7b85184d39c2a5b297b/Unity.Entities/FixedRateUtils.cs
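
To make the failure mode concrete, here is a minimal Unity-free sketch of the simplified check. It assumes, as far as I can tell from ComponentSystemGroup.OnUpdate(), that the group keeps calling ShouldGroupUpdate() in a loop until it returns false. The names IntervalGate and IntervalGateDemo and the injected clock are hypothetical and only stand in for the real manager and Time.realtimeSinceStartup.

```csharp
using System;

// Hypothetical, Unity-free stand-in for the simplified ShouldGroupUpdate above.
public sealed class IntervalGate
{
    private readonly Func<double> _clock;   // injected time source, in seconds (assumption)
    private readonly double _interval;
    private double _lastUpdateTime;

    public IntervalGate(double intervalSeconds, Func<double> clock)
    {
        _interval = intervalSeconds;
        _clock = clock;
        _lastUpdateTime = clock();          // start counting from "now" rather than from 0
    }

    public bool ShouldUpdate()
    {
        double now = _clock();
        if (now - _lastUpdateTime <= _interval)
            return false;
        _lastUpdateTime = now;              // without this line the loop below never ends
        return true;
    }
}

public static class IntervalGateDemo
{
    // Simulates frames of a fixed length and returns how many group updates ran.
    public static int CountUpdates(double intervalSeconds, double frameSeconds, int frames)
    {
        double now = 0.0;
        var gate = new IntervalGate(intervalSeconds, () => now);
        int updates = 0;
        for (int frame = 0; frame < frames; frame++)
        {
            now += frameSeconds;
            // Mimics the group: keep updating until the manager says stop.
            while (gate.ShouldUpdate())
                updates++;
        }
        return updates;
    }
}
```

With a 1 s interval and 0.25 s frames, CountUpdates(1.0, 0.25, 20) gives 4 updates in 5 simulated seconds, not 5: because the gate resets to "now", it fires every 1.25 s here. That drift is one reason to prefer advancing the last update time by the timestep, as Unity's catch-up manager does.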

Sorry it took so long to write back, stressful week …

Whoops yes, nice catch :smile:

Okay, this solves one problem, and the function is now being called only once a second. But there is still the blocking problem.

I updated the IFixedRateManager to the FixedRateCatchUpManager version out of the old FixedRateUtils.

But the Entities.ForEach in the SystemBase’s OnUpdate() still blocks on the frame in which it executes.
I could use something like JobComponentSystem and control the jobHandles myself like in this version:

using System;
using Unity.Burst;
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;
using Unity.Transforms;
using UnityEngine;


[UpdateInGroup(typeof(ExecuteOnceASecondSystemGroup))]
public class ExecuteOnceASecondSystem : JobComponentSystem, IDisposable
{
    private EntityQuery _allTranslationsQuery;
    private ExpensiveJob _expensiveJob;
    private JobHandle _expensiveJobHandle;
    private JobHandle _applyJobHandle;
    private NativeArray<long> _result;

    private bool needsToApply;
    private const int _expensiveness = 10000;
    protected override void OnCreate()
    {
        base.OnCreate();
        _allTranslationsQuery = GetEntityQuery(ComponentType.ReadOnly<Translation>());
    }

    protected override JobHandle OnUpdate(JobHandle inputDeps)
    {
        Debug.Log("ExecuteOnceASecondSystem was called");

        if (_expensiveJobHandle.IsCompleted)
        {
            // if a job is completed first apply results
            if (needsToApply)
            {
                Debug.Log("Applying result");
                var result = _result;
                /* TODO cannot be done with entityInQueryIndex
                 * The result should keep track of which Translation belongs to which Entity. Maybe there are
                 * more or fewer Entities than when _expensiveJob started. Maybe some Entities now belong to a
                 * different Archetype and the index is different.
                 * However, such an expensive calculation is probably done for something that does not just disappear,
                 * like an MCTS for the next move of a computer player, so this may not be a problem.
                 */
                _applyJobHandle = Entities.ForEach((int entityInQueryIndex, ref Translation translation) =>
                {
                    translation.Value *= result[entityInQueryIndex] / _expensiveness;
                }).WithDisposeOnCompletion(result)
                  .Schedule(inputDeps);

                needsToApply = false;
                return JobHandle.CombineDependencies(_applyJobHandle, inputDeps);
            }
            // if results were applied start execution again
            Debug.Log("Starting new execution.");
            int allTranslationsCount = _allTranslationsQuery.CalculateEntityCount();
            var allTranslations = _allTranslationsQuery.ToComponentDataArray<Translation>(Allocator.Persistent);
            _result = new NativeArray<long>(allTranslationsCount, Allocator.Persistent);
            _expensiveJob = new ExpensiveJob
            {
                translations = allTranslations,
                result = _result
            };
            _expensiveJobHandle = _expensiveJob.Schedule(allTranslations.Length, JobHandle.CombineDependencies(_applyJobHandle, inputDeps));

            needsToApply = true;
        }
        else
        {
            Debug.LogWarning("Skipped execution");
        }

        // return only inputDeps and do not depend on expensiveJobHandle
        return inputDeps;
    }


    [BurstCompile]
    public struct ExpensiveJob : IJobFor
    {
        [DeallocateOnJobCompletion]
        public NativeArray<Translation> translations;
        public NativeArray<long> result;

        public void Execute(int index)
        {
            result[index] = ExpensiveCalculation(_expensiveness);
        }
    }

    private static long ExpensiveCalculation(int n)
    {
        int count = 0;
        long a = 2;
        while (count < n)
        {
            long b = 2;
            int prime = 1;
            while (b * b <= a)
            {
                if (a % b == 0)
                {
                    prime = 0;
                    break;
                }
                b++;
            }
            if (prime > 0)
            {
                count++;
            }
            a++;
        }
        return (--a);
    }

    public void Dispose()
    {
        _result.Dispose();
    }
}

With

using Unity.Entities;

[UpdateInGroup(typeof(SimulationSystemGroup))]
public class ExecuteOnceASecondSystemGroup : ComponentSystemGroup
{
    public ExecuteOnceASecondSystemGroup() => FixedRateManager = new UpdateIntervalFixedRateManager(1);
    protected override void OnCreate()
    {
        base.OnCreate();
    }
    protected override void OnUpdate()
    {
        base.OnUpdate();
    }
}

and

using Unity.Core;
using Unity.Entities;
using Unity.Mathematics;

public class UpdateIntervalFixedRateManager : IFixedRateManager
{
    float m_MaximumDeltaTime;
    public float MaximumDeltaTime
    {
        get => m_MaximumDeltaTime;
        set => m_MaximumDeltaTime = math.max(value, _fixedTimestep);
    }

    private double _lastUpdateTime;
    bool _didPushTime;
    double _maxFinalElapsedTime;
    long _fixedUpdateCount;
    private float _fixedTimestep { get; set; }
    public float Timestep
    {
        get => _fixedTimestep;
        set
        {
            _fixedTimestep = math.clamp(value, 0.01f, 10f);
        }
    }

    public UpdateIntervalFixedRateManager(float updateIntervalSeconds)
    {
        Timestep = updateIntervalSeconds;
    }

    public bool ShouldGroupUpdate(ComponentSystemGroup group)
    {
        float worldMaximumDeltaTime = group.World.MaximumDeltaTime;
        float maximumDeltaTime = math.max(worldMaximumDeltaTime, _fixedTimestep);

        // if this is true, means we're being called a second or later time in a loop
        if (_didPushTime)
        {
            group.World.PopTime();
        }
        else
        {
            _maxFinalElapsedTime = _lastUpdateTime + maximumDeltaTime;
        }

        var finalElapsedTime = math.min(_maxFinalElapsedTime, group.World.Time.ElapsedTime);
        if (_fixedUpdateCount == 0)
        {
            // First update should always occur at t=0
        }
        else if (finalElapsedTime - _lastUpdateTime >= _fixedTimestep)
        {
            // Advance the timestep and update the system group
            _lastUpdateTime += _fixedTimestep;
        }
        else
        {
            // No update is necessary at this time.
            _didPushTime = false;
            return false;
        }

        _fixedUpdateCount++;

        group.World.PushTime(new TimeData(
            elapsedTime: _lastUpdateTime,
            deltaTime: _fixedTimestep));

        _didPushTime = true;
        return true;
    }
}

With this approach I could of course ignore the whole “only once a second” thing altogether and just execute it when it’s ready. (At the moment it always waits one second doing nothing between apply and execute.)

But then I have to handle things like entities that existed one second ago and are gone now myself.
I would assume the Unity.Physics system already handles such problems. For example, the physics calculation runs every 1/60 s while a new frame may be calculated every 1/200 s, so there is probably something in place that handles entities that were removed in the frames between two physics steps.

I wonder if I could use that system if it’s already in place.

Edit: No, there is no such system.

I would highly suggest you check FixedRateUtils.FixedRateCatchUpManager.
This is the default setup for FixedStepSimulationSystemGroup with timestep of 1/60 seconds. All Physics systems run inside this group.

For narrowing down your research, check these two files in Unity.Entities namespace for clarity: DefaultWorld.cs & FixedRateUtils.cs
For understanding how this group updates, check OnUpdate() method code of ComponentSystemGroup.cs in Entities namespace.

Okay, I interpreted your answer as “You will have to get up one level from groups into world” :slight_smile:

I’m still not totally sure how to implement it in a way where I could “just alter Entities without thinking”, but after reading this: https://gametorrahod.com/world-system-groups-update-order-and-the-player-loop/ I have an idea in which direction I have to dig.
I’ll give an update here if I finally get it :smile:

Unity Physics rebuilds the entire physics world (small exceptions for statics) each time the BuildPhysicsWorld system runs, so they actually don’t do anything special to handle entities that were removed between the previous physics tick and the current one. That is why it’s called a stateless physics solution. I believe the Havok implementation uses interframe state but is more complex because of it.

Okay, then my misconception was that I thought Unity Physics is nonblocking (which in retrospect is quite strange, because I have seen the profiler’s performance spikes often enough to know better). (Edit: It blocks other systems that depend on the same data. It does not block the main thread.)
So, just to recap: in the default world, one frame is always the smallest unit of time. Only if the frame rate drops below 60 fps (the default timestep is 1/60 s) would the physics engine make use of its catch-up mechanism.

So probably the cleanest solution for my problem would be to write a native container like EntityCommandBuffer that queues changes to components rather than to entities, and that does not play back when the ComponentSystemGroup is done, but only when it is told to do so. It would also have to check whether the entity and the component still exist before it updates.
But this solution sounds to me more like a feature request than something I should write myself.
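
To illustrate the idea, here is a minimal Unity-free sketch of such a deferred buffer. It is not an EntityCommandBuffer replacement: entities are modeled as plain int ids and the "world" as a dictionary of live component values, and the names DeferredChangeBuffer, Set and Playback are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: entities are int ids, the "world" maps live entities to one component value.
public sealed class DeferredChangeBuffer<T>
{
    private readonly Dictionary<int, T> _pending = new Dictionary<int, T>();

    // Queue a new component value; a later write for the same entity wins.
    public void Set(int entity, T value) => _pending[entity] = value;

    // Apply queued values only to entities that still exist.
    // Returns how many changes were actually applied.
    public int Playback(Dictionary<int, T> world)
    {
        int applied = 0;
        foreach (var change in _pending)
        {
            if (!world.ContainsKey(change.Key))
                continue;                   // entity was removed while the calculation ran
            world[change.Key] = change.Value;
            applied++;
        }
        _pending.Clear();                   // the buffer can be reused for the next run
        return applied;
    }
}
```

The point of the design is that nothing touches the world until Playback is called, so the long-running calculation only needs a snapshot of its inputs, and stale results for removed entities are silently dropped instead of causing errors.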

For now I will settle with something like this:

  1. Simple nonnative Buffer where component changes can be added to and retrieved from
  2. SystemGroup with FixedRateManager that executes once a second
  3. System that schedules and cleans up the Buffer appropriately

Excuse the amount of code:

Buffer could look like this:

using System;
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;

public struct ComponentChangeBuffer<T> : INativeDisposable, IDisposable where T : struct
{
    private NativeHashMap<Entity, T> _result;
 
    public ComponentChangeBuffer(int expectedSize)
    {
        _result = new NativeHashMap<Entity, T>(expectedSize, Allocator.Persistent);
    }

    public void AddToBuffer(Entity entity, T component)
    {
        _result.Add(entity, component);
    }

    public T GetComponent(Entity entity, T defaultReturn)
    {
        // Single lookup instead of ContainsKey followed by the indexer.
        return _result.TryGetValue(entity, out var value) ? value : defaultReturn;
    }

    public void Clear()
    {
        _result.Clear();
    }

    public void Dispose()
    {
        _result.Dispose();
    }
    public void TryDispose()
    {
        if(_result.IsCreated)
            _result.Dispose();
    }

    public JobHandle Dispose(JobHandle inputDeps)
    {
        return _result.Dispose(inputDeps);
    }
}

System like this (shortest version I could come up with that demonstrates the use case):

using System;
using Unity.Burst;
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;
using Unity.Mathematics;
using Unity.Transforms;
using UnityEngine;


[UpdateInGroup(typeof(TryExecuteOnceASecondSystemGroup))]
public class TryExecuteOnceASecondSystem : JobComponentSystem, IDisposable
{
    private EntityQuery _expensiveComponentsQuery;
    private expensiveJob _expensiveJob;
    private JobHandle _expensiveJobHandle;
    private JobHandle _applyJobHandle;

    ComponentChangeBuffer<ExpensiveComponent> _expensiveComponentChangeBuffer;

    private bool needsToApply = false;
    private const int _expensiveness = 1000;
    protected override void OnCreate()
    {
        base.OnCreate();
        _expensiveComponentsQuery = GetEntityQuery(ComponentType.ReadOnly<ExpensiveComponent>(), ComponentType.ReadOnly<Translation>());
    }

    protected override JobHandle OnUpdate(JobHandle inputDeps)
    {
        Debug.Log("TryExecuteOnceASecondSystem was called");
        if (_expensiveJobHandle.IsCompleted && _applyJobHandle.IsCompleted)
        {
            // if a job is completed apply results
            if (needsToApply)
            {
                Debug.Log("Applying result");
                needsToApply = false;

                var expensiveComponentChangeBuffer = _expensiveComponentChangeBuffer;

                _applyJobHandle = Entities.WithReadOnly(expensiveComponentChangeBuffer)
                                          .ForEach((ref Entity entity,
                                                    ref Translation translation,
                                                    in ExpensiveComponent expensiveComponent) =>
                {
                    var updatedComponent = expensiveComponentChangeBuffer.GetComponent(entity, expensiveComponent);
                    translation.Value = translation.Value + updatedComponent.complexData / _expensiveness * new float3(0, 1, 0);
                }).WithDisposeOnCompletion(expensiveComponentChangeBuffer)
                  .Schedule(JobHandle.CombineDependencies(inputDeps, _expensiveJobHandle));

                return _applyJobHandle;
            }
            // if results were applied start execution again
            Debug.Log("Starting new execution.");
            needsToApply = true;

            int expensiveComponentsCount = _expensiveComponentsQuery.CalculateEntityCount();
            var expensiveComponents = _expensiveComponentsQuery.ToComponentDataArray<ExpensiveComponent>(Allocator.Persistent);
            var expensiveEntities = _expensiveComponentsQuery.ToEntityArray(Allocator.Persistent);

            _expensiveComponentChangeBuffer = new ComponentChangeBuffer<ExpensiveComponent>(expensiveComponentsCount);

            _expensiveJob = new expensiveJob
            {
                expensiveComponents = expensiveComponents,
                expensiveEntities = expensiveEntities,
                expensiveComponentChangeBuffer = _expensiveComponentChangeBuffer
            };
            _expensiveJobHandle = _expensiveJob.Schedule(expensiveComponents.Length, JobHandle.CombineDependencies(_applyJobHandle, inputDeps));

        }
        else
        {
            Debug.LogWarning("Skipped execution");
        }

        // return only inputDeps and do not depend on expensiveJobHandle
        return inputDeps;
    }

    [BurstCompile]
    public struct expensiveJob : IJobFor
    {
        [DeallocateOnJobCompletion]
        public NativeArray<ExpensiveComponent> expensiveComponents;
        [DeallocateOnJobCompletion]
        public NativeArray<Entity> expensiveEntities;

        public ComponentChangeBuffer<ExpensiveComponent> expensiveComponentChangeBuffer;

        public void Execute(int index)
        {
            var expensiveComponent = expensiveComponents[index];
            expensiveComponent.complexData = ExpensiveCalculation(_expensiveness);
            expensiveComponentChangeBuffer.AddToBuffer(expensiveEntities[index], expensiveComponent);
        }
    }
    protected override void OnStopRunning()
    {
        base.OnStopRunning();
        _expensiveJobHandle.Complete();

        // will clean up _expensiveComponentChangeBuffer if it runs
        _applyJobHandle.Complete();
        // if not clean up by hand
        if (needsToApply)
            _expensiveComponentChangeBuffer.Dispose();
    }
    public void Dispose()
    {
        _expensiveComponentChangeBuffer.Dispose();
    }

    #region Helper
    private static long ExpensiveCalculation(int n)
    {
        int count = 0;
        long a = 2;
        while (count < n)
        {
            long b = 2;
            int prime = 1;
            while (b * b <= a)
            {
                if (a % b == 0)
                {
                    prime = 0;
                    break;
                }
                b++;
            }
            if (prime > 0)
            {
                count++;
            }
            a++;
        }
        return (--a);
    }
    #endregion
}

Used Component:

using System;
using Unity.Entities;
using Unity.Transforms;

[Serializable, GenerateAuthoringComponent]
public struct ExpensiveComponent : IComponentData
{
    public long complexData;
}

SystemGroup:

using Unity.Entities;

[UpdateInGroup(typeof(SimulationSystemGroup))]
public class TryExecuteOnceASecondSystemGroup : ComponentSystemGroup
{
    public TryExecuteOnceASecondSystemGroup() => FixedRateManager = new UpdateIntervalFixedRateManager(1);
    protected override void OnCreate()
    {
        base.OnCreate();
    }
    protected override void OnUpdate()
    {
        base.OnUpdate();
    }
}

FixedRateManager:

using Unity.Core;
using Unity.Entities;
using Unity.Mathematics;

public class UpdateIntervalFixedRateManager : IFixedRateManager
{
    float _maximumDeltaTime;
    public float MaximumDeltaTime
    {
        get => _maximumDeltaTime;
        set => _maximumDeltaTime = math.max(value, _fixedTimestep);
    }

    private double _lastUpdateTime;
    bool _didPushTime;
    double _maxFinalElapsedTime;
    long _fixedUpdateCount;
    private float _fixedTimestep { get; set; }
    public float Timestep
    {
        get => _fixedTimestep;
        set
        {
            _fixedTimestep = math.clamp(value, 0.01f, 10f);
        }
    }

    public UpdateIntervalFixedRateManager(float updateIntervalSeconds)
    {
        Timestep = updateIntervalSeconds;
    }

    public bool ShouldGroupUpdate(ComponentSystemGroup group)
    {
        float worldMaximumDeltaTime = group.World.MaximumDeltaTime;
        float maximumDeltaTime = math.max(worldMaximumDeltaTime, _fixedTimestep);

        // if this is true, means we're being called a second or later time in a loop
        if (_didPushTime)
        {
            group.World.PopTime();
        }
        else
        {
            _maxFinalElapsedTime = _lastUpdateTime + maximumDeltaTime;
        }

        var finalElapsedTime = math.min(_maxFinalElapsedTime, group.World.Time.ElapsedTime);
        if (_fixedUpdateCount == 0)
        {
            // First update should always occur at t=0
        }
        else if (finalElapsedTime - _lastUpdateTime >= _fixedTimestep)
        {
            // Advance the timestep and update the system group
            _lastUpdateTime += _fixedTimestep;
        }
        else
        {
            // No update is necessary at this time.
            _didPushTime = false;
            return false;
        }

        _fixedUpdateCount++;

        group.World.PushTime(new TimeData(
            elapsedTime: _lastUpdateTime,
            deltaTime: _fixedTimestep));

        _didPushTime = true;
        return true;
    }
}

Unity Physics is nonblocking on the main thread because it schedules its jobs against EndFramePhysicsSystem, which then blocks the start of the next BuildPhysicsWorld. So each physics step blocks the next one, but technically nothing else. In practice, other parts of a game are usually blocked by the result of the physics step, but you can easily run large parts of your game concurrently with the physics jobs, or with some trouble simulate physics in the background across multiple render frames. If I understand right, this is also how you should schedule your pathfinding: run an expensive job that blocks the next iteration of the system. Check out EndFramePhysicsSystem to see how they schedule it.

AFAIK, if any entities that existed during BuildPhysicsWorld are destroyed by the time the physics jobs export the results, it just barfs. But I might be wrong about that. If you’re destroying physics entities you should add the physics result as a dependency to your system to avoid that.

Pretty much, with the exception that when your physics step rate is very close to your render frame rate (say both are 60 Hz) you can encounter aliasing. Small deviations in your frame rate can cause the physics step to be skipped for one frame and then run twice the following frame.
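
A small Unity-free sketch can show both the catch-up behaviour and the aliasing case. It is a simplified model, not Unity's FixedRateCatchUpManager: it leaves out the first update at t=0 and the MaximumDeltaTime clamp, and it uses integer ticks in place of seconds so the arithmetic is exact. The names CatchUpStepper and CatchUpDemo are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a catch-up fixed-rate loop (not Unity's actual code).
public sealed class CatchUpStepper
{
    private readonly long _stepTicks;
    private long _lastStepTime;             // simulated time of the most recent fixed step

    public CatchUpStepper(long stepTicks) { _stepTicks = stepTicks; }

    // Returns how many fixed steps run on a frame that ends at 'elapsedTicks'.
    public int StepsForFrame(long elapsedTicks)
    {
        int steps = 0;
        while (elapsedTicks - _lastStepTime >= _stepTicks)
        {
            _lastStepTime += _stepTicks;    // advance by exactly one timestep, never to "now"
            steps++;
        }
        return steps;
    }
}

public static class CatchUpDemo
{
    // Feeds a sequence of frame durations through the stepper and records the steps per frame.
    public static List<int> Run(long stepTicks, params long[] frameTicks)
    {
        var stepper = new CatchUpStepper(stepTicks);
        var stepsPerFrame = new List<int>();
        long elapsed = 0;
        foreach (long frame in frameTicks)
        {
            elapsed += frame;
            stepsPerFrame.Add(stepper.StepsForFrame(elapsed));
        }
        return stepsPerFrame;
    }
}
```

With a step of 16 ticks, frames that take 32 ticks each run two steps per frame (catch-up). Frames of 16, 15 and 17 ticks run 1, 0 and 2 steps: the slightly short frame skips its step and the next frame runs it twice, which is the aliasing described above.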

You can create custom EntityCommandBuffers without an EntityCommandBufferSystem and play them back whenever you want, like this:

var ecb = new EntityCommandBuffer(Allocator.Temp);

Job.WithCode(() => { ... }).Run();

ecb.Playback(EntityManager);

So I believe you can pass that EntityCommandBuffer to the expensive job and just play it back whenever it finishes. I would probably use an intermediate data structure though.

Yes, that was poorly worded. I meant “blocking systems that depend on the same data” not “blocking the main thread”. (I’ll edit the post to not confuse future readers)

I think that also describes the problem really well:
I need a system that writes to component data without blocking other systems that also write to the same component.

To make this work with e.g. Entities.ForEach() I would need a way to tell Unity that I want the data as “ref” but I don’t want to treat it as “ref” but as “in” in terms of blocking other systems that operate on the same data.

I tried this approach but was not able to get it to work. I was always presented with messages like “please add JobHandle xy”.
That being said:
Is there a way to disable the safety system for a specific code block if I want to handle synchronization problems myself?

Edit:
I could probably move the update process into a Job and use [NativeDisableParallelForRestriction].