RabbitMQ tutorials на C++

RabbitMQ tutorials на C++

631
ПОДЕЛИТЬСЯ

Под катом собраны ссылки на переведённые руководства, материалы и код под спойлером. На сайте rabbitmq.com в разделе tutorials приведены примеры реализации на разных языках, но среди них нет C++.

Тем, кому удобнее просматривать код через интерфейс GitHub, можно сразу перейти в репозиторий.

В данном материале используются реализация клиента AMQP-CPP и библиотека POCO C++ для работы с сокетом.

«RabbitMQ tutorial 1 — Hello World»

// receive.cpp
#include <iostream>
#include "SimplePocoHandler.h"

/**
 * Tutorial 1 consumer: declares the "hello" queue and prints the body of
 * every message delivered from it.
 *
 * The listing was reassembled from a scrambled extraction (its closing
 * statements had been moved in front of its first line) and the lost
 * backslash of the trailing "\n" was restored.
 *
 * @return 0 after handler.loop() returns.
 */
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    channel.declareQueue("hello");

    // AMQP::noack: deliveries are not acknowledged by this consumer.
    channel.consume("hello", AMQP::noack).onReceived(
        [](const AMQP::Message &message,
           uint64_t deliveryTag,
           bool redelivered)
        {
            std::cout << " [x] Received " << message.message() << std::endl;
        });

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}

send.cpp#include <iostream>

#include "SimplePocoHandler.h"

/**
 * Tutorial 1 producer: publishes a single "Hello World!" message to the
 * "hello" queue through the default ("") exchange and then stops the handler.
 *
 * @return 0 after handler.loop() returns.
 */
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
    AMQP::Channel channel(&connection);

    channel.onReady([&]()
    {
        // Nothing to do if the handler reports that it is not connected.
        if (!handler.connected())
        {
            return;
        }

        // Declare the queue with the same arguments as the receive.cpp
        // listing so the message is not dropped when the sender is started
        // before any receiver has created the queue.
        channel.declareQueue("hello");
        channel.publish("", "hello", "Hello World!");

        // Plain ASCII apostrophes: the typographic quotes in the scraped
        // listing are an extraction artefact.
        std::cout << " [x] Sent 'Hello World!'" << std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}

«RabbitMQ tutorial 2 — Очередь задач»

// worker.cpp
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>

#include "SimplePocoHandler.h"

/**
 * Tutorial 2 worker: consumes tasks from the durable "task_queue", sleeps one
 * second per '.' character in the message body to simulate work, then
 * acknowledges the delivery.
 *
 * Changes relative to the scraped listing:
 *  - the scrambled head/tail of the listing were put back in order and the
 *    lost "\n" escape restored;
 *  - the typographic quotes around '.' were replaced with a real char literal;
 *  - the consumer no longer passes AMQP::noack, because the callback
 *    acknowledges every delivery explicitly with channel.ack();
 *  - setQos(1) is issued before consume() so the prefetch limit is in place
 *    when the consumer is registered.
 *
 * @return 0 after handler.loop() returns.
 */
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Hand this worker at most one unacknowledged task at a time.
    channel.setQos(1);
    channel.declareQueue("task_queue", AMQP::durable);
    channel.consume("task_queue").onReceived(
        [&channel](const AMQP::Message &message,
                   uint64_t deliveryTag,
                   bool redelivered)
        {
            const auto body = message.message();
            std::cout << " [x] Received " << body << std::endl;

            // Each dot in the payload stands for one second of work.
            const auto dots = std::count(body.cbegin(), body.cend(), '.');
            std::this_thread::sleep_for(std::chrono::seconds(dots));

            std::cout << " [x] Done" << std::endl;
            channel.ack(deliveryTag);
        });

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}

// new_task.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/**
 * Tutorial 2 producer: publishes one task to the durable "task_queue".
 *
 * The task text is the command-line arguments joined with spaces (join() is
 * declared in tools.h), or "Hello World!" when no arguments are given.
 *
 * The listing was reassembled from a scrambled extraction; the "\n" escape
 * and the ASCII apostrophes in the output message were restored.
 *
 * @param argc number of command-line arguments.
 * @param argv argv[1..argc-1] form the task text.
 * @return 0 after handler.loop() returns.
 */
int main(int argc, const char* argv[])
{
    const std::string msg =
        argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
    AMQP::Channel channel(&connection);

    // Publish only after the queue declaration has succeeded.
    auto callback =
        [&](const std::string &name, int msgcount, int consumercount)
        {
            channel.publish("", "task_queue", msg);
            std::cout << " [x] Sent '" << msg << "'\n";
            handler.quit();
        };

    channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
    handler.loop();
    return 0;
}

«RabbitMQ tutorial 3 — Публикация/Подписка»

receive_logs.cpp#include <iostream>

#include "SimplePocoHandler.h"

/**
 * Tutorial 3 subscriber: declares the "logs" fanout exchange, binds a fresh
 * exclusive queue to it and prints every message delivered to that queue.
 *
 * Fix: the banner ended in a literal 'n' because the backslash of "\n" was
 * lost; the escape is restored.
 *
 * @return 0 after handler.loop() returns.
 */
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Prints the body of each delivered log message.
    auto receiveMessageCallback = [](const AMQP::Message &message,
                                     uint64_t deliveryTag,
                                     bool redelivered)
    {
        std::cout << " [x] " << message.message() << std::endl;
    };

    // Invoked with the name of the newly declared queue: bind it to the
    // exchange and start consuming from it.
    auto callback =
        [&](const std::string &name, int msgcount, int consumercount)
        {
            channel.bindQueue("logs", name, "");
            channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
        };

    // The queue is declared only once the exchange declaration has succeeded.
    channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
    {
        channel.declareQueue(AMQP::exclusive).onSuccess(callback);
    });

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}

// emit_log.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/**
 * Tutorial 3 publisher: sends one log message to the "logs" fanout exchange.
 *
 * The message is the command-line arguments joined with spaces (join() is
 * declared in tools.h), or "info: Hello World!" when no arguments are given.
 *
 * The listing was reassembled from a scrambled extraction in which the end of
 * main() had been placed before its beginning.
 *
 * @param argc number of command-line arguments.
 * @param argv argv[1..argc-1] form the log message.
 * @return 0 after handler.loop() returns.
 */
int main(int argc, const char* argv[])
{
    const std::string msg =
        argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Publish only after the exchange declaration has succeeded.
    channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
    {
        channel.publish("logs", "", msg);
        std::cout << " [x] Sent " << msg << std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}

«RabbitMQ tutorial 4 — Роутинг»

// receive_logs_direct.cpp
#include <iostream>
#include <algorithm>

#include "SimplePocoHandler.h"

/**
 * Tutorial 4 subscriber: binds an exclusive queue to the "direct_logs" direct
 * exchange once per severity given on the command line and prints
 * "<routing key>:<body>" for every delivered message.
 *
 * Changes relative to the scraped listing:
 *  - the scrambled head/tail of the listing were put back in order and the
 *    lost "\n" escape restored;
 *  - bindQueue() now receives the declared queue's name instead of "",
 *    matching the other subscriber listings;
 *  - a single consumer is registered after all bindings instead of one
 *    consumer per severity on the same queue.
 *
 * @param argc number of command-line arguments; must be at least 2.
 * @param argv argv[1..argc-1] are the severities to subscribe to.
 * @return 1 when no severity is given, otherwise 0 after handler.loop()
 *         returns.
 */
int main(int argc, const char* argv[])
{
    if (argc == 1)
    {
        std::cout << "Usage: " << argv[0] << " [info] [warning] [error]" << std::endl;
        return 1;
    }
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    channel.declareExchange("direct_logs", AMQP::direct);

    auto receiveMessageCallback =
        [](const AMQP::Message &message,
           uint64_t deliveryTag,
           bool redelivered)
        {
            std::cout << " [x] "
                      << message.routingKey()
                      << ":"
                      << message.message()
                      << std::endl;
        };

    // Invoked with the name of the newly declared queue.
    auto callback = [&](const std::string &name,
                        int msgcount,
                        int consumercount)
    {
        // One binding per requested severity ...
        for (int i = 1; i < argc; ++i)
        {
            channel.bindQueue("direct_logs", name, argv[i]);
        }
        // ... and one consumer for the queue as a whole.
        channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };
    channel.declareQueue(AMQP::exclusive).onSuccess(callback);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}

emit_log_direct.cpp#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/**
 * Tutorial 4 publisher: sends one message to the "direct_logs" direct
 * exchange, using the severity as routing key.
 *
 * Usage: emit_log_direct [severity] [message words...]
 *  - severity defaults to "info";
 *  - the message is argv[2..] joined with spaces (join() is declared in
 *    tools.h) and defaults to "Hello World!".
 *
 * Changes relative to the scraped listing: the initialisers of `severity`
 * and `msg` had been spliced into each other, leaving `msg` undeclared and a
 * stray fragment after the closing brace; they are restored here. The
 * severity is now taken from argv[1] whenever it is present (argc > 1), so a
 * call with only a severity is no longer ignored; this matches how the
 * emit_log_topic listing reads its routing key.
 *
 * @param argc number of command-line arguments.
 * @param argv argv[1] is the severity, argv[2..argc-1] the message.
 * @return 0 after handler.loop() returns.
 */
int main(int argc, const char* argv[])
{
    const std::string severity = argc > 1 ? argv[1] : "info";
    const std::string msg =
        argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Publish only after the exchange declaration has succeeded.
    channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
    {
        channel.publish("direct_logs", severity, msg);
        std::cout << " [x] Sent " << severity << ":" << msg << std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}

«RabbitMQ tutorial 5 — Тематики»

receive_logs_topic.cpp#include <iostream>
#include <algorithm>

#include "SimplePocoHandler.h"

/**
 * Tutorial 5 subscriber: binds an exclusive queue to the "topic_logs" topic
 * exchange once per binding key given on the command line and prints
 * "<routing key>:<body>" for every delivered message.
 *
 * Changes relative to the scraped listing:
 *  - the banner's lost "\n" escape is restored and the usage text uses a
 *    plain ASCII "...";
 *  - a single consumer is registered after all bindings instead of one
 *    consumer per binding key on the same queue.
 *
 * @param argc number of command-line arguments; must be at least 2.
 * @param argv argv[1..argc-1] are the binding keys.
 * @return 1 when no binding key is given, otherwise 0 after handler.loop()
 *         returns.
 */
int main(int argc, const char* argv[])
{
    if (argc == 1)
    {
        std::cout << "Usage: " << argv[0] << " [binding_key]..." << std::endl;
        return 1;
    }
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    channel.declareExchange("topic_logs", AMQP::topic);

    auto receiveMessageCallback =
        [](const AMQP::Message &message,
           uint64_t deliveryTag,
           bool redelivered)
        {
            std::cout << " [x] "
                      << message.routingKey()
                      << ":"
                      << message.message()
                      << std::endl;
        };

    // Invoked with the name of the newly declared queue.
    auto callback = [&](const std::string &name,
                        int msgcount,
                        int consumercount)
    {
        for (int i = 1; i < argc; ++i)
        {
            const char* bindingKey = argv[i];
            // Echo each key as it is bound (kept from the original listing).
            std::cout << bindingKey << std::endl;
            channel.bindQueue("topic_logs", name, bindingKey);
        }
        channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };
    channel.declareQueue(AMQP::exclusive).onSuccess(callback);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}

// emit_log_topic.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/**
 * Tutorial 5 publisher: sends one message to the "topic_logs" topic exchange.
 *
 * Usage: emit_log_topic [routing_key] [message words...]
 *  - routing_key defaults to "anonymous.info";
 *  - the message is argv[2..] joined with spaces (join() is declared in
 *    tools.h) and defaults to "Hello World!".
 *
 * Changes relative to the scraped listing: the scrambled head/tail of the
 * listing were put back in order, and the message default now applies
 * whenever no message words are given (argc > 2 instead of argc > 1), because
 * joining from argv[2] with only a routing key present joins an empty range.
 *
 * @param argc number of command-line arguments.
 * @param argv argv[1] is the routing key, argv[2..argc-1] the message.
 * @return 0 after handler.loop() returns.
 */
int main(int argc, const char* argv[])
{
    const std::string msg =
        argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
    const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Publish only after the exchange declaration has succeeded.
    channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
    {
        channel.publish("topic_logs", routing_key, msg);
        std::cout << " [x] Sent " << routing_key << ":" << msg << std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}

«RabbitMQ tutorial 6 — Удаленный вызов процедур»

rpc_server.cpp#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>

#include "SimplePocoHandler.h"

/**
 * Returns the n-th Fibonacci number (fib(0) == 0, fib(1) == 1).
 *
 * The scraped listing used typographic dashes instead of '-' and therefore
 * did not compile. The function is also fed with a number parsed from an
 * incoming message, so it is computed iteratively: a negative n no longer
 * recurses without bound and a large n no longer takes exponential time.
 *
 * @param n index in the sequence; values <= 0 yield 0.
 * @return the n-th Fibonacci number. The result fits in a 32-bit int only
 *         for n <= 46; larger n overflows.
 */
int fib(int n)
{
    if (n <= 0)
    {
        return 0;
    }

    int previous = 0;
    int current = 1;
    for (int i = 1; i < n; ++i)
    {
        const int next = previous + current;
        previous = current;
        current = next;
    }
    return current;
}

/**
 * Tutorial 6 RPC server: consumes requests from "rpc_queue", computes
 * fib(<body as integer>) and publishes the result to the request's reply-to
 * queue with the same correlation id, then acknowledges the request.
 *
 * Changes relative to the scraped listing:
 *  - setQos(1) is issued before consume() so the prefetch limit is in place
 *    when the consumer is registered;
 *  - the consumer names "rpc_queue" explicitly instead of passing "".
 *
 * @return 0 after handler.loop() returns.
 */
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Hand this server at most one unacknowledged request at a time.
    channel.setQos(1);
    channel.declareQueue("rpc_queue");
    channel.consume("rpc_queue").onReceived(
        [&channel](const AMQP::Message &message,
                   uint64_t deliveryTag,
                   bool redelivered)
        {
            const auto body = message.message();
            std::cout << " [.] fib(" << body << ")" << std::endl;

            // NOTE(review): std::stoi throws on a non-numeric body and the
            // exception is not handled here - confirm whether malformed
            // requests need to be rejected explicitly.
            AMQP::Envelope env(std::to_string(fib(std::stoi(body))));
            env.setCorrelationID(message.correlationID());

            channel.publish("", message.replyTo(), env);
            channel.ack(deliveryTag);
        });

    std::cout << " [x] Awaiting RPC requests" << std::endl;
    handler.loop();
    return 0;
}

rpc_client.cpp#include <iostream>

#include "tools.h"
#include "SimplePocoHandler.h"

int main(int argc, const char* argv[])
{
const std::string correlation(uuid());

SimplePocoHandler handler("localhost", 5672);

AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

AMQP::Channel channel(&connection);
auto callback = [&](const std::string &name,
int msgcount,
int consumercount)
{
AMQP::Envelope env("30");
env.setCorrelationID(correlation);
env.setReplyTo(name);
channel.publish("","rpc_queue",env);
std::cout<<" [x] Requesting fib(30)"<<std::endl;

};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);

auto receiveCallback = [&](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
if(message.correlationID() != correlation)
return;

std::cout<<" [.] Got "<<message.message()<<std::endl;
handler.quit();
};

channel.consume("", AMQP::noack).onReceived(receiveCallback);

handler.loop();
return 0;
}
habrahabr.ru