본문 바로가기
프로그래밍/Unity

JobQueue

by neive 2020. 12. 27.
728x90

wiki.unity3d.com/index.php/JobQueue

 

JobQueue - Unify Community Wiki

Description: This is a simple threaded job queue for any kind of job you want to execute on a thread. The JobQueue class is a management class, the actual job scheduler. When you create an instance of that class you have to specify the "job class" it will manage…

wiki.unity3d.com

ㄴ 원문

 

JobQueue.cs

/******************************************************************************
 * The MIT License (MIT)
 * 
 * Copyright (c) 2016 Bunny83
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 * 
 * This implements a simple threaded job queue. Simply derive a class from JobItem
 * and override the DoWork method.
 * 
 *****************************************************************************/


using System;
using System.Threading;
using System.Collections.Generic;

/// <summary>
/// Base class for a unit of work that can be handed to a <c>JobQueue</c>.
/// Derive from it and implement <see cref="DoWork"/>. The three state flags
/// are volatile so they can be written on one thread and polled on another.
/// </summary>
public abstract class JobItem
{
    private volatile bool m_IsAbortRequested = false;
    private volatile bool m_HasStarted = false;
    private volatile bool m_IsResultReady = false;

    /// <summary>True once <see cref="AbortJob"/> has been called (until the state is reset).</summary>
    public bool IsAborted
    {
        get { return m_IsAbortRequested; }
    }

    /// <summary>True once <see cref="Execute"/> has been entered.</summary>
    public bool IsStarted
    {
        get { return m_HasStarted; }
    }

    /// <summary>True once <see cref="DoWork"/> has returned inside <see cref="Execute"/>.</summary>
    public bool IsDataReady
    {
        get { return m_IsResultReady; }
    }

    /// <summary>
    /// The actual job routine. Implement it in a concrete job class.
    /// </summary>
    protected abstract void DoWork();

    /// <summary>
    /// Completion callback; the default implementation does nothing.
    /// JobQueue invokes it from the thread that calls the queue's Update
    /// method, after the job has reported its data as ready.
    /// </summary>
    public virtual void OnFinished()
    {
    }

    /// <summary>
    /// Runs the job: marks it as started, performs the work, then marks the
    /// data as ready. If <see cref="DoWork"/> throws, the ready flag is not set.
    /// </summary>
    public void Execute()
    {
        m_HasStarted = true;
        DoWork();
        m_IsResultReady = true;
    }

    /// <summary>
    /// Requests cancellation. This only raises a flag; <see cref="DoWork"/>
    /// has to poll <see cref="IsAborted"/> itself to stop early.
    /// </summary>
    public void AbortJob()
    {
        m_IsAbortRequested = true;
    }

    /// <summary>
    /// Clears the started, data-ready and abort flags so the same instance
    /// can be scheduled again.
    /// </summary>
    public void ResetJobState()
    {
        m_HasStarted = false;
        m_IsResultReady = false;
        m_IsAbortRequested = false;
    }
}


/// <summary>
/// A small job scheduler backed by a fixed set of dedicated worker threads.
/// Jobs are queued with <see cref="AddJob"/> (owner thread) or
/// <see cref="AddJobFromOtherThreads"/> (any thread). The owner thread must
/// call <see cref="Update"/> regularly: that is where finished jobs are
/// collected, completion callbacks are raised and waiting jobs are started.
/// Except for <see cref="AddJobFromOtherThreads"/>, all members are expected
/// to be called from that single owner thread.
/// </summary>
/// <typeparam name="T">The concrete job type managed by this queue.</typeparam>
public class JobQueue<T> : IDisposable where T : JobItem
{
    /// <summary>
    /// One worker thread plus the wait handle used to wake it up.
    /// </summary>
    private class ThreadItem
    {
        private Thread m_Thread;
        private AutoResetEvent m_Event;
        private volatile bool m_Abort = false;

        // Hand-off slot between the owner thread and the worker. The worker
        // takes the job out atomically, so it never observes a half-updated
        // assignment and can never run the same assignment twice.
        private JobItem m_Pending = null;

        // simple linked list to manage active threaditems
        public ThreadItem NextActive = null;

        // The job this thread item is currently assigned to. Only the owner
        // thread reads and writes this field.
        public T Data;

        public ThreadItem()
        {
            m_Event = new AutoResetEvent(false);
            m_Thread = new Thread(ThreadMainLoop);
            m_Thread.Start();
        }

        private void ThreadMainLoop()
        {
            while (true)
            {
                if (m_Abort)
                    return;
                m_Event.WaitOne();
                if (m_Abort)
                    return;
                // Take ownership of the pending job. The slot can be empty if
                // the assignment was withdrawn (job aborted and thread item
                // recycled) before this thread woke up.
                JobItem job = Interlocked.Exchange<JobItem>(ref m_Pending, null);
                if (job != null)
                    job.Execute();
            }
        }

        /// <summary>Resets the job's state flags and hands it to the worker.</summary>
        public void StartJob(T aJob)
        {
            aJob.ResetJobState();
            Data = aJob;
            Interlocked.Exchange<JobItem>(ref m_Pending, aJob);
            // signal the thread to start working.
            m_Event.Set();
        }

        /// <summary>Asks the worker to exit and flags its current job as aborted.</summary>
        public void Abort()
        {
            m_Abort = true;
            if (Data != null)
                Data.AbortJob();
            // signal the thread so it can finish itself.
            m_Event.Set();
        }

        /// <summary>Drops the current assignment so the item can be pooled again.</summary>
        public void Reset()
        {
            Data = null;
            // Withdraw a job the worker has not picked up yet (only possible
            // when the job was aborted before the worker woke up).
            Interlocked.Exchange<JobItem>(ref m_Pending, null);
        }
    }

    // internal thread pool (idle workers)
    private Stack<ThreadItem> m_Threads = new Stack<ThreadItem>();
    // jobs submitted from other threads; also serves as the lock object
    private Queue<T> m_NewJobs = new Queue<T>();
    private volatile bool m_NewJobsAdded = false;
    // jobs waiting for a free worker
    private Queue<T> m_Jobs = new Queue<T>();
    // jobs whose completion callbacks still have to be raised
    private Queue<T> m_Finished = new Queue<T>();
    // start of the linked list of active threads
    private ThreadItem m_Active = null;
    private volatile bool m_IsShutdown = false;

    /// <summary>
    /// Raised from <see cref="Update"/> for every job that completed without
    /// being aborted, right after the job's own OnFinished callback.
    /// </summary>
    public event Action<T> OnJobFinished;

    /// <summary>Creates the queue and starts its worker threads.</summary>
    /// <param name="aThreadCount">Number of worker threads; values below 1 are treated as 1.</param>
    public JobQueue(int aThreadCount)
    {
        if (aThreadCount < 1)
            aThreadCount = 1;
        for (int i = 0; i < aThreadCount; i++)
            m_Threads.Push(new ThreadItem());
    }

    /// <summary>
    /// Queues a job from the owner thread and starts it immediately if a
    /// worker is idle. A null job is ignored.
    /// </summary>
    /// <exception cref="InvalidOperationException">The queue has been shut down.</exception>
    public void AddJob(T aJob)
    {
        if (m_IsShutdown)
            throw new System.InvalidOperationException("AddJob not allowed. JobQueue has already been shutdown");
        if (aJob != null)
        {
            m_Jobs.Enqueue(aJob);
            ProcessJobQueue();
        }
    }

    /// <summary>
    /// Queues a job from any thread. The job is moved into the regular queue
    /// the next time the owner thread processes the queue. A null job is ignored.
    /// </summary>
    /// <exception cref="InvalidOperationException">The queue has been shut down.</exception>
    public void AddJobFromOtherThreads(T aJob)
    {
        lock (m_NewJobs)
        {
            if (m_IsShutdown)
                throw new System.InvalidOperationException("AddJob not allowed. JobQueue has already been shutdown");
            if (aJob == null)
                return;
            m_NewJobs.Enqueue(aJob);
            m_NewJobsAdded = true;
        }
    }

    /// <summary>Returns the number of workers that currently have a job assigned.</summary>
    public int CountActiveJobs()
    {
        int count = 0;
        for (var thread = m_Active; thread != null; thread = thread.NextActive)
            count++;
        return count;
    }

    // Recycles the workers of aborted and completed jobs, then raises the
    // completion callbacks for the completed ones.
    private void CheckActiveJobs()
    {
        ThreadItem thread = m_Active;
        ThreadItem last = null;
        while (thread != null)
        {
            ThreadItem next = thread.NextActive;
            T job = thread.Data;
            bool aborted = job.IsAborted;
            if (aborted || job.IsDataReady)
            {
                // unlink the thread item from the active list and pool it
                if (last == null)
                    m_Active = next;
                else
                    last.NextActive = next;
                thread.NextActive = null;

                thread.Reset();
                m_Threads.Push(thread);

                // aborted jobs are dropped without a completion callback
                if (!aborted)
                    m_Finished.Enqueue(job);
            }
            else
                last = thread;
            thread = next;
        }

        // Callbacks run only after the list walk is complete: a callback may
        // call AddJob, which modifies the active list. Each job is dequeued
        // before its callbacks run, so a throwing callback is not re-invoked
        // on the next Update and the remaining jobs are still delivered then.
        while (m_Finished.Count > 0)
        {
            T finished = m_Finished.Dequeue();
            finished.OnFinished();
            Action<T> handler = OnJobFinished;
            if (handler != null)
                handler(finished);
        }
    }

    // Pulls in jobs submitted from other threads and starts as many waiting
    // jobs as there are idle workers.
    private void ProcessJobQueue()
    {
        if (m_NewJobsAdded)
        {
            lock (m_NewJobs)
            {
                while (m_NewJobs.Count > 0)
                {
                    T pending = m_NewJobs.Dequeue();
                    if (pending != null)
                        m_Jobs.Enqueue(pending);
                }
                m_NewJobsAdded = false;
            }
        }
        while (m_Jobs.Count > 0 && m_Threads.Count > 0)
        {
            var job = m_Jobs.Dequeue();
            // jobs aborted while still waiting are silently dropped
            if (!job.IsAborted)
            {
                var thread = m_Threads.Pop();
                thread.StartJob(job);
                // add thread to the linked list of active threads
                thread.NextActive = m_Active;
                m_Active = thread;
            }
        }
    }

    /// <summary>
    /// Must be called regularly from the owner thread. Collects finished
    /// jobs, raises their callbacks and starts waiting jobs. Does nothing
    /// once the queue has been shut down.
    /// </summary>
    public void Update()
    {
        if (m_IsShutdown)
            return;
        CheckActiveJobs();
        ProcessJobQueue();
    }

    /// <summary>
    /// Flags every running and waiting job as aborted and tells all worker
    /// threads to exit. It does not wait for the workers to terminate; a
    /// running job ends only when its DoWork returns. Calling this more than
    /// once is harmless.
    /// </summary>
    public void ShutdownQueue()
    {
        // Taking the same lock as AddJobFromOtherThreads guarantees that a
        // concurrent submission is either aborted here or rejected.
        lock (m_NewJobs)
        {
            if (m_IsShutdown)
                return;
            m_IsShutdown = true;
            while (m_NewJobs.Count > 0)
            {
                T pending = m_NewJobs.Dequeue();
                if (pending != null)
                    pending.AbortJob();
            }
            m_NewJobsAdded = false;
        }

        for (var thread = m_Active; thread != null; thread = thread.NextActive)
            thread.Abort();
        m_Active = null;
        while (m_Threads.Count > 0)
            m_Threads.Pop().Abort();
        while (m_Jobs.Count > 0)
            m_Jobs.Dequeue().AbortJob();
    }

    /// <summary>Shuts the queue down; equivalent to <see cref="ShutdownQueue"/>.</summary>
    public void Dispose()
    {
        ShutdownQueue();
    }
}

 

GameObject 에 대충 붙여서 테스트 할 수 있는 예제

Test_JobQueue.cs

using System.Collections;
using System.Collections.Generic;
using UnityEngine;


// 잡 클래스 예제 :
/// <summary>
/// Example job: adds up 0.5^i for i = 0 .. m_Count - 1 and stores the sum
/// in <see cref="m_Result"/>.
/// </summary>
public class CalcLimit2 : JobItem
{
    // A label used to identify the job in the log; it plays no part in the calculation.
    public string m_CustomName;
    // Input: number of terms to add up. Set it before the job is started.
    public int m_Count = 5;
    // Output: the value this job produces.
    public float m_Result;

    /// <summary>
    /// The job body; JobQueue runs it on one of its worker threads.
    /// If an abort is noticed, it returns without writing <see cref="m_Result"/>.
    /// </summary>
    protected override void DoWork()
    {
        float sum = 0;
        int term = 0;
        while (term < m_Count)
        {
            sum += Mathf.Pow(0.5f, term);

            // Look at the abort flag only on every 100th iteration.
            bool isCheckpoint = (term % 100) == 0;
            if (isCheckpoint && IsAborted)
            {
                return;
            }

            term++;
        }
        m_Result = sum;
    }

    /// <summary>
    /// Completion callback; JobQueue raises it from the thread that calls
    /// its Update method. Logs the job's name and result.
    /// </summary>
    public override void OnFinished()
    {
        string message = "Job: " + m_CustomName + " has finished with the result: " + m_Result;
        Debug.Log(message);
    }
}

/// <summary>
/// Minimal driver for JobQueue: attach it to a GameObject to queue four
/// CalcLimit2 jobs on a two-thread queue and pump the queue every frame.
/// </summary>
public class Test_JobQueue : MonoBehaviour
{
    private JobQueue<CalcLimit2> m_JobQueue;

    private void OnEnable()
    {
        // Create a new queue with 2 worker threads.
        m_JobQueue = new JobQueue<CalcLimit2>(2);

        CalcLimit2[] jobs =
        {
            new CalcLimit2 { m_Count = 200, m_CustomName = "200 iterations" },
            new CalcLimit2 { m_Count = 20, m_CustomName = "Do 20 iterations" },
            new CalcLimit2 { m_Count = 91, m_CustomName = "over 90" },
            new CalcLimit2 { m_Count = 500, m_CustomName = "500" },
        };

        // Jobs are queued in the order they are listed above.
        foreach (CalcLimit2 job in jobs)
        {
            m_JobQueue.AddJob(job);
        }
    }

    private void OnDisable()
    {
        // It is important to shut down the threads inside the queue.
        m_JobQueue.ShutdownQueue();
    }

    // Start is called before the first frame update
    private void Start()
    {
    }

    // Update is called once per frame; it lets the queue collect finished
    // jobs and start waiting ones.
    private void Update()
    {
        m_JobQueue.Update();
    }
}

 

 

테스트

using System.Collections;
using System.Collections.Generic;
using UnityEngine;


// 잡 클래스 예제 :
/// <summary>
/// Example job: repeatedly removes the first element of a shared list until
/// the list is empty or the job is aborted, logging every element it removed.
/// Several jobs may be given the same list; access is serialized by locking
/// on the list instance.
/// </summary>
public class CalcLimit2 : JobItem
{
    // A label used to identify the job in the log; it plays no part in the work.
    public string m_CustomName;
    // Input of the earlier calculation example; not read by this version.
    public int m_Count = 5;
    // Output: set to "End" when DoWork leaves its loop.
    public string m_Result;

    // Shared work list. May be left unassigned, in which case the job has nothing to do.
    public List<int> m_List;

    /// <summary>
    /// The job body; JobQueue runs it on one of its worker threads.
    /// </summary>
    protected override void DoWork()
    {
        // Read the field once so the null check and the lock use the same instance.
        List<int> list = m_List;

        // Without a list there is nothing to process (locking on null would throw).
        while (list != null)
        {
            int n;

            lock (list)
            {
                if (list.Count == 0)
                    break;

                n = list[0];
                list.RemoveAt(0);
            }

            // Log after releasing the lock so other jobs sharing the list are
            // not blocked while the message is written.
            Debug.LogFormat("{0} : Working Remove {1}", m_CustomName, n);

            if (IsAborted)
                break;
        }

        m_Result = "End";
    }

    /// <summary>
    /// Completion callback; JobQueue raises it from the thread that calls
    /// its Update method. Logs the job's name and result.
    /// </summary>
    public override void OnFinished()
    {
        Debug.Log("Job: " + m_CustomName + " has finished with the result: " + m_Result);
    }
}

/// <summary>
/// Driver for the shared-list example: fills a list with 1000 integers and
/// lets two jobs on a two-thread queue drain it concurrently. A GUI button
/// queues an additional job on demand.
/// </summary>
public class Test_JobQueue : MonoBehaviour
{
    JobQueue<CalcLimit2> m_JobQueue;

    // Work list shared by all jobs; every access must hold a lock on it.
    List<int> m_List = new List<int>();

    void OnEnable()
    {
        // Workers of a queue that was shut down in OnDisable may still be
        // finishing their current job, so the list is filled under the same
        // lock the jobs use.
        lock (m_List)
        {
            for (int i = 0; i < 1000; i++)
                m_List.Add(i);
        }

        // Create a new queue with 2 worker threads.
        m_JobQueue = new JobQueue<CalcLimit2>(2);
        m_JobQueue.AddJob(new CalcLimit2 { m_CustomName = "Test1", m_List = m_List });
        m_JobQueue.AddJob(new CalcLimit2 { m_CustomName = "Test2", m_List = m_List });
    }

    void OnDisable()
    {
        // It is important to shut down the threads inside the queue.
        m_JobQueue.ShutdownQueue();
    }

    // Start is called before the first frame update
    void Start()
    {

    }

    // Update is called once per frame; it lets the queue collect finished
    // jobs and start waiting ones.
    void Update()
    {
        m_JobQueue.Update();
    }

    // Unity only invokes the IMGUI callback when it is spelled exactly
    // "OnGUI"; under the previous name "OnGui" the button was never drawn.
    void OnGUI()
    {
        // The job gets the shared list; without one it would have nothing to work on.
        if (GUI.Button(new Rect(0, 0, 100, 100), "Add Job"))
            m_JobQueue.AddJob(new CalcLimit2 { m_CustomName = "Test1" + Time.time.ToString(), m_List = m_List });
    }
}
728x90

댓글