xcal
基于 C++23 的现代图形渲染引擎
载入中...
搜索中...
未找到
asyncostreamlogsubmitstream.hpp
浏览该文件的文档.
1#pragma once
2#include <condition_variable>
3#include <functional>
4#include <iostream>
5#include <mutex>
6#include <ostream>
7#include <queue>
8#include <thread>
9
10#include "./message.hpp"
11
12namespace xclogger {
14 private:
15 std::thread collect_thread_;
16 std::thread submit_thread_;
17 std::mutex collect_mutex_;
18 std::mutex ready_queue_mutex_;
19 std::condition_variable collect_cv_;
20 std::condition_variable ready_cv_;
21 std::queue<Message> msg_collect_queue_;
22 std::queue<Message> msg_ready_queue_;
23 std::atomic<bool> stop_flag_{false};
24 std::ostream* os_;
25 std::function<void(const Message&, std::ostream&)> proxy_func_;
26
27 public:
29 std::ostream* os = &std::cout,
30 std::function<void(const Message&, std::ostream&)> proxy_func = nullptr)
31 : os_(os) {
32 // 设置默认的代理函数
33 if (!proxy_func) {
34 proxy_func_ = [](const Message& msg, std::ostream& stream) {
35 stream << msg;
36 };
37 } else {
38 proxy_func_ = proxy_func;
39 }
40
41 // 收集线程
42 collect_thread_ = std::thread([this]() { collectThreadFunc(); });
43
44 // 提交线程
45 submit_thread_ = std::thread([this]() { submitThreadFunc(); });
46 }
47
48 // 禁止拷贝和移动
51 delete;
52
54 {
55 std::lock_guard<std::mutex> lock(collect_mutex_);
56 msg_collect_queue_.push(msg);
57 }
58 collect_cv_.notify_one();
59 return *this;
60 }
61
62 // 移动语义支持
64 {
65 std::lock_guard<std::mutex> lock(collect_mutex_);
66 msg_collect_queue_.push(std::move(msg));
67 }
68 collect_cv_.notify_one();
69 return *this;
70 }
71
73 // 设置停止标志
74 stop_flag_.store(true, std::memory_order_release);
75
76 // 通知所有等待的线程
77 collect_cv_.notify_one();
78 ready_cv_.notify_one();
79
80 // 等待线程结束
81 if (collect_thread_.joinable()) {
82 collect_thread_.join();
83 }
84 if (submit_thread_.joinable()) {
85 submit_thread_.join();
86 }
87
88 // 清空剩余的消息(可选)
89 flushRemainingMessages();
90 }
91
92 // 刷新剩余消息(确保所有消息都被处理)
93 void flush() {
94 // 等待所有队列为空
95 while (true) {
96 std::unique_lock<std::mutex> collect_lock(collect_mutex_);
97 std::unique_lock<std::mutex> ready_lock(ready_queue_mutex_);
98
99 if (msg_collect_queue_.empty() && msg_ready_queue_.empty()) {
100 break;
101 }
102
103 // 短暂释放锁,让线程有机会处理消息
104 collect_lock.unlock();
105 ready_lock.unlock();
106 std::this_thread::yield();
107 }
108 }
109
110 private:
111 void collectThreadFunc() {
112 while (!stop_flag_.load(std::memory_order_acquire)) {
113 std::queue<Message> temp_queue;
114
115 // 从收集队列获取消息
116 {
117 std::unique_lock<std::mutex> lock(collect_mutex_);
118 collect_cv_.wait(lock, [this]() {
119 return stop_flag_.load(std::memory_order_acquire) ||
120 !msg_collect_queue_.empty();
121 });
122
123 if (stop_flag_.load(std::memory_order_acquire)) {
124 break;
125 }
126
127 // 交换到临时队列,尽快释放锁
128 temp_queue.swap(msg_collect_queue_);
129 }
130
131 // 将消息转移到就绪队列
132 if (!temp_queue.empty()) {
133 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
134 while (!temp_queue.empty()) {
135 msg_ready_queue_.push(std::move(temp_queue.front()));
136 temp_queue.pop();
137 }
138 ready_cv_.notify_one();
139 }
140 }
141
142 // 线程结束前处理剩余消息
143 processRemainingMessagesInCollect();
144 }
145
146 void submitThreadFunc() {
147 while (!stop_flag_.load(std::memory_order_acquire)) {
148 Message msg;
149 bool has_msg = false;
150
151 // 从就绪队列获取消息
152 {
153 std::unique_lock<std::mutex> lock(ready_queue_mutex_);
154 ready_cv_.wait(lock, [this]() {
155 return stop_flag_.load(std::memory_order_acquire) ||
156 !msg_ready_queue_.empty();
157 });
158
159 if (stop_flag_.load(std::memory_order_acquire) &&
160 msg_ready_queue_.empty()) {
161 break;
162 }
163
164 if (!msg_ready_queue_.empty()) {
165 msg = std::move(msg_ready_queue_.front());
166 msg_ready_queue_.pop();
167 has_msg = true;
168 }
169 }
170
171 // 处理消息(不在锁内执行)
172 if (has_msg && os_) {
173 try {
174 proxy_func_(msg, *os_);
175 } catch (...) {
176 // 异常处理:避免因为单个消息处理失败导致整个线程崩溃
177 // 可以在这里添加错误日志
178 }
179 }
180 }
181
182 // 线程结束前处理剩余消息
183 processRemainingMessagesInSubmit();
184 }
185
186 void processRemainingMessagesInCollect() {
187 std::queue<Message> temp_queue;
188 {
189 std::lock_guard<std::mutex> lock(collect_mutex_);
190 temp_queue.swap(msg_collect_queue_);
191 }
192
193 if (!temp_queue.empty()) {
194 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
195 while (!temp_queue.empty()) {
196 msg_ready_queue_.push(std::move(temp_queue.front()));
197 temp_queue.pop();
198 }
199 ready_cv_.notify_one();
200 }
201 }
202
203 void processRemainingMessagesInSubmit() {
204 while (true) {
205 Message msg;
206 bool has_msg = false;
207
208 {
209 std::lock_guard<std::mutex> lock(ready_queue_mutex_);
210 if (!msg_ready_queue_.empty()) {
211 msg = std::move(msg_ready_queue_.front());
212 msg_ready_queue_.pop();
213 has_msg = true;
214 }
215 }
216
217 if (!has_msg) break;
218
219 if (os_) {
220 try {
221 proxy_func_(msg, *os_);
222 } catch (...) {
223 // 异常处理
224 }
225 }
226 }
227 }
228
229 void flushRemainingMessages() {
230 processRemainingMessagesInCollect();
231 processRemainingMessagesInSubmit();
232 }
233};
234} // namespace xclogger
235
236#define XCLOG_ENABLE_ASYNCSTREAMLOGSUBMITSTREAM() \
237 XCLOG_SUBMIT_STREAM_INSTENCE_IMPT() { \
238 static ::xclogger::AsyncOstreamLogSubmitStream ostream_stream{}; \
239 return &ostream_stream; \
240 }
241#define XCLOG_ENABLE_ASYNCSTREAMLOGSUBMITSTREAM_WITH_PROXY(os, proxy_func) \
242 XCLOG_SUBMIT_STREAM_INSTENCE_IMPT() { \
243 static ::xclogger::AsyncOstreamLogSubmitStream ostream_stream{ \
244 os, proxy_func}; \
245 return &ostream_stream; \
246 }
AsyncOstreamLogSubmitStream(const AsyncOstreamLogSubmitStream &)=delete
AsyncOstreamLogSubmitStream(std::ostream *os=&std::cout, std::function< void(const Message &, std::ostream &)> proxy_func=nullptr)
AsyncOstreamLogSubmitStream & operator<<(Message &&msg)
AsyncOstreamLogSubmitStream & operator=(const AsyncOstreamLogSubmitStream &)=delete
AsyncOstreamLogSubmitStream & operator<<(const Message &msg)