Producer-Consumer(생산자-소비자) 패턴
추상적인 패턴이기 때문에 만드는 방법이 다양하며 multi threaded 프로그래밍에서 상당히 많이 쓰이는 패턴이다.
예를 들어, Job queue를 생각해 볼 수 있다.
왼쪽은 Job을 만들어서 추가하는 Producer가 되고
오른쪽이 Job을 가져와 처리하는 Consumer쪽이 된다.
Job Queue에 1초에 2개의 Job이 들어오고 처리하는 쪽에서는 1초에 오직 하나의 Job만 처리할 수있다고 한다면 처리하는 쪽에 2개 이상의 Thread를 생성하여 Job Queue가 오버플로우되지 않도록 처리하는 구조를 가져갈 수 있을 것이다.
예를 들어, 왼쪽 Producer쪽에서 Job을 추가하려면 Job Queue(shared memory)에 오직 하나의 thread만 접근이 가능해야하므로 mutex lock이 필요하다. Job이 하나가 추가된 이후에는 이제 Job을 가져가도 된다는 Signal을 보내는 CV(Condition Variable)를 사용할 수 있다.
Consumer쪽에서도 Job을 하나 가져가서 처리할 때 하나의 thread만 접근해야하므로 mutex lock이 필요하다.
그리고 Job Queue가 비어있을 때
- Consumer thread가 wait 상태로 대기
- Producer가 Job을 추가하고 signal을 전송
- wait 상태였던 thread가 깨어나서 Job을 가져간 후 처리
하는 방식이 효율적일 것이다.
이를 코드로 보기전에 주어진 예제를 간략하게 바꿔보자. 편의를 위해 큐 형태가 아닌 스택을 사용한다.
Example Code : string stack을 이용한 producer-consumer 패턴
#include <iostream>
#include <thread>
#include <condition_variable>
#include <chrono>
#include <mutex>
#include <vector>
class StrStack
{
public:
// producer
void addStr(std::string s)
{
// 데이터가 추가되는 부분
{
std::lock_guard<std::mutex> lck(mMtx);
mStrs.emplace_back(std::move(s));
}
mCv.notify_one();
}
// consumer
std::string getStr()
{
// shared memory에 접근하기 전에 lock을 건다.
std::unique_lock<std::mutex> lck(mMtx);
while (mStrs.empty()) // 값이 비어있다면 wait
{
mCv.wait(lck);
}
std::string s = std::move(mStrs.back());
mStrs.pop_back();
return s;
}
private:
std::vector<std::string> mStrs;
// 멀티 쓰레드 환경에서 동작하게 만들기 위한 변수들
std::mutex mMtx;
std::condition_variable mCv;
};
int main()
{
StrStack strStack;
std::thread t1([&]() {
strStack.addStr("bark");
});
std::thread t2([&]() {
strStack.addStr("meow");
});
std::thread t3([&]() {
std::cout << strStack.getStr() << std::endl;
});
std::thread t4([&]() {
std::cout << strStack.getStr() << std::endl;
});
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
다음과 같이 하나의 consumer thread에서 while을 사용하여 계속해서 wait 상태로 대기시키는 것도 가능하다.
int main()
{
StrStack strStack;
std::thread t1([&]() {
strStack.addStr("bark");
});
std::thread t2([&]() {
strStack.addStr("meow");
});
std::thread t3([&]() {
while (true)
{
std::cout << strStack.getStr() << std::endl;
}
});
t1.join();
t2.join();
t3.join();
return 0;
}
'C++ > Thread' 카테고리의 다른 글
[Thread] std::latch (0) | 2022.08.09 |
---|---|
[Thread]semaphore (0) | 2022.07.26 |
[Thread] Condition Variable (0) | 2022.07.23 |
[Thread] scoped static 초기화 (0) | 2022.07.19 |
[Thread] std::call_once (0) | 2022.07.18 |