|
19 | 19 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
20 | 20 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
21 | 21 | // SOFTWARE. |
| 22 | +#include <string> |
22 | 23 | #include <condition_variable> |
23 | 24 | #include <iostream> |
24 | 25 | #include <mutex> |
25 | 26 | #include <signal.h> |
26 | | -#include <string> |
27 | 27 |
|
28 | 28 | #include <cpp_redis/cpp_redis> |
29 | | - |
30 | | -#ifdef _WIN32 |
31 | | -#include <Winsock2.h> |
32 | | -#endif // _WIN32 |
| 29 | +#include "winsock_initializer.h" |
33 | 30 |
|
34 | 31 | std::condition_variable should_exit; |
35 | 32 |
|
36 | 33 | void sigint_handler(int) { should_exit.notify_all(); } |
37 | 34 |
|
38 | | -int main() { |
39 | | -#ifdef _WIN32 |
40 | | - //! Windows netword DLL init |
41 | | - WORD version = MAKEWORD(2, 2); |
42 | | - WSADATA data; |
43 | | - |
44 | | - if (WSAStartup(version, &data) != 0) { |
45 | | - std::cerr << "WSAStartup() failure" << std::endl; |
46 | | - return -1; |
47 | | - } |
48 | | -#endif // _WIN32 |
49 | | - |
50 | | - //! Enable logging |
51 | | - |
52 | | - // const std::string group_name = "groupone"; |
53 | | - const std::vector<std::string> group_names = {"groupone"}; //, "grouptwo"}; |
54 | | - const std::string session_name = "sessone"; |
55 | | - const std::string consumer_name = "ABCD"; |
56 | | - |
57 | | - cpp_redis::active_logger = |
58 | | - std::unique_ptr<cpp_redis::logger>(new cpp_redis::logger); |
59 | | - |
60 | | - cpp_redis::consumer sub(session_name, consumer_name); |
61 | | - |
62 | | - sub.connect("127.0.0.1", 6379, |
63 | | - [](const std::string &host, std::size_t port, |
64 | | - cpp_redis::connect_state status) { |
65 | | - if (status == cpp_redis::connect_state::dropped) { |
66 | | - std::cout << "client disconnected from " << host << ":" |
67 | | - << port << std::endl; |
68 | | - } |
69 | | - }); |
70 | | - |
71 | | - sub.auth("{redis_key}"); |
72 | | - |
73 | | - for (auto &group : group_names) { |
74 | | - |
75 | | - sub.subscribe(group, |
76 | | - [group](const cpp_redis::message_type msg) { |
77 | | - cpp_redis::consumer_response_t res; |
78 | | - // Callback will run for each message obtained from the |
79 | | - // queue |
80 | | - std::cout << "Group: " << group << std::endl; |
81 | | - std::cout << "Id in the cb: " << msg.get_id() << std::endl; |
82 | | - res.insert({"Id", msg.get_id()}); |
83 | | - return res; |
84 | | - }, |
85 | | - [group](int ack_status) { |
86 | | - // Callback will run upon return of xack |
87 | | - std::cout << "Group: " << group << std::endl; |
88 | | - std::cout << "Ack status: " << ack_status << std::endl; |
89 | | - }); |
90 | | - } |
91 | | - |
92 | | - /*sub.subscribe(group_name, |
93 | | - [](const cpp_redis::message_type msg) { |
94 | | - // Callback will run for each message |
95 | | - obtained from the queue std::cout << "Id in the cb: " << msg.get_id() |
96 | | - << std::endl; return msg; |
97 | | - }, |
98 | | - [](int ack_status) { |
99 | | - // Callback will run upon return of |
100 | | - xack std::cout << "Ack status: " << ack_status |
101 | | - << std::endl; |
102 | | - });*/ |
103 | | - |
104 | | - sub.commit(); |
105 | | - |
106 | | - signal(SIGINT, &sigint_handler); |
107 | | - std::mutex mtx; |
108 | | - std::unique_lock<std::mutex> l(mtx); |
109 | | - should_exit.wait(l); |
110 | | - |
111 | | -#ifdef _WIN32 |
112 | | - WSACleanup(); |
113 | | -#endif // _WIN32 |
114 | | - |
115 | | - return 0; |
| 35 | +int |
| 36 | +main() { |
| 37 | + winsock_initializer winsock_init; |
| 38 | + //! Enable logging |
| 39 | + |
| 40 | + //const std::string group_name = "groupone"; |
| 41 | + const std::vector<std::string> group_names = {"groupone"}; //, "grouptwo"}; |
| 42 | + const std::string session_name = "sessone"; |
| 43 | + const std::string consumer_name = "ABCD"; |
| 44 | + |
| 45 | + cpp_redis::active_logger = std::unique_ptr<cpp_redis::logger>(new cpp_redis::logger); |
| 46 | + |
| 47 | + cpp_redis::consumer sub(session_name, consumer_name); |
| 48 | + |
| 49 | + sub.connect("127.0.0.1", 6379, |
| 50 | + [](const std::string &host, std::size_t port, cpp_redis::connect_state status) { |
| 51 | + if (status == cpp_redis::connect_state::dropped) { |
| 52 | + std::cout << "client disconnected from " << host << ":" << port << std::endl; |
| 53 | + } |
| 54 | + }); |
| 55 | + |
| 56 | + sub.auth("{redis_key}"); |
| 57 | + |
| 58 | + for (auto &group : group_names) { |
| 59 | + |
| 60 | + sub.subscribe(group, |
| 61 | + [group](const cpp_redis::message_type msg) { |
| 62 | + cpp_redis::consumer_response_t res; |
| 63 | + // Callback will run for each message obtained from the queue |
| 64 | + std::cout << "Group: " << group << std::endl; |
| 65 | + std::cout << "Id in the cb: " << msg.get_id() << std::endl; |
| 66 | + res.insert({"Id", msg.get_id()}); |
| 67 | + return res; |
| 68 | + }, |
| 69 | + [group](int ack_status) { |
| 70 | + // Callback will run upon return of xack |
| 71 | + std::cout << "Group: " << group << std::endl; |
| 72 | + std::cout << "Ack status: " << ack_status << std::endl; |
| 73 | + }); |
| 74 | + } |
| 75 | + |
| 76 | + /*sub.subscribe(group_name, |
| 77 | + [](const cpp_redis::message_type msg) { |
| 78 | + // Callback will run for each message obtained from the queue |
| 79 | + std::cout << "Id in the cb: " << msg.get_id() << std::endl; |
| 80 | + return msg; |
| 81 | + }, |
| 82 | + [](int ack_status) { |
| 83 | + // Callback will run upon return of xack |
| 84 | + std::cout << "Ack status: " << ack_status << std::endl; |
| 85 | + });*/ |
| 86 | + |
| 87 | + sub.commit(); |
| 88 | + |
| 89 | + signal(SIGINT, &sigint_handler); |
| 90 | + std::mutex mtx; |
| 91 | + std::unique_lock<std::mutex> l(mtx); |
| 92 | + should_exit.wait(l); |
| 93 | + |
| 94 | + return 0; |
116 | 95 | } |
0 commit comments