#pragma
once
#include <chrono>//std::chrono
#include <mutex>//std::mutex,std::unique_lock,std::lock_guard
#include <thread>//std::thread
#include <condition_variable>//std::condition_variable
#include <iostream>//std::cout,std::endl
#include <map>//std::map
namespace MyProducerToConsumer {
static
const int gRepositorySize =
10
;//total size of the repository
static
const int gItemNum =
97
;//number of products to produce
std::mutex produce_mtx, consume_mtx;//mutex for
all
the producer thread or consumer thread
std::map<std::thread::id, int> threadPerformance;//records of every thread's producing/consuming number
struct ItemRepository {//repository class
int m_ItemBuffer[gRepositorySize];//Repository itself (as a circular queue)
int m_ProducePos;//rear position of circular queue
int m_ConsumePos;//head position of circular queue
std::mutex m_mtx;//mutex for operating the repository
std::condition_variable m_RepoUnfull;//indicating that this repository is unfull(then producers can produce items)
std::condition_variable m_RepoUnempty;//indicating that this repository is unempty(then consumers can produce items)
}gItemRepo;
void ProduceItem(ItemRepository *ir, int item) {
std::unique_lock <std::mutex>ulk(ir->m_mtx);
while ((ir->m_ProducePos +
1
) % gRepositorySize == ir->m_ConsumePos) {//full(spare one slot for indicating)
std::cout <<
"Reposity is full. Waiting for consumers..."
<< std::endl;
ir->m_RepoUnfull.
wait
(ulk);//unlocking ulk and waiting for unfull condition
}
//when unfull
ir->m_ItemBuffer[ir->m_ProducePos++] = item;//procude and shift
std::cout <<
"Item No."
<< item <<
" produced successfully by "
<<std::this_thread::get_id()<<
"!"
<< std::endl;
threadPerformance[std::this_thread::get_id()]++;
if (ir->m_ProducePos == gRepositorySize)//loop
ir->m_ProducePos =
0
;
ir->m_RepoUnempty.notify_all();//item produced, so it's unempty; notify
all
consumers
}
int ConsumeItem(ItemRepository *ir) {
std::unique_lock<std::mutex>ulk(ir->m_mtx);
while (ir->m_ConsumePos == ir->m_ProducePos) {//empty
std::cout <<
"Repository is empty.Waiting for producing..."
<< std::endl;
ir->m_RepoUnempty.
wait
(ulk);
}
int item = ir->m_ItemBuffer[ir->m_ConsumePos++];
std::cout <<
"Item No."
<< item <<
" consumed successfully by "
<<std::this_thread::get_id()<<
"!"
<< std::endl;
threadPerformance[std::this_thread::get_id()]++;
if (ir->m_ConsumePos == gRepositorySize)
ir->m_ConsumePos =
0
;
ir->m_RepoUnfull.notify_all();//item consumed, so it's unempty; notify
all
consumers
return item;
}
void ProducerThread() {
static
int produced =
0
;//
static
variable to indicate the number of produced items
while (
1
) {
std::this_thread::sleep_for(std::chrono::milliseconds(
10
));//sleep long enough in case it runs too
fast
for other threads to procude
std::lock_guard<std::mutex>lck(produce_mtx);//
auto
unlock when break
produced++;
if (produced > gItemNum)break;
gItemRepo.m_mtx.lock();
std::cout <<
"Producing item No."
<< produced <<
"..."
<< std::endl;
gItemRepo.m_mtx.unlock();
ProduceItem(&gItemRepo, produced);
}
gItemRepo.m_mtx.lock();
std::cout <<
"Producer thread "
<< std::this_thread::get_id()
<<
" exited."
<< std::endl;
gItemRepo.m_mtx.unlock();
}
void ConsumerThread() {
static
int consumed =
0
;
while (
1
) {
std::this_thread::sleep_for(std::chrono::milliseconds(
10
));
std::lock_guard<std::mutex>lck(consume_mtx);
consumed++;
if (consumed > gItemNum)break;
gItemRepo.m_mtx.lock();
std::cout <<
"Consuming item available..."
<< std::endl;
gItemRepo.m_mtx.unlock();
ConsumeItem(&gItemRepo);
}
gItemRepo.m_mtx.lock();
std::cout <<
"Consumer thread "
<< std::this_thread::get_id()
<<
" exited."
<< std::endl;
gItemRepo.m_mtx.unlock();
}
void InitItemRepository(ItemRepository* ir) {
ir->m_ConsumePos =
0
;
ir->m_ProducePos =
0
;
}
void Run() {
InitItemRepository(&gItemRepo);
std::thread thdConsume[
11
];
std::thread thdProduce[
11
];
for (
auto
& t : thdConsume)t = std::thread(ConsumerThread);
for (
auto
& t : thdProduce)t = std::thread(ProducerThread);
for (
auto
& t : thdConsume)t.join();
for (
auto
& t : thdProduce)t.join();
for (
auto
& iter : threadPerformance)cout << iter.first <<
":"
<< iter.second << endl;
}
}