xcal
基于 C++23 的现代图形渲染引擎
载入中...
搜索中...
未找到
asynczmqlogsubmitstream.hpp
浏览该文件的文档.
1#pragma once
#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <zmq.hpp>

#include "./message.hpp"
11
12namespace xclogger {
14 private:
15 std::thread ipc_thread_;
16 std::mutex mutex_;
17 std::condition_variable cv_;
18 std::queue<Message> msg_queue_;
19 bool stop_flag_ = false;
20
21 public:
22 AsyncZmqLogSubmitStream(const std::string& addr) {
23 ipc_thread_ = std::thread([this, addr]() {
24 zmq::context_t ctx{1};
25 zmq::socket_t req{ctx, zmq::socket_type::req};
26 req.connect(addr);
27 req.set(zmq::sockopt::rcvtimeo, 2000);
28 while (true) {
29 Message msg;
30 {
31 std::unique_lock<std::mutex> lock(mutex_);
32 cv_.wait(lock, [this]() {
33 return stop_flag_ || !msg_queue_.empty();
34 });
35 if (stop_flag_ && msg_queue_.empty()) break;
36 msg = msg_queue_.front();
37 msg_queue_.pop();
38 }
39 std::vector<char> data = Message::encode(msg);
40 try{
41 req.send(zmq::const_buffer(data.data(), data.size()),
42 zmq::send_flags::dontwait);
43 zmq::message_t reply;
44 if (req.recv(reply, zmq::recv_flags::none)) {
45 } else {
46 }
47 }catch(const zmq::error_t& e){
48 std::cerr << "zmq error: " << e.what() << std::endl;
49 }
50 }
51 req.close();
52 ctx.close();
53 });
54 }
56 {
57 std::lock_guard<std::mutex> lock(mutex_);
58 msg_queue_.push(msg);
59 }
60 cv_.notify_one();
61 return *this;
62 }
64 {
65 std::lock_guard<std::mutex> lock(mutex_);
66 stop_flag_ = true;
67 }
68 cv_.notify_one();
69 if (ipc_thread_.joinable()) ipc_thread_.join();
70 }
71};
72} // namespace xclogger
73
/// Supplies the body for whatever declaration XCLOG_SUBMIT_STREAM_INSTENCE_IMPT()
/// expands to (that macro is defined outside this file; presumably it is the
/// logger's submit-stream accessor -- confirm against its definition).
///
/// The generated body owns one function-local static
/// ::xclogger::AsyncZmqLogSubmitStream constructed from @p addr the first time
/// the body runs, and returns a pointer to that same object on every call.
///
/// @param addr Expression used to brace-initialise the stream; it is passed to
///             the AsyncZmqLogSubmitStream constructor.
#define XCLOG_ENABLE_ASYNCZMQLOGSUBMITSTREAM(addr)                        \
  XCLOG_SUBMIT_STREAM_INSTENCE_IMPT() {                                   \
    static ::xclogger::AsyncZmqLogSubmitStream async_zmq_sink{addr};      \
    return &async_zmq_sink;                                               \
  }
AsyncZmqLogSubmitStream & operator<<(const Message &msg)
AsyncZmqLogSubmitStream(const std::string &addr)
static std::vector< char > encode(const Message &msg)
Definition message.hpp:30