// State of a two-stage message pipeline: producers push into the collect
// queue, one worker moves batches into the ready queue, and a second worker
// hands each ready message to proxy_func_.
// Worker whose body is collectThreadFunc().
15 std::thread collect_thread_;
// Worker whose body is submitThreadFunc().
16 std::thread submit_thread_;
// Held around the accesses to msg_collect_queue_ visible in this excerpt.
17 std::mutex collect_mutex_;
// Held around the accesses to msg_ready_queue_ visible in this excerpt.
18 std::mutex ready_queue_mutex_;
// Notified after each push to msg_collect_queue_ and during shutdown.
19 std::condition_variable collect_cv_;
// Notified after messages are moved into msg_ready_queue_ and during shutdown.
20 std::condition_variable ready_cv_;
// Stage 1: messages as pushed by producers, swapped out in bulk by the collect worker.
21 std::queue<Message> msg_collect_queue_;
// Stage 2: messages waiting to be passed to proxy_func_ by the submit worker.
22 std::queue<Message> msg_ready_queue_;
// Stop request: stored with release ordering at shutdown, read with acquire
// ordering by both worker loops and their wait predicates. Starts false.
23 std::atomic<bool> stop_flag_{
false};
// Callback invoked with each dequeued message and the output stream.
25 std::function<void(
const Message&, std::ostream&)> proxy_func_;
// NOTE(review): this looks like the tail of the constructor's parameter list
// followed by its body; the opening signature line and several body lines are
// not visible in this excerpt.
// Output stream parameter; defaults to std::cout.
29 std::ostream* os = &std::cout,
// Optional per-message callback; defaults to nullptr (an empty std::function).
30 std::function<
void(
const Message&, std::ostream&)> proxy_func =
nullptr)
// Installs a lambda as the callback. Presumably the default used when no
// proxy_func was supplied; the guarding condition and the lambda body are
// not visible here, so confirm against the full file.
34 proxy_func_ = [](
const Message& msg, std::ostream& stream) {
// Stores the caller-supplied callback.
38 proxy_func_ = proxy_func;
// Start the worker that runs collectThreadFunc().
42 collect_thread_ = std::thread([
this]() { collectThreadFunc(); });
// Start the worker that runs submitThreadFunc().
45 submit_thread_ = std::thread([
this]() { submitThreadFunc(); });
// NOTE(review): body of an enqueue method whose signature is not visible in
// this excerpt; this variant copies msg into the collect queue.
// Serialize access to msg_collect_queue_ while pushing.
55 std::lock_guard<std::mutex> lock(collect_mutex_);
56 msg_collect_queue_.push(msg);
// Wake the worker waiting on collect_cv_ so it can pick the message up.
58 collect_cv_.notify_one();
// NOTE(review): body of an enqueue method whose signature is not visible in
// this excerpt; this variant moves msg into the collect queue.
// Serialize access to msg_collect_queue_ while pushing.
65 std::lock_guard<std::mutex> lock(collect_mutex_);
66 msg_collect_queue_.push(std::move(msg));
// Wake the worker waiting on collect_cv_ so it can pick the message up.
68 collect_cv_.notify_one();
// NOTE(review): shutdown sequence; the enclosing function's signature is not
// visible in this excerpt.
// Publish the stop request; both worker loops read it with acquire loads.
74 stop_flag_.store(
true, std::memory_order_release);
// NOTE(review): no lock on collect_mutex_ / ready_queue_mutex_ is visible
// around the store above. If none is taken, a worker that has just evaluated
// its wait predicate as false but has not yet blocked can miss these
// notifications and sleep indefinitely, which would stall the join() calls
// below. Confirm against the full file; storing the flag while holding the
// corresponding mutex closes that window.
77 collect_cv_.notify_one();
78 ready_cv_.notify_one();
// Wait for both workers to finish.
81 if (collect_thread_.joinable()) {
82 collect_thread_.join();
84 if (submit_thread_.joinable()) {
85 submit_thread_.join();
// With both workers gone, process anything still sitting in either queue.
89 flushRemainingMessages();
// NOTE(review): fragment of what appears to be a wait-until-both-queues-are-empty
// routine; its signature and loop structure are not visible in this excerpt.
// Take both queue locks, collect first and then ready, to inspect both queues together.
96 std::unique_lock<std::mutex> collect_lock(collect_mutex_);
97 std::unique_lock<std::mutex> ready_lock(ready_queue_mutex_);
// True when neither stage holds a message (the action taken is not visible here).
99 if (msg_collect_queue_.empty() && msg_ready_queue_.empty()) {
// Release the collect-side lock explicitly before yielding.
104 collect_lock.unlock();
// Let other threads run; presumably this sits inside a polling loop (confirm).
106 std::this_thread::yield();
// Body of collect_thread_: until a stop is requested, waits for producers to
// push messages, takes the whole pending batch, and appends it to the ready
// queue for the submit worker. Calls processRemainingMessagesInCollect() at
// the end.
111 void collectThreadFunc() {
112 while (!stop_flag_.load(std::memory_order_acquire)) {
// Local batch that receives the shared queue's contents via swap.
113 std::queue<Message> temp_queue;
117 std::unique_lock<std::mutex> lock(collect_mutex_);
// Sleep until a stop is requested or there is something to collect.
118 collect_cv_.wait(lock, [
this]() {
119 return stop_flag_.load(std::memory_order_acquire) ||
120 !msg_collect_queue_.empty();
// Stop requested: the action is not visible here; presumably leaves the loop
// so the final drain below handles what is still queued (confirm).
123 if (stop_flag_.load(std::memory_order_acquire)) {
// Take every pending message in one swap instead of popping one at a time.
128 temp_queue.swap(msg_collect_queue_);
// Append the batch to the ready queue under the ready-side mutex.
132 if (!temp_queue.empty()) {
133 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
134 while (!temp_queue.empty()) {
135 msg_ready_queue_.push(std::move(temp_queue.front()));
// Wake the submit worker waiting on ready_cv_.
138 ready_cv_.notify_one();
// Final pass: forward anything still in the collect queue.
143 processRemainingMessagesInCollect();
// Body of submit_thread_: until a stop is requested, waits for ready messages,
// dequeues one, and passes it to proxy_func_ together with *os_. Calls
// processRemainingMessagesInSubmit() at the end.
146 void submitThreadFunc() {
147 while (!stop_flag_.load(std::memory_order_acquire)) {
// Tracks whether a message was dequeued this iteration; the assignment that
// sets it to true is not visible in this excerpt.
149 bool has_msg =
false;
153 std::unique_lock<std::mutex> lock(ready_queue_mutex_);
// Sleep until a stop is requested or a message is ready.
154 ready_cv_.wait(lock, [
this]() {
155 return stop_flag_.load(std::memory_order_acquire) ||
156 !msg_ready_queue_.empty();
// Stop requested and nothing left to submit (the action is not visible here;
// presumably exits the loop, so confirm).
159 if (stop_flag_.load(std::memory_order_acquire) &&
160 msg_ready_queue_.empty()) {
// Dequeue one message while the ready-side lock is held.
164 if (!msg_ready_queue_.empty()) {
165 msg = std::move(msg_ready_queue_.front());
166 msg_ready_queue_.pop();
// Deliver only when a message was taken and an output stream is set.
172 if (has_msg && os_) {
174 proxy_func_(msg, *os_);
// Final pass over whatever is still in the ready queue.
183 processRemainingMessagesInSubmit();
// Moves everything currently in msg_collect_queue_ into msg_ready_queue_ and
// notifies ready_cv_. Called at the end of collectThreadFunc() and from
// flushRemainingMessages().
186 void processRemainingMessagesInCollect() {
187 std::queue<Message> temp_queue;
// Swap the pending batch out under the collect-side mutex.
189 std::lock_guard<std::mutex> lock(collect_mutex_);
190 temp_queue.swap(msg_collect_queue_);
// Append the batch to the ready queue under the ready-side mutex.
193 if (!temp_queue.empty()) {
194 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
195 while (!temp_queue.empty()) {
196 msg_ready_queue_.push(std::move(temp_queue.front()));
// Wake a thread waiting on ready_cv_, if any.
199 ready_cv_.notify_one();
// Takes messages from msg_ready_queue_ and passes them to proxy_func_ with
// *os_. Called at the end of submitThreadFunc() and from
// flushRemainingMessages().
// NOTE(review): the loop structure and any guard around the callback are not
// visible in this excerpt; presumably repeats until the queue is empty (confirm).
203 void processRemainingMessagesInSubmit() {
// Tracks whether a message was dequeued; the assignment that sets it to true
// is not visible here.
206 bool has_msg =
false;
// Dequeue one message while holding the ready-side mutex.
209 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
210 if (!msg_ready_queue_.empty()) {
211 msg = std::move(msg_ready_queue_.front());
212 msg_ready_queue_.pop();
// Hand the message to the callback.
221 proxy_func_(msg, *os_);
// Drains both pipeline stages in order: first forwards pending collect-queue
// messages to the ready queue, then submits what is in the ready queue.
229 void flushRemainingMessages() {
230 processRemainingMessagesInCollect();
231 processRemainingMessagesInSubmit();