How can I prioritize processing data coming from a separate thread?

Hey all! I’m working on a scientific research project and I’m using Unity to develop the application needed for my lab’s experiments. The experiment we want to run involves streaming data into Unity from an ML model in Python, and then rendering a square in a Unity shader based on the data coming in.

The problem I’m running into is that I can’t get this data pipeline to be fast enough. I’ve timestamped the heck out of it, and I’ve found that the bottleneck is in how I move the incoming data around within Unity. You can only change shader variables from the main thread, but I have a separate dedicated thread that listens for the data coming in from Python. That thread puts each message onto a queue, and when the main thread sees that the queue is not empty, it dequeues the data and renders the square.

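using System;
using System.Collections.Concurrent;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

public class NetMQListener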
{
    private readonly Thread _listenerWorker;

    // volatile: written by the main thread in Stop(), read by the listener thread
    private volatile bool _listenerCancelled;

    public delegate void MessageDelegate(string message);

    private readonly MessageDelegate _messageDelegate;

    private readonly ConcurrentQueue<string> _messageQueue = new ConcurrentQueue<string>();
    
    // This function runs forever in a separate thread
    // Its only purpose is to pull data from Python and queue it up
    private void ListenerWork()
    {
        AsyncIO.ForceDotNet.Force();
        using (var subSocket = new SubscriberSocket())
        {
            subSocket.Options.ReceiveHighWatermark = 1000;
            subSocket.Connect("tcp://localhost:5557");
            subSocket.Subscribe("");
            // loop forever until the thread is no longer needed
            while (!_listenerCancelled)
            {
                //byte[] frameBytes;
                string frameString;
                if (!subSocket.TryReceiveFrameString(out frameString)) continue;
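                // DateTime.Now.Millisecond is only the 0-999 ms part of the current second,
                // so take a full Unix timestamp instead (in microseconds, at ms resolution)
                long unity_recieve_timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1000;
                // we receive the data from Python and enqueue it into our messageQueue
                _messageQueue.Enqueue(frameString + ' ' + unity_recieve_timestamp.ToString());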
            }
            subSocket.Close();
        }
        NetMQConfig.Cleanup();
    }

    // This is called within the Update loop of a monobehaviour class that holds this NetMQListener
    // This is the only function called within that loop, 
    // so it should run at the same time that Update() runs on the main thread
    public void Update()
    {
        // by the time we reach this line, it's been 5-30ms since the data came in
        // which is far too much time
        while (!_messageQueue.IsEmpty)
        {
            string message_out;
            if (_messageQueue.TryDequeue(out message_out))
            {
                // This is hooked up to the rendering code which changes the shader variables 
                // to render the square
                _messageDelegate(message_out);
            }
            else
            {
                break;
            }
        }
    }

    public NetMQListener(MessageDelegate messageDelegate)
    {
        _messageDelegate = messageDelegate;
        _listenerWorker = new Thread(ListenerWork);
    }

    public void Start()
    {
        _listenerCancelled = false;
        _listenerWorker.Start();
    }

    public void Stop()
    {
        _listenerCancelled = true;
        _listenerWorker.Join();
    }
}

This means my data processing speed is limited by the speed of Unity’s update loop, which adds a huge amount of latency to the pipeline (anywhere from 5 to 30ms, which is super unacceptable). I need a way to change the shader variables immediately upon receiving a data point from Python. Is there any way to change shader variables from a separate thread that I’m missing? Are there any coding patterns that would make the main thread prioritize processing the incoming data above all else?

Thanks in advance!

Have you looked at the async pattern, paired with cancellation tokens? They allow you to quickly cancel a task and return to the main thread to execute code. Here’s an example console project. I don’t have too much time right now to explain it, but I can do it later, if you need:

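using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

public class NetMQListener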
{
    private readonly Thread _listenerWorker;

    // volatile: written by the main thread in Stop(), read by the listener thread
    private volatile bool _listenerCancelled;

    public delegate void MessageDelegate(string message);

    private readonly MessageDelegate _messageDelegate;

    private readonly ConcurrentQueue<string> _messageQueue = new ConcurrentQueue<string>();
    private CancellationTokenSource Source;
    private bool CanListen;

    // This function runs forever in a separate thread
    // Its only purpose is to pull data from Python and queue it up
    private void ListenerWork()
    {
        AsyncIO.ForceDotNet.Force();
        using (var subSocket = new SubscriberSocket())
        {
            subSocket.Options.ReceiveHighWatermark = 1000;
            subSocket.Connect("tcp://localhost:5557");
            subSocket.Subscribe("");
            // loop forever until the thread is no longer needed
            while (!_listenerCancelled)
            {
                //byte[] frameBytes;
                string frameString;
                if (!subSocket.TryReceiveFrameString(out frameString)) continue;
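                // DateTime.Now.Millisecond is only the 0-999 ms part of the current second,
                // so take a full Unix timestamp instead (in microseconds, at ms resolution)
                long unity_recieve_timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1000;
                // we receive the data from Python and enqueue it into our messageQueue
                AddMessage(frameString + ' ' + unity_recieve_timestamp.ToString());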
            }
            subSocket.Close();
        }
        NetMQConfig.Cleanup();
    }

    /// <summary>
    /// Store message and immediately start reading
    /// </summary>
    /// <param name="message"></param>
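    private void AddMessage(string message)
    {
        _messageQueue.Enqueue(message);
        try
        {
            // WaitForInput may be disposing/replacing Source at this moment,
            // so tolerate a missing or already-disposed source
            Source?.Cancel();
        }
        catch (ObjectDisposedException)
        {
        }
    }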

    /// <summary>
    /// Listen infinitely for any incoming messages and refresh the token
    /// </summary>
    async void WaitForInput()
    {
        while (CanListen)
        {
            if (Source != null)
                Source.Dispose();

            Source = new CancellationTokenSource();
            bool tokenCancelled = await Task.Run(() => WaitUntilTokenIsCancelled(Source.Token));

            if (tokenCancelled)
            {
                ReadMessages();
            }
        }
    }

    /// <summary>
    /// Will return true when task has been cancelled, and will block
    /// background thread till it has
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    Task<bool> WaitUntilTokenIsCancelled(CancellationToken token)
    {
        return Task.Delay(-1, token)
                .ContinueWith(tsk => tsk.Exception == default);
    }

    void ReadMessages()
    {
        while (!_messageQueue.IsEmpty)
        {
            string message_out;
            if (_messageQueue.TryDequeue(out message_out))
            {
                // This is hooked up to the rendering code which changes the shader variables 
                // to render the square
                _messageDelegate(message_out);
            }
            else
            {
                break;
            }
        }
    }

    public NetMQListener(MessageDelegate messageDelegate)
    {
        _messageDelegate = messageDelegate;
        _listenerWorker = new Thread(ListenerWork);
    }

    public void Start()
    {
        _listenerCancelled = false;
        CanListen = true;

        WaitForInput();
        _listenerWorker.Start();
    }

    public void Stop()
    {
        CanListen = false;

        _listenerCancelled = true;
        _listenerWorker.Join();
    }
}
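
In case it helps while I find time to explain properly, here is the same cancel-to-wake idea stripped down to a console program with no NetMQ or Unity involved. It is only a sketch: `CancelToWakeDemo`, `RunOnce` and the producer thread are made-up names standing in for the socket listener, not part of the class above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancelToWakeDemo
{
    // Completes with true once the token is cancelled.
    // Task.Delay with an infinite timeout never finishes on its own.
    static Task<bool> WaitUntilCancelled(CancellationToken token)
    {
        return Task.Delay(Timeout.Infinite, token)
                   .ContinueWith(t => t.IsCanceled);
    }

    public static async Task<string> RunOnce()
    {
        var queue = new ConcurrentQueue<string>();
        using (var source = new CancellationTokenSource())
        {
            // Hypothetical producer standing in for the socket listener thread
            var producer = new Thread(() =>
            {
                Thread.Sleep(50);
                queue.Enqueue("hello");
                source.Cancel(); // wakes the awaiting consumer right away
            });
            producer.Start();

            // Suspends here without polling until the producer cancels the token
            bool woken = await WaitUntilCancelled(source.Token);
            producer.Join();

            string message;
            return woken && queue.TryDequeue(out message) ? message : null;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunOnce().GetAwaiter().GetResult());
    }
}
```

The consumer never polls the queue; it only runs once the producer has enqueued something and cancelled the token. Keep in mind this is a console sketch: inside Unity the code after `await` resumes through Unity's synchronization context, so it is worth timestamping to check whether it actually beats Update().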

@CaptainSpaceCat