I’d like to write a job that performs work until a certain amount of time has passed. DateTime doesn’t compile under Burst and neither does Time.realtimeSinceStartup, so is it possible to get a time measurement some other way?
Thanks, Bas
Generally this is a dangerous way to schedule work: how much time you get depends on everything else running on the system, so it’s easy to create death spirals this way.
That said, I can think of two ways to do it:
1. Create a small native library that calls the native timing function, e.g. QueryPerformanceCounter on Windows. A custom wrapper is needed because Burst doesn’t support platform-dependent P/Invoke into system libraries like kernel32 (see the first sketch below).
2. Update the current time from the main thread in a SharedStatic, and keep a reference to the time passed in an ECS component or a NativeArray handed to the job. Inherently not very high resolution, but it might be good enough (see the second sketch below).
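For option 1, a minimal sketch of what the managed side of such a wrapper could look like, assuming a hypothetical native plugin called hirestimer (the plugin name and function name are made up, not an existing library):

using System.Runtime.InteropServices;

static class HiResTimer
{
    // The native side would be a few lines of C that wrap QueryPerformanceCounter
    // (or clock_gettime elsewhere) and return seconds as a double, e.g.:
    //
    //   // hirestimer.c (Windows)
    //   #include <windows.h>
    //   __declspec(dllexport) double hires_now_seconds()
    //   {
    //       LARGE_INTEGER f, c;
    //       QueryPerformanceFrequency(&f);
    //       QueryPerformanceCounter(&c);
    //       return (double)c.QuadPart / (double)f.QuadPart;
    //   }

    // A DllImport into your own plugin is callable from Burst-compiled code,
    // unlike platform-dependent P/Invoke into kernel32 and friends.
    [DllImport("hirestimer")]
    public static extern double hires_now_seconds();
}

A job could then read HiResTimer.hires_now_seconds() at the start of Execute and bail out once the elapsed time exceeds its budget.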
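For option 2, a rough sketch using Unity.Burst.SharedStatic, assuming the time is refreshed once per frame from the main thread (the class, field, and job names are just illustrative):

using Unity.Burst;
using Unity.Jobs;
using UnityEngine;

// Written by the main thread, readable from Burst-compiled jobs.
public static class MainThreadTime
{
    public static readonly SharedStatic<double> Elapsed =
        SharedStatic<double>.GetOrCreate<MainThreadTime>();
}

public class TimePublisher : MonoBehaviour
{
    void Update()
    {
        // Refreshed once per frame, so a job only sees the value advance
        // when it runs across frame boundaries: coarse, but cheap.
        MainThreadTime.Elapsed.Data = Time.realtimeSinceStartup;
    }
}

[BurstCompile]
struct DeadlineJob : IJob
{
    public double Deadline;  // absolute time (in Time.realtimeSinceStartup terms) to stop at
    public int WorkItems;

    public void Execute()
    {
        for (int i = 0; i < WorkItems; i++)
        {
            if (MainThreadTime.Elapsed.Data >= Deadline)
                break;  // out of time, stop early
            // ... process item i ...
        }
    }
}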
My workload comes in bursts (pardon the pun), and the time per unit of work varies greatly and is unpredictable. The death spiral is an issue but will have to be dealt with at a higher level, as the alternative is to schedule conservatively, which means leaving resources unused.
Option one looks to be the only viable one, but another issue is actually firing up a job in parallel that does not depend on a set amount of work. I guess it can be hacked by scheduling with a dummy batch of length 8 or so and then just doing whatever in the job and returning as appropriate, but I’ll see if there is a better option.
This seems to do the trick:
using System;
using System.Runtime.InteropServices;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Mathematics;
using UnityEngine;

public class NewBehaviourScript : MonoBehaviour
{
    void Update()
    {
        var a = new NativeArray<float>(1, Allocator.Persistent);
        new TestJob { Result = a }.ScheduleParallel();
    }
}

[BurstCompile]
struct TestJob : IJobParallel
{
    [DeallocateOnJobCompletion]
    public NativeArray<float> Result;

    public void Execute(int threadIndex)
    {
        // Debug.Log($"{threadIndex}");
        for (int i = 0; i < 1000 * 1000; i++)
            Result[0] += math.sqrt(i);
    }
}

[JobProducerType(typeof(JobParallelExtensions.JobParallelStruct<>))]
public interface IJobParallel
{
    void Execute(int threadIndex);
}

public static class JobParallelExtensions
{
    public static unsafe JobHandle Schedule<T>(this T jobData, JobHandle dependency = default) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), dependency, ScheduleMode.Batched);
        return JobsUtility.ScheduleParallelFor(ref parameters, 1, 1);
    }

    public static unsafe JobHandle ScheduleParallel<T>(this T jobData, JobHandle dependency = default) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), dependency, ScheduleMode.Batched);
        return JobsUtility.ScheduleParallelFor(ref parameters, JobsUtility.JobWorkerCount, 1);
    }

    public static unsafe void Run<T>(this T jobData) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), new JobHandle(), ScheduleMode.Run);
        JobsUtility.ScheduleParallelFor(ref parameters, 1, 1);
    }

    [StructLayout(LayoutKind.Sequential, Size = 1)]
    internal struct JobParallelStruct<T> where T : struct, IJobParallel
    {
        static IntPtr jobReflectionData;

        public static IntPtr Initialize()
        {
            if (jobReflectionData == IntPtr.Zero)
                jobReflectionData = JobsUtility.CreateJobReflectionData(typeof(T), JobType.ParallelFor, new ExecuteJobFunction(Execute));
            return jobReflectionData;
        }

        public static void Execute(ref T jobData, IntPtr additionalPtr, IntPtr bufferRangePatchData, ref JobRanges ranges, int jobIndex)
        {
            jobData.Execute(jobIndex);
        }

        delegate void ExecuteJobFunction(ref T data, IntPtr additionalPtr, IntPtr bufferRangePatchData, ref JobRanges ranges, int jobIndex);
    }
}
There is no reason to make a custom IJobParallel.
IJobFor never schedules more jobs than the total number of worker threads present. It then does quite cheap work stealing to find the next best batch to process, so you can use very small index counts.
There is also IJobParallelForDefer, which gives you work stealing when the number of elements to process is only known once the job starts executing. It lives in the com.unity.jobs package.
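For example, something close to the TestJob above written against the built-in IJobFor (the job and field names here are just illustrative):

using Unity.Burst;
using Unity.Collections;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Mathematics;

[BurstCompile]
struct SqrtSumJob : IJobFor
{
    public NativeArray<float> Results;   // one slot per index, so no write conflicts

    public void Execute(int index)
    {
        float sum = 0;
        for (int i = 0; i < 1000 * 1000; i++)
            sum += math.sqrt(i);
        Results[index] = sum;
    }
}

// Usage: one index per worker thread, inner batch count of 1, work stealing does the rest.
// var results = new NativeArray<float>(JobsUtility.JobWorkerCount, Allocator.TempJob);
// var handle = new SqrtSumJob { Results = results }.ScheduleParallel(results.Length, 1, default);
// handle.Complete();
// results.Dispose();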
Yeah, it’s largely cosmetic, but I doubt it’s faster than a single Interlocked.CompareExchange.
I guess I shouldn’t hijack my own thread, but the real issue I’m trying to address is running a job for a given amount of time rather than for a given number of units of work.
I would like to get to a point where my units of work take roughly the same amount of time to complete, but that is far from the case right now. Even if it were, I would still want the job to run for a given amount of time so it behaves better across different hardware.
The work here is optional in a sense: completing more of it improves the quality of the program. The work is sorted by priority, and the job should complete as much of it as possible in the time allotted. Should I follow a different pattern here?
Interlocked.CompareExchange leads to a lot of contention on the same cache line, hence it is not a performant solution for work stealing. The work-stealing structure we use is set up to avoid contention. So yes, it is faster than Interlocked.CompareExchange, especially for short and frequent calls into the work-stealing code…
Doing anything based on timing is generally a bad idea: it breaks determinism and produces undebuggable, irreproducible behaviour. Having some kind of variable on which you time-slice the work is generally much better.
Thanks, that makes sense. So I can have a configurable number of cycles per frame and spend an equal number of cycles on each item. That seems a natural fit.
To spend the allotted cycles on the highest-priority items, is it possible to do something like this? Not sure how I would implement GetBestItem without resorting to Interlocked again… (here the cycle budget would be passed as arrayLength)
public static unsafe void Execute(ref T jobData, IntPtr additionalPtr .....)
{
    var item = GetBestItem();
    while (JobsUtility.GetWorkStealingRange(ref ranges, jobIndex, out var beginIndex, out var endIndex))
    {
        var amount = endIndex - beginIndex;
        // use loop to spend remaining
        // return when out of items to work on
        var spent = jobData.Execute(item, amount);
        if (spent != amount)
            item = GetBestItem();
    }
}
Edit: assuming JobRanges.NumJobs is the number of threads this job is scheduled on, I could divide the cycle budget to get a budget per thread, then use stealing to get the next item.
In case this is useful to anyone: the reason the things mentioned in this thread are not supported by the API is that they don’t run deterministically. For that you need to, under any scheduling imaginable, perform the same amount of work on the same items (the approach in the previous post fails the latter, for instance).
It is possible, though, to skew the budget towards high-priority items when you have more items than you can finish in one job. Divide your workload into a set number of chunks and compose those chunks to favour higher-priority items. E.g. if you were to use 16 chunks, you could make them up like so:
chunk 1: 0, 16, 32 …
chunk 2: 1, 17, 33 …
chunk 3: 2, 18, 34 … etc
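Purely for illustration, the interleaving above maps item indices to chunks like this (the helper name is made up and is not part of the job code below):

using Unity.Collections;

static class ChunkLayout
{
    // Chunk c holds items c, c + chunks, c + 2 * chunks, ... so when items are
    // sorted by priority, every chunk starts with one of the highest-priority items.
    public static void FillChunk(int chunk, int chunks, int totalItems, NativeList<int> items)
    {
        for (int index = chunk; index < totalItems; index += chunks)
            items.Add(index);
    }
}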
You then schedule using min(items, chunks), steal a range of chunks to work on, and spend a predetermined budget on each chunk, completing the first item of each chunk before moving on to the second, and so on:
while (JobsUtility.GetWorkStealingRange(ref ranges, jobIndex, out var beginIndex, out var endIndex))
{
    for (int i = beginIndex; i < endIndex; i++)
    {
        var index = i;
        var budget = jobData.ChunkBudget;
        while (budget > 0 && index < jobData.TotalItems)
        {
            budget -= jobData.Execute(index, budget);
            index += jobData.Chunks;
        }
    }
}
Full code:
using System;
using System.Runtime.InteropServices;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Mathematics;

[JobProducerType(typeof(JobParallelExtensions.JobParallelStruct<>))]
public interface IJobParallel
{
    int TotalItems { get; }
    int Chunks { get; }
    int ChunkBudget { get; }
    int Execute(int index, int budget);
}

public static class JobParallelExtensions
{
    public static unsafe JobHandle Schedule<T>(this T jobData, JobHandle dependency = default) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), dependency, ScheduleMode.Batched);
        var amount = Amount(jobData);
        return JobsUtility.ScheduleParallelFor(ref parameters, amount, amount);
    }

    public static unsafe JobHandle ScheduleParallel<T>(this T jobData, int batchSize = 1, JobHandle dependency = default) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), dependency, ScheduleMode.Batched);
        var amount = Amount(jobData);
        return JobsUtility.ScheduleParallelFor(ref parameters, amount, batchSize);
    }

    public static unsafe void Run<T>(this T jobData, JobHandle dependency = default) where T : struct, IJobParallel
    {
        var parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), JobParallelStruct<T>.Initialize(), dependency, ScheduleMode.Run);
        var amount = Amount(jobData);
        JobsUtility.ScheduleParallelFor(ref parameters, amount, amount);
    }

    static int Amount<T>(T jobData) where T : struct, IJobParallel => math.min(jobData.TotalItems, jobData.Chunks);

    [StructLayout(LayoutKind.Sequential, Size = 1)]
    internal struct JobParallelStruct<T> where T : struct, IJobParallel
    {
        static IntPtr jobReflectionData;

        public static IntPtr Initialize()
        {
            if (jobReflectionData == IntPtr.Zero)
                jobReflectionData = JobsUtility.CreateJobReflectionData(typeof(T), JobType.ParallelFor, new ExecuteJobFunction(Execute));
            return jobReflectionData;
        }

        public static unsafe void Execute(ref T jobData, IntPtr additionalPtr, IntPtr bufferRangePatchData, ref JobRanges ranges, int jobIndex)
        {
            while (JobsUtility.GetWorkStealingRange(ref ranges, jobIndex, out var beginIndex, out var endIndex))
            {
                JobsUtility.PatchBufferMinMaxRanges(bufferRangePatchData, UnsafeUtility.AddressOf(ref jobData), beginIndex, endIndex - beginIndex);
                for (int i = beginIndex; i < endIndex; i++)
                {
                    var index = i;
                    var budget = jobData.ChunkBudget;
                    while (budget > 0 && index < jobData.TotalItems)
                    {
                        budget -= jobData.Execute(index, budget);
                        index += jobData.Chunks;
                    }
                }
            }
        }

        delegate void ExecuteJobFunction(ref T data, IntPtr additionalPtr, IntPtr bufferRangePatchData, ref JobRanges ranges, int jobIndex);
    }
}
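And a hypothetical job on top of it, just to show how the pieces fit together (the job name, counts, and cost model are made up):

using Unity.Burst;
using Unity.Collections;
using Unity.Mathematics;

[BurstCompile]
struct PrioritizedWorkJob : IJobParallel
{
    // The interleaved indexing writes outside the stolen [begin, end) range,
    // so the parallel-for write restriction has to be disabled.
    [NativeDisableParallelForRestriction]
    public NativeArray<float> Results;   // one slot per item, items sorted by priority

    public int TotalItems => Results.Length;
    public int Chunks => 16;
    public int ChunkBudget => 1000;

    public int Execute(int index, int budget)
    {
        // Pretend every item costs a flat 100 "cycles", capped by what is left of the budget.
        int cost = math.min(100, budget);
        Results[index] = math.sqrt(index) * cost;
        return cost;
    }
}

// var results = new NativeArray<float>(1024, Allocator.TempJob);
// new PrioritizedWorkJob { Results = results }.ScheduleParallel().Complete();
// results.Dispose();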