如何寫 Timer Thread

有時候你就是不想用平台提供的 Timer API,想用純 C++ 寫一個,且此 Timer 必須最少滿足一些條件,像是:

1. 不過度設計,簡單提供定時呼叫的機制即可。

2. 可以隨時停止 Timer,不用想關閉程式時,卻還是必須等到時間到了才能結束。

3. Timer task 執行時,可以設一些檢查點判斷 Timer 是否已經被使用者終止,在處理一些需時甚久的 CPU 計算時特別需要。

以下是你需要的介面,讓我們來看看為何需要這幾個函式吧。

#pragma once

#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>

class TimerThread final
{
public:
    TimerThread();
    TimerThread(const TimerThread&) = delete;
    TimerThread(TimerThread&&) = delete;
    TimerThread& operator=(const TimerThread&) = delete;
    TimerThread& operator=(TimerThread&&) = delete;
    ~TimerThread();

public:
    void setTimerTask(const std::function<void()>& task);
    void startTimer(long long ms);
    void stopTimer();
    bool isRunning() const;

private:
    std::function<void()> task_;
    std::atomic<bool> running_;
    std::mutex mutex_;
    std::condition_variable cv_timer_;
    std::thread thread_;
};

1. void setTimerTask(const std::function<void()>& task)

設定要定時執行的 function,這很直觀,唯一需要解釋的是,為什麼不在建構子直接傳入 task,而必須另外開一個函式設定 task 呢?

explicit TimerThread(const std::function<void()>& task)

因為擺在建構子雖然可以避免使用者呼叫 startTimer 時 task 是空的錯誤,但彈性降低很多,例如除非使用指標,不然沒辦法當成成員變數,但又要在成員函式中 std::bind 一些 local 的變數傳入 task,這會造成需要較為複雜的寫法,且不易使用。

2. void startTimer(long long ms) 及 void stopTimer()

這沒甚麼好解釋,使用者會希望能停止或重新設定 Timer。

3. bool isRunning() const

這個函式乍看之下可有可無,但其實非常重要,因為它提供了在 task 中檢查 Timer 是否已經被終止的可能,如果使用者提供的 task 需要執行較久的時間,使用者會希望在他的 task 內設定一些點來呼叫此函式並檢查是否終止。例如在關閉程式時,應該沒人會希望等待 task 執行完畢,視窗凍結住吧。

接下來讓我們看看實作。

#include “TimerThread.h”
#include <assert.h>
#include <chrono>

TimerThread::TimerThread() :
    task_(),
    running_(false),
    mutex_(),
    cv_timer_(),
    thread_()
{}

TimerThread::~TimerThread()
{
    stopTimer();
}

void TimerThread::setTimerTask(const std::function<void()>& task)
{
    assert(thread_.get_id() != std::this_thread::get_id());
    assert(!running_);
    task_ = task;
}

void TimerThread::startTimer(long long ms)
{
    assert(thread_.get_id() != std::this_thread::get_id());
    assert(task_);
    assert(!running_);
    running_ = true;
    thread_ = std::thread([this, ms]()
    {
        while (running_)
        {
            {
                std::unique_lock lock(mutex_);
                cv_timer_.wait_for(lock, std::chrono::milliseconds(ms), [this]
                {
                    return !running_;
                });
            }

            if (!running_)
            {
                return;
            }

            task_();
        } // end while
    }); // end thread_
}

void TimerThread::stopTimer()
{
    assert(thread_.get_id() != std::this_thread::get_id());
    running_ = false;
    cv_timer_.notify_one();
    if (thread_.joinable())
    {
        thread_.join();
    }
}

bool TimerThread::isRunning() const
{
    return running_;
}

1. 解構子呼叫 stopTimer(),可以避免使用者忘記呼叫及避免 exception 時無法釋放資源的情況發生。

TimerThread::~TimerThread()
{
    stopTimer();
}

2. 第一個 assert 提醒使用者在開發時,仔細檢查有沒有在 task 內呼叫 setTimerTask,造成 自己等自己 dead lock 的情況發生。

assert(!running_) 提醒使用者,設定新 task 前先停止 Timer,不然可能 TimerThread 在執行 task 時,task 同時被賦值,畢竟此 TimerThread 不是 thread-safe,嘗試 lock task 是多餘的設計,並且效率會有一點減損,別小看這一點減損,在設計看盤系統可能會是致命的。

void TimerThread::setTimerTask(const std::function<void()>& task)
{
    assert(thread_.get_id() != std::this_thread::get_id());
    assert(!running_);
    task_ = task;
}

3. assert(task) 提醒使用者程式不會檢查 task 是否為空,檢查 task 是否為空會慢一點,況且 task 空的是要怎樣。

值得注意的是,使用 std::condition_variable 可以讓 thread 有機會在等待的時間還沒到就被停止條件滿足時喚醒。例如 Timer 設定 60 分鐘,使用者關閉程式時你不會想等 60 分鐘關閉程式吧。

void TimerThread::startTimer(long long ms)
{
    assert(thread_.get_id() != std::this_thread::get_id());
    assert(task_);
    assert(!running_);
    running_ = true;
    thread_ = std::thread([this, ms]()
    {
        while (running_)
        {
            {
                std::unique_lock lock(mutex_);
                cv_timer_.wait_for(lock, std::chrono::milliseconds(ms), [this]
                {
                    return !running_;
                });
            }

            if (!running_)
            {
                return;
            }

            task_();
        } // end while
    }); // end thread_
}

4. 呼叫 notify_one 前,先設定 running_ = false,不然被偽喚醒效率就慢了點了。

void TimerThread::stopTimer()
{
    assert(thread_.get_id() != std::this_thread::get_id());
    running_ = false;
    cv_timer_.notify_one();
    if (thread_.joinable())
    {
        thread_.join();
    }
}

事實上這個 Timer 還有兩個要注意的地方,第一個就是 task 是在不同的 thread 執行,因此如果要同時更新變數,是需要保護的。

第二個就是如果 task 執行的過久,那麼下次 timer 間隔就會超時了,例如設定 timer 1 秒,結果 task 執行了 1 秒,那下次 timer 到達其實是 2 秒後了,不過這完全可以由使用者自行解決,只要 task 內簡單將任務轉給自己的 thread,不佔用 Timer thread 的執行時間即可,這也可以防止 task 當機影響到 Timer thread。

以上就介紹到這裡,程式請隨便取用,有不懂的或錯誤的請留言告訴我,謝謝。

怎麼寫執行緒安全的複製(移動)建構子及複製(移動)賦值運算子

會呼叫建構子就是還沒有東西需要被保護。

呼叫解構子時需要保護成員變數基本上就是設計錯誤,任何時候都不該在變數還被使用時銷毀它,這跟 thread-safe 本身沒有關係。

因此以上兩點不在本文範圍。

現假設有自訂類別如下,將其改為 thread-safe 要怎麼做呢?

class Foo {
public:
    // copy constructor
    Foo(const Foo& rhs) : id_(rhs.id_), ticks_(rhs.ticks_) {}

    // move constructor
    Foo(Foo&& rhs) : id_(rhs.id_), ticks_(std::move(rhs.ticks_)) {}

    // copy assignment
    Foo& operator=(const Foo& rhs) {
        if (this != &rhs) {
            id_ = rhs.id_;
            ticks_ = rhs.ticks_;
        }

        return *this;
    }

    // move assignment
    Foo& operator=(Foo&& rhs) {
        if (this != &rhs) {
            id_ = rhs.id_;
            ticks_ = std::move(rhs.ticks_);
        }

        return *this;
    }

public:
    void SetID(int id) {
        id_ = id;
    }

private:
    int id_;
    std::vector ticks_;
};

首先 #include <mutex>,並新增成員變數 mutable std::mutex m_mutex。

copy constructor 不能使用 initializer list 的方式初始化了,因為沒有辦法在 initializer list 鎖定 rhs。

Foo(const Foo& rhs) : id_(rhs.id_), ticks_(rhs.ticks_) {}

考慮以下程式碼:

Foo foo1;
foo1.SetID(1); // use thread 1 write foo1
Foo foo2(foo1); // use thread 2 read foo1

讀取 rhs 時,可能會有其他 thread 正在修改 rhs,因此複製建構子需要改成先將 rhs 的 mutex 鎖住,再讀取 rhs 並建構自己。

Foo(const Foo& rhs) {
    std::lock_guard<std::mutex> lock(rhs.m_mutex);
    id_ = rhs.id_;
    ticks_ = rhs.ticks_;
}

move constructor 也是一樣。

Foo(Foo&& rhs) {
    std::lock_guard<std::mutex> lock(rhs.m_mutex);
    id_ = rhs.id_;
    ticks_ = std::move(rhs.ticks_);
}

接下來討論 copy assignment 及 move assignment。

Foo& operator=(const Foo& rhs) {
    std::lock_guard<std::mutex> lockThis(m_mutex);
    std::lock_guard<std::mutex> lockRhs(rhs.m_mutex);
    if (this != &rhs) {
        id_ = rhs.id_;
        ticks_ = rhs.ticks_;
    }

    return *this;
}

一開始直覺可能會這樣寫,但這會有 deak lock,考慮以下程式碼:

Foo foo;
foo = foo;

自己賦值給自己,根據程式,先鎖定 this,再鎖定 rhs,但其實都是 foo,也就是同一條 thread 鎖定自己兩次會 dead lock,我們可以移動 lock 的位置,但請不要改成std::recursive_mutex,這會隱藏 lock 的錯誤使用:

Foo& operator=(const Foo& rhs) {
    if (this != &rhs) {
        std::lock_guard<std::mutex> lockThis(m_mutex); // step 1
        std::lock_guard<std::mutex> lockRhs(rhs.m_mutex); // step 2
        id_ = rhs.id_;
        ticks_ = rhs.ticks_;
    }

    return *this;
}

很好,lock 之前會先檢查是不是自己賦值給自己,但這還有其他問題,考慮以下程式碼:

Foo foo1, foo2;
foo1 = foo2; // thread 1
foo2 = foo1; // thread 2

現假設 thread 1 及 thread 2 的執行順序如下

1. thread1 執行 step1,鎖住 this 成功,也就是鎖住了自己 (foo1)。
2. thread2 執行 step1,鎖住 this 成功,也就是鎖住了自己 (foo2)。
3. thread1 執行 step2,鎖住失敗,因為 rhs 也就是 foo2 已被 thread 2 鎖住。
4. thread2 執行 step2,鎖住失敗,因為 rhs 也就是 foo1 已被 thread 1 鎖住。

至此 thread 1 及 thread 2 都在等待對方釋放鎖,dead lock。

怎麼修改呢?見下列程式粗體部分:

Foo& operator=(const Foo& rhs) {
    if (this != &rhs) {
        std::lock(m_mutex, rhs.m_mutex);
        std::lock_guard<std::mutex> lockThis(m_mutex, std::adopt_lock);
        std::lock_guard<std::mutex> lockRhs(rhs.m_mutex, std::adopt_lock);
        id_ = rhs.id_;
        ticks_ = rhs.ticks_;
    }

    return *this;
}

std::lock 可以將所有的 mutex 一起鎖定,如果某一個 mutex 鎖不成功就先 unlock,至於 std::adopt_lock 則是告訴 std::lock_guard 當執行到你這行 code 時,std::lock 已經鎖住了mutex,你就不用再重複鎖定免得 dead lock,但是你還是要負責在離開 block 時幫我自動解鎖。
 

如果你的編譯器支援 C++17 的話,有另一種替代寫法如下:

Foo& operator=(const Foo& rhs) {
    if (this != &rhs) {
        std::scoped_lock lockAll(m_mutex, rhs.m_mutex);
        id_ = rhs.id_;
        ticks_ = rhs.ticks_;
    }

    return *this;
}

move assignment 也是一樣的作法,不再贅述。

最後,如果你的 class 是使用讀寫鎖實作的話,也就是用 std::shared_mutex 配合 std::unique_lock 及 std::shared_lock,則 copy constructor 改成讀鎖:

Foo(const Foo& rhs) {
     std::shared_lock<std::shared_mutex> lock(rhs.m_mutex);
     id_ = rhs.id_;
     ticks_ = rhs.ticks_;
}

copy assignment 改成寫鎖加讀鎖:

Foo& operator=(const Foo& rhs) {
    if (this != &rhs) {
        std::unique_lock<std::shared_mutex> lockThis(m_mutex, std::defer_lock);
        std::shared_lock<std::shared_mutex> lockRhs(rhs.m_mutex, std::defer_lock);
        std::lock(lockThis, lockRhs);
        id_ = rhs.id_;
        ticks_ = rhs.ticks_;
    }

    return *this;
}

注意這時 lockThis 及 lockRhs 改成 std::defer_lock,也就是不獲得 mutex 的所有權,只負責表明是讀寫鎖,且負責離開作用域的釋放鎖,讓 std::lock 來執行真正的鎖定。

為什麼使用讀寫鎖要後呼叫 std::lock,而使用 std::lock_guard 時先呼叫 std::lock 呢?

其實只是因為使用讀寫鎖時,std::lock 要知道哪個是讀鎖,哪個是寫鎖而已,畢竟 std::lock 要知道它內部一個一個呼叫的是 std::shared_mutex 的 lock 還是 lock_shared,因此std::unique_lock 及 std::shared_lock 要寫在前,先表明型別。

move 版本也是一樣,只是非內建型別要用 std::move 包起來而已。

C++ the rule of five

在 C++ 11 之前有所謂 the rule of three,也就是自訂類別有寫自己的複製建構子、複製賦值運算子或解構子其中之一,應該將其他兩個也補上,因為通常編譯器自動產生的不會符合你需要的,不然你也不用自己寫了對吧。

在 C++ 11 後,因為多了移動語意,所以還要加上兩個特別的函式,也就是移動建構子和移動賦值運算子。

那要怎麼寫呢?假設有一個自訂類別 Foo,有兩個成員變數,一個是內建型別 int,另一個是自訂型別 std::vector

class Foo {
private:
    int id_;
    std::vector ticks_;
};

以下示範正確的宣告及實作方法。

首先是建構子,需不需要傳參數是根據此類別的需求而定,請至少寫一種,正確的寫法是使用 initializer list,也就是冒號 (:) 後面接成員變數的初始化,如果是在 body 內用賦值的,其實效率會慢,因為實際行為是先用 defualt 建構子建構出成員變數,再用複製賦值運算子拷貝一次:

class Foo {
public:
    // constructor
    Foo() : id_(0), ticks_() {}

    // constructor
    Foo(int id, const std::vector& ticks) : id_(id), ticks_(ticks) {}

    // constructor 不好的寫法,先用預設建構子建構出 int 及 std::vector,再用 = 賦值一次
    Foo(int id, const std::vector& ticks) {
        id_ = id;
        ticks_ = ticks;
    }

private:
    int id_;
    std::vector ticks_;
};

第二個是解構子,解構子很簡單,首先考慮此函式有沒有指標需要 delete,有的話建議改用 smart point,再來考慮此類別有沒有虛擬函式,有的話就加 virtual,沒有的話就不用加:

class Foo {
public:
    // destructor 沒有其他虛擬函式不用加上 virtual
    ~Foo() {}

    // 或

    // destructor 有其他虛擬函式就加上 virtual
    virtual ~Foo() {}

private:
    int id_;
    std::vector ticks_;
};

第三個是複製建構子和移動建構子,一樣要使用 initializer list 的寫法,要注意的是移動建構子,如果有成員變數不是內建型別,請使用 std::move 包起來,這樣才可以轉成 rvalue,因為具名的右值參考 (named rvalue reference) 其實是左值 (rvalue),也就是程式中的 Foo&& rhs:

class Foo {
public:
    // copy constructor
    Foo(const Foo& rhs) : id_(rhs.id_), ticks_(rhs.ticks_) {}

    // move constructor
    Foo(Foo&& rhs) : id_(rhs.id_), ticks_(std::move(rhs.ticks_)) {} // 雖然 rhs 型別是 Foo&&,但其實是左值,因此要用 std::move 強制轉換成右值

private:
    int id_;
    std::vector ticks_;
};

最後是複製賦值運算子及移動賦值運算子,要注意的是需要先檢查是不是自我賦值,也就是撇除程式中 x = x 或 x = std::move(x) 的寫法,再來移動賦值運算子一樣非內建型別要用 std::move 包起來:

class Foo {
public:
    // copy assignment
    Foo& operator=(const Foo& rhs) {
        if (this != &rhs) { // 檢查自我賦值
            id_ = rhs.id_;
            ticks_ = rhs.ticks_;
        }

        return *this;
    }

    // move assignment
    Foo& operator=(Foo&& rhs) {
        if (this != &rhs) { // 檢查自我賦值
            id_ = rhs.id_;
            ticks_ = std::move(rhs.ticks_); // 非內建型別用 std::move 包起來
        }

        return *this;
    }

private:
    int id_;
    std::vector ticks_;
};

注意 assignment 正確寫法要傳回自己的參考

Foo& operator=(const Foo&);

Foo& operator=(Foo&&);

不然沒辦法這樣賦值

foo1 = foo2 = foo3;

而跟內建型別的賦值行為不同,對使用你自訂類別的使用者來說不是好的使用體驗。

最後,新寫一個類別時不要怕麻煩,養成好習慣一開始寫好就可以避免未來很多 debug 的時間。

請為有參數的建構子及轉型運算子都加上 explicit

假設有一個 class 如下:

class Foo {
public:
    Foo(int i) : i_(i) {}

private:
    int i_;
};

constructor 有一個參數,我們正常的宣告它

Foo f(0);

一切都很好,直到我們有了一個函式,它接收一個 Foo 參數

void Func(Foo f)
{}

我們預期此函式會被這樣呼叫

Func(f);

但有人不小心寫成

Func(0);

你會很驚訝地發現竟然可以編譯成功,因為編譯器發現 Func 需要參數 Foo,而 0,這個 int 型別可以被傳進 Foo 的建構子以建構出 Foo,而編譯器就這麼幹了,這非常可能不是你所預期的行為。

但只要為建構子加上 explicit,告訴編譯器只能顯示建構,編譯器就不會隱式轉換讓上述函式呼叫通過編譯。

class Foo {
public:
    explicit Foo(int i) : i_(i) {}
    …
};

那麼建構子有哪些情況要加上 explicit 呢?

1. 建構子只有一個參數時。有兩個以上的參數不用加,因為不會只傳一個參數進函式就可以建構出 Foo。

class Foo {
public:
    Foo(int i, double d) : i_(i), d_(d) {} // 不用加 explicit,因為無法這樣呼叫 Func(0, 1.0)
    …
};

2. 有多個參數,但是第二個參數有參數預設值。這很好理解,因為只傳一個值,Func(0) 還是可以通過編譯。

class Foo {
public:
    explicit Foo(int i, double d = 1.0) : i_(i), d_(d) {} // 要加 explicit
    …
};

上述情況其實就隱含著如果只有一個參數,但是有參數預設值,或是有多個參數,但第一個參數就有參數預設值的情況,因為 C++ 是前面的參數有預設值,後面就都要有,這應該很好理解。

class Foo {
public:
    explicit Foo(int i = 0) : i_(i), d_(1.0) {} // 只有一個參數,且有預設值,要加
    explicit Foo(int i = 0, double d = 1.0) : i_(i), d_(d) {} // 有多參數,但第一個參數就有預設值,也要加
    …
};

曾經我就遇過這樣一個 bug,vc6 的 CString 建構子因為沒有宣告 explicit,有人想建構一個 “0” 字串,結果傳進的是整數 0,導致後來程式整個亂掉,為了你的肝,請養成好習慣為建構子加上 explicit。

對了,如果你記不起來規則,那只要是建構子不管幾個參數你都加 explicit 算了。

等等!

你以為這樣就結束了?

還有一種函式要加 explicit,那就是轉型運算子

class Foo {
public:
    explicit operator bool() {
        return true;
    }
    …
};

假設有一個 Func2,接收一個 bool 參數

void Func2(bool b)
{}

如果你不加,那這兩種用法都會通過編譯,但應該不是你要的行為

Func2(f);

if (f)
{
    …
}

但如果加了,只有

if (f)
{
    …
}

會通過編譯,而現在 Func2 呼叫需要明確轉型才行

Func2(static_cast<bool>(f));

Func2((bool)f);

這次終於是全部了。

std::partition

需 #include <algorithm>,std::partition 可以根據指定條件將 array 分成兩群。

FwdIt partition(FwdIt first, const FwdIt last, Pr pred);

將傳入的 [first, last) 區間,根據 pred 的條件分成兩群,回傳值是第二群第一個元素的 iterator。符合條件的在 [first, 回傳值),不符條件的在 [回傳值, last)。

那我們可以怎麼使用它呢?

假設現在有一堆股票,我們想要選出股價介於 300 至 500 的股票。

struct Stock {
    std::string name;
    double price;
    unsigned long volume;
};

std::vector<Stock> stocks = {
    {“2201.TW”, 18.45, 2449},
    {“2723.TW”, 138.0, 1010},
    {“3008.TW”, 3835, 650},
    {“1521.TW”, 56.3, 29},
    {“4107.TW”, 106.5, 152},
    {“2059.TW”, 307.0, 297},
    {“5274.TW”, 1100, 172},
    {“3293.TW”, 593, 819},
    {“2330.TW”, 304.5, 51234},
    {“5904.TW”, 472.0, 163}
};

可以這樣寫。

auto pos = std::partition(stocks.begin(), stocks.end(), [](const Stock& stock) {
            // 找成交價介於 300 到 500 的股票
            return 300 <= stock.price && stock.price <= 500;
        });

這樣 [stocks.begin(), pos) 就是股價在區間 [300, 500] 的股票了。

完整程式如下:

struct Stock {
    std::string name;
    double price;
    unsigned long volume;
};

int main()
{
    std::vector<Stock> stocks = {
    {“2201.TW”, 18.45, 2449},
    {“2723.TW”, 138.0, 1010},
    {“3008.TW”, 3835, 650},
    {“1521.TW”, 56.3, 29},
    {“4107.TW”, 106.5, 152},
    {“2059.TW”, 307.0, 297},
    {“5274.TW”, 1100, 172},
    {“3293.TW”, 593, 819},
    {“2330.TW”, 304.5, 51234},
    {“5904.TW”, 472.0, 163} };

    // before partition
    std::cout << “before partition:\n{商品名, 成交價, 總量}\n”;
    for (const auto& stock : stocks) {
        std::cout << “{” << stock.name << “, ” << stock.price << “, ” << stock.volume << “}\n”;
    }

    std::cout << “\n”;

    std::pair<double, double> partition(300, 500);
    // pos 為第二群第一個元素
    auto pos = std::partition(stocks.begin(), stocks.end(), [&partition](const Stock& stock) {              // 找成交價介於 300 到 500 的股票
            return partition.first <= stock.price && stock.price <= partition.second;
        });

    // after partition
    std::cout << “after partition:\n{商品名, 成交價, 總量}\n”;
    for (auto it = stocks.begin(); it != pos; ++it) {
        std::cout << “{” << it->name << “, ” << it->price << “, ” << it->volume << “}\n”;
    }

    std::cout << “以上商品價格介於 ” << partition.first << ” 到 ” << partition.second << std::endl;

    for (auto it = pos, end = stocks.end(); it != end; ++it) {
        std::cout << “{” << it->name << “, ” << it->price << “, ” << it->volume << “}\n”;
    }

    std::cout << ‘\n’;

    return 0;
}

std::partition

std::nth_element

void nth_element(RandomIt First, RandomIt Nth, RandomIt Last, Pr Pred);

按 Pred 比較,將第 Nth 元素就定位後就停止排序,也就是原本完整排序 [First, Last),可以知道第 Nth 元素是什麼值,用此方法就不用全部排完。

預設是用 < 比較,因此第 Nth 元素就定位後,Nth 左邊元素都比 Nth 小,但不必是有序的;Nth 右邊元素都比 Nth 大,也不必是有序的。

這個函式可以用在選股情境中,例如一台洗價機選出台股普通股中總量最大的 30% 股票,但不必按總量排好序,就可直接將股票資料傳送給兩隻手機,因為 UI 顯示可能會有另外的排序規則,一隻手機可能想按照股票代碼排序,另一隻則可能想按照漲跌幅排序。

完整程式如下:

#include <algorithm>

struct Stock {
    std::string name;
    double price;
    unsigned long volume;
};

int main()
{
    std::vector stocks = {
    {“2201.TW”, 18.45, 2449},
    {“2723.TW”, 138.0, 1010},
    {“3008.TW”, 3835, 650},
    {“1521.TW”, 56.3, 29},
    {“4107.TW”, 106.5, 152},
    {“2059.TW”, 307.0, 297},
    {“5274.TW”, 1100, 172},
    {“3293.TW”, 593, 819},
    {“2330.TW”, 304.5, 51234},
    {“5904.TW”, 472.0, 163}
    };

    std::cout << “before nth_element:\n{ Name, Price, Volume}\n”;
    for (const auto& stock : stocks) {
        std::cout << “{” << stock.name << “, ” << std::setw(5) << stock.price << “, ” << std::setw(6) << stock.volume << “}\n”;
    }

    std::nth_element(stocks.begin(), stocks.begin() + stocks.size() * 0.3, stocks.end(),
        [](const Stock& lhs, const Stock& rhs) {
            return lhs.volume > rhs.volume;
        }); // 將最大 30% 總量股票排前面

    std::cout << “\nafter nth_element:\n{ Name, Price, Volume}\n”;
    for (const auto& stock : stocks)
    {
        std::cout << “{” << stock.name << “, ” << std::setw(5) << stock.price << “, ” << std::setw(6) << stock.volume << “}\n”;
    }

    return 0;
}

std::nth_element

std::partial_sort

在寫看盤軟體時,一定會遇到欄位的排序問題。

假設現在想要按照價格由小到大排序,而螢幕只能顯示 40 檔股票,但全部股票有 10,000 檔,那就沒有必要全部排完序,然後只擷取前 40 檔股票顯示,只要最小的 40 檔按順序排完即可。

有沒有這種函式呢?有的。

std::partial_sort 就是這種函式,直接來看 code:

首先 #include <algorithm>。

假設有股票資料結構如下:

struct Stock {
std::string name;
double price;
unsigned long volume;
};

現在使用者點擊 price 欄,程式需要按 price 由小到大顯示股票,而手機螢幕只夠顯示 5 檔股票,原始資料如下:

std::vector<Stock> stocks = {
{“2201.TW”, 18.45, 2449},
{“2723.TW”, 138.0, 1010},
{“3008.TW”, 3835, 650},
{“1521.TW”, 56.3, 29},
{“4107.TW”, 106.5, 152},
{“2059.TW”, 307.0, 297},
{“5274.TW”, 1100, 172},
{“3293.TW”, 593, 819},
{“2330.TW”, 304.5, 51234},
{“5904.TW”, 472.0, 163} };

程式呼叫 std::partial_sort,前 5 個元素排好就停止,寫法如下:

std::partial_sort(stocks.begin(), stocks.begin() + 5, stocks.end(),
[](const Stock& lhs, const Stock& rhs) {
        return lhs.price < rhs.price;
        }
);

參數表示

1. 排序的範圍是 [stocks.begin(), stocks.end())。

2. 最小 5 檔按順序大小排完即停止 [stocks.begin(), stocks.begin() + 5)。

3. 比較方式是取 Stock 結構的 price 成員變數,即按價格排序。
    [](const Stock& lhs, const Stock& rhs) {
        return lhs.price < rhs.price;
        }

顯示結果:

 

完整程式如下:

#include <algorithm>

struct Stock {
std::string name;
double price;
unsigned long volume;
};

int main()
{
    std::vector<Stock> stocks = {
    {“2201.TW”, 18.45, 2449},
    {“2723.TW”, 138.0, 1010},
    {“3008.TW”, 3835, 650},
    {“1521.TW”, 56.3, 29},
    {“4107.TW”, 106.5, 152},
    {“2059.TW”, 307.0, 297},
    {“5274.TW”, 1100, 172},
    {“3293.TW”, 593, 819},
    {“2330.TW”, 304.5, 51234},
    {“5904.TW”, 472.0, 163}
    };

    std::cout << “before partial_sort:\n{ Name, Price, Volume}\n”;
    for (const auto& stock : stocks)
    {
        std::cout << “{” << stock.name << “, ” << std::setw(5) << stock.price << “, ” << std::setw(6) << stock.volume << “}\n”;
    }

    std::partial_sort(stocks.begin(), stocks.begin() + 5, stocks.end(), [](const Stock& lhs, const Stock& rhs) {
        return lhs.price < rhs.price;
    }); // 前 5 個元素排好序就停止

    std::cout << “\nafter partial_sort:\n{ Name, Price, Volume}\n”;
    for (const auto& stock : stocks)
    {
        std::cout << “{” << stock.name << “, ” << std::setw(5) << stock.price << “, ” << std::setw(6) << stock.volume << “}\n”;
    }

    std::cout << ‘\n’;
}

C++ value initialization

假設有一個 K 線資料結構:

struct K {
    unsigned long time;
    double price;
    double volume;
};

因為此資料結構是 Trivially Copyable,所以可以使用 memset 來初始化:

K k;
if (std::is_trivially_copyable<K>::value) {
    memset(&k, 0, sizeof(K));
}

但如果哪一天我們新增了一個商品名稱成員變數,型態是 std::string:

struct K {
    std::string name;
    unsigned long time;
    double price;
    double volume;
};

突然間此結構就不是 Trivially Copyable,不能用 memset 來初始化了。更糟的是,我們可能會漏修改有使用 memset, memcpy 等函式的地方,導致原本的 code crash。

那有沒有什麼方法解決呢?有的。可以使用 value initialization,也就是使用 () 或是 {} 來初始化物件:

K k{}; // value initialization

這樣 k 就會被正確初始化了。

P.S. 注意!這裡不能使用 () 來初始化,因為會被當成函式宣告。

K k(); 表示宣告一個名稱為 k,沒有參數,回傳值是 K 的函式。

指標也是如法炮製即可:

K* pK = new K;
memset(pK, 0, sizeof(K));

改成

K* pK = new K(); // value initialization

K* pK = new K{}; // value initialization

即可。

POD 型別

TriviallyCopyable:如果 T 是此類型,則可以使用 memcpy 高效複製,使用 std::is_trivially_copyable 來判斷是否為此類型。

TrivialType:TriviallyCopyable 且有一個或多個 default 建構子,所有這些建構子都是 trivial 或 deleted,並且至少有一個是非 deleted。使用 std::is_trivial 來判斷是否為此類型。

StandardLayoutType:如果 T 是此類型,則相容於 C,可以和 C 程式互相操作,使用 std::is_standard_layout 來判斷是否為此類型。

PODType:TriviallyCopyable 且 StandardLayoutType。使用 std::is_pod 來判斷是否為此類型。

當我們設計看盤軟體的資料結構時,會希望滿足 POD 型別,這樣可以高效的複製並且可以最大相容於 C API。

#include <type_traits>
struct K
{
	unsigned long d; // 日期
	unsigned long t; // 時間
	double o; // 開
	double h; // 高
	double l; // 低
	double c; // 收
	double v; // 量
};
int main()
{
	std::cout << std::boolalpha;
	std::cout << std::is_trivially_copyable<K>::value << std::endl;
	std::cout << std::is_trivial<K>::value << std::endl;
	std::cout << std::is_standard_layout<K>::value << std::endl;
	std::cout << std::is_pod<K>::value << std::endl;
	return 0;
}
POD type

設計資料結構節省記憶體的小撇步

以下是三種資料結構,X1 的成員變數亂亂排,X2 由小到大排列,X3 由大到小排列,猜猜看這三種資料結構的大小是多少?

struct X1
{
	char a;     // 1 byte
	int b;      // 4 bytes
	short c;    // 2 bytes
	char d;     // 1 byte
};
struct X2
{
	char a;     // 1 byte
	char d;     // 1 byte
	short c;    // 2 bytes
	int b;      // 4 bytes
};
struct X3
{
	int b;      // 4 bytes
	short c;    // 2 bytes
	char a;     // 1 byte
	char d;     // 1 byte
};

答案如下,有出乎意料之外嗎?

data alignment

這是因為資料對齊的關係,資料對齊則又跟 CPU 存取效能有關,在 X1 結構中是以最大的資料成員,也就是 int (4 bytes) 為對齊方式,因此編譯器會以自然對齊的方式來對齊結構成員去填補結構,如下:

struct X1
{
	char a;		// 1 byte
	char pad0[3];	// 填補對齊到 4 位元組的邊界
	int b;		// 4 bytes
	short c;	// 2 bytes
	char d;		// 1 byte
	char pad1[1];	// 填補對齊到 4 的倍數
};

而結構 X2 及 X3 則有效利用了空間,因此記憶體用量也比較小。

那為什麼要講這個呢?因為在設計看盤軟體的 K 棒資料結構時,如果亂排可是會浪費很多的記憶體的,眾所周知股價資料是很大的,一天可以高達好幾十 GB,因此有必要節省記憶體的用量,以下這兩種設計記憶體用量就差距很大:

struct K1
{
	unsigned long d; // 日期
	double o; // 開
	double h; // 高
	double l; // 低
	double c; // 收
	double v; // 量
	unsigned long t; // 時間
};
struct K2
{
	unsigned long d; // 日期
	unsigned long t; // 時間
	double o; // 開
	double h; // 高
	double l; // 低
	double c; // 收
	double v; // 量
};
不同的 K 棒資料結構大小差距

因此可以的話還是好好排列一下成員變數吧!