mmcs-quotes-bridge/http.cpp

224 lines
7.9 KiB
C++

#include "http.h"
#include "curl/curl.h"
#include "curl/easy.h"
#include "curl/multi.h"
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_color_sinks.h>
using namespace http;
/**
 * Creates a client that drives libcurl's multi interface from `loop`.
 *
 * Sets up the "httpclient" logger, a curl multi handle whose socket and timer
 * notifications are routed to this object, and the libuv timer used to
 * service curl timeouts.
 */
HttpClient::HttpClient(uv_loop_t *loop)
    : m_eventLoop(loop)
{
    // Reuse the "httpclient" logger when one is already registered; otherwise
    // create a coloured stdout logger at info level.
    if (auto registered = spdlog::get("httpclient")) {
        m_logger = registered;
    } else {
        m_logger = spdlog::stdout_color_mt("httpclient");
        m_logger->set_level(spdlog::level::info);
    }

    // Multi handle: socket and timer events call back into this instance.
    m_curlMulti = curl_multi_init();
    curl_multi_setopt(m_curlMulti, CURLMOPT_SOCKETFUNCTION, &HttpClient::curl_socket_cb);
    curl_multi_setopt(m_curlMulti, CURLMOPT_SOCKETDATA, this);
    curl_multi_setopt(m_curlMulti, CURLMOPT_TIMERFUNCTION, &HttpClient::curl_timer_cb);
    curl_multi_setopt(m_curlMulti, CURLMOPT_TIMERDATA, this);

    // The timer is heap-allocated because it is released from the uv_close
    // callback in the destructor rather than directly.
    m_curlTimer = new uv_timer_t;
    uv_timer_init(loop, m_curlTimer);
    m_curlTimer->data = this;
}
/**
 * Cancels every outstanding request and releases the curl and libuv resources
 * owned by the client. Response callbacks of outstanding requests are not
 * invoked.
 */
HttpClient::~HttpClient() {
    for (const auto &entry : m_requests) {
        m_logger->warn("canceling request while destructing");
        CURL *easy = entry.first;

        // Detach the transfer unconditionally. The per-socket poll handles are
        // deliberately not touched here: they are torn down by curl_socket_cb
        // when libcurl reports CURL_POLL_REMOVE for sockets that vanish with
        // the handle, and the request's socketData pointer may already refer
        // to a record that callback has deleted.
        m_logger->debug("removing request handle");
        curl_multi_remove_handle(m_curlMulti, easy);

        // Same cleanup as a completed transfer gets in check_curl_messages().
        curl_easy_cleanup(easy);
        if (entry.second.requestHeaders)
            curl_slist_free_all(entry.second.requestHeaders);
    }
    m_requests.clear();

    // Release the multi handle while the timer is still alive, since libcurl
    // may invoke the timer/socket callbacks during cleanup.
    curl_multi_cleanup(m_curlMulti);
    m_curlMulti = nullptr;

    m_logger->debug("closing timer handle");
    uv_close(reinterpret_cast<uv_handle_t*>(m_curlTimer), [](uv_handle_t *h) {
        // Allocated as uv_timer_t in the constructor; delete as that type.
        delete reinterpret_cast<uv_timer_t*>(h);
    });
}
/**
 * Aborts the request identified by `id` and frees its curl resources.
 *
 * @param id  Value previously returned by send_request().
 *
 * Logs a warning and does nothing when `id` is not an outstanding request.
 * The response callback is not invoked for a cancelled request.
 */
void HttpClient::cancel_request(request_id id) {
    CURL *requestHandle = reinterpret_cast<CURL*>(id);
    auto request = m_requests.find(requestHandle);
    if (request == m_requests.end()) {
        m_logger->warn("cancel_request: not found");
        return;
    }

    // Detaching the transfer makes libcurl report CURL_POLL_REMOVE for sockets
    // that vanish with it, and curl_socket_cb closes the poll handle and
    // deletes the socket record there. The request's socketData pointer is
    // therefore not dereferenced here: it is null for a request that never
    // got a socket and may be dangling once that callback has run.
    curl_multi_remove_handle(m_curlMulti, requestHandle);

    // Release what a completed transfer releases in check_curl_messages().
    curl_slist *headers = request->second.requestHeaders;
    m_requests.erase(request);
    curl_easy_cleanup(requestHandle);
    if (headers)
        curl_slist_free_all(headers);
}
/**
 * Starts an asynchronous HTTP request on the client's curl multi handle.
 *
 * @param method  HTTP method, passed to curl as a custom request string.
 * @param url     Target URL.
 * @param opts    Optional request headers and body.
 * @param cb      Invoked from check_curl_messages() once the transfer ends.
 * @return        Identifier usable with cancel_request(), or nullptr when the
 *                request could not be set up (in which case `cb` is never
 *                invoked).
 */
request_id HttpClient::send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb) {
    m_logger->debug("send request {} {}", method, url);

    CURL *requestHandle = curl_easy_init();
    if (!requestHandle) {
        m_logger->error("send_request: curl_easy_init failed");
        return nullptr;
    }

    auto insertResult = m_requests.emplace(requestHandle, this);
    if (!insertResult.second) {
        curl_easy_cleanup(requestHandle);
        return nullptr;
    }
    auto requestData = insertResult.first;
    requestData->second.callback = std::move(cb);
    requestData->second.curl = requestHandle;
    requestData->second.response = std::make_unique<HttpResponse>();

    curl_slist *headerList = nullptr;

    // Undoes everything done so far when setup fails part-way through.
    auto abandon = [&]() -> request_id {
        if (headerList)
            curl_slist_free_all(headerList);
        m_requests.erase(requestData);
        curl_easy_cleanup(requestHandle);
        return nullptr;
    };

    curl_easy_setopt(requestHandle, CURLOPT_WRITEFUNCTION, &HttpClient::curl_data_cb);
    curl_easy_setopt(requestHandle, CURLOPT_WRITEDATA, requestHandle);
    curl_easy_setopt(requestHandle, CURLOPT_PRIVATE, this);
    // curl_easy_setopt is variadic: this option must be passed as a long.
    curl_easy_setopt(requestHandle, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(requestHandle, CURLOPT_URL, url.c_str());
    curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, method.c_str());

    if (opts.headers) {
        for (const auto &header : *opts.headers) {
            std::string headerLine;
            headerLine.reserve(header.first.size() + 1 + header.second.size());
            headerLine += header.first;
            headerLine += ':';
            headerLine += header.second;

            // curl_slist_append returns the (possibly new) list head; it must
            // be stored, otherwise the list stays empty and the node leaks.
            curl_slist *extended = curl_slist_append(headerList, headerLine.c_str());
            if (!extended) {
                m_logger->error("send_request: failed to append request header");
                return abandon();
            }
            headerList = extended;
        }
        curl_easy_setopt(requestHandle, CURLOPT_HTTPHEADER, headerList);
        // The list must outlive the transfer; it is freed with the request.
        requestData->second.requestHeaders = headerList;
    }

    if (opts.body) {
        // `opts` is destroyed when this function returns, but the transfer
        // runs later on the event loop, so curl must take its own copy of the
        // body. The size has to be set before CURLOPT_COPYPOSTFIELDS.
        curl_easy_setopt(requestHandle, CURLOPT_POSTFIELDSIZE_LARGE,
                         static_cast<curl_off_t>(opts.body->size()));
        curl_easy_setopt(requestHandle, CURLOPT_COPYPOSTFIELDS, opts.body->c_str());
    }

    const CURLMcode addResult = curl_multi_add_handle(m_curlMulti, requestHandle);
    if (addResult != CURLM_OK) {
        m_logger->error("send_request: curl_multi_add_handle failed: {}", curl_multi_strerror(addResult));
        return abandon();
    }
    return reinterpret_cast<void*>(requestHandle);
}
/**
 * libcurl socket callback (CURLMOPT_SOCKETFUNCTION): mirrors libcurl's
 * interest in a socket onto a libuv poll handle.
 *
 * @param curl        Easy handle the update relates to.
 * @param curlSocket  Socket libcurl wants watched or dropped.
 * @param action      One of CURL_POLL_IN / OUT / INOUT / REMOVE.
 * @param self        Client registered through CURLMOPT_SOCKETDATA.
 * @param socketPtr   Record previously attached with curl_multi_assign, or
 *                    null for a socket seen for the first time.
 * @return            Always 0.
 */
int HttpClient::curl_socket_cb(CURL *curl, curl_socket_t curlSocket, int action, HttpClient *self, void *socketPtr) {
    switch (action) {
    case CURL_POLL_IN:
    case CURL_POLL_INOUT:
    case CURL_POLL_OUT: {
        self->m_logger->debug("polling socket {}", curlSocket);
        auto *data = static_cast<CurlSocketData_*>(socketPtr);
        if (!data) {
            // First notification for this socket: create its poll handle and
            // attach the record so later callbacks receive it as socketPtr.
            data = new CurlSocketData_;
            data->curlSocket = curlSocket;
            data->client = self;
            data->pollHandle = new uv_poll_t;
            uv_poll_init(self->m_eventLoop, data->pollHandle, curlSocket);
            data->pollHandle->data = data;

            // The easy handle passed here is not guaranteed to be one of our
            // requests, so look it up without throwing: an exception must not
            // propagate through libcurl's C frames.
            auto request = self->m_requests.find(curl);
            if (request != self->m_requests.end())
                request->second.socketData = data;

            curl_multi_assign(self->m_curlMulti, curlSocket, data);
        }

        int pollFlags = 0;
        if (action != CURL_POLL_OUT) pollFlags |= UV_READABLE;
        if (action != CURL_POLL_IN) pollFlags |= UV_WRITABLE;
        uv_poll_start(data->pollHandle, pollFlags, &HttpClient::uv_socket_cb);
        break;
    }
    case CURL_POLL_REMOVE: {
        auto *data = static_cast<CurlSocketData_*>(socketPtr);
        if (!data)
            break;
        self->m_logger->debug("removing socket {}", curlSocket);

        uv_poll_t *pollHandle = data->pollHandle;
        data->pollHandle = nullptr;
        uv_poll_stop(pollHandle);
        curl_multi_assign(self->m_curlMulti, curlSocket, nullptr);
        // Clear the back-pointer so uv_socket_cb sees no record for this handle.
        pollHandle->data = nullptr;
        uv_close(reinterpret_cast<uv_handle_t*>(pollHandle), [](uv_handle_t *h) {
            // Allocated as uv_poll_t above; delete as that type.
            delete reinterpret_cast<uv_poll_t*>(h);
        });
        // NOTE(review): a request entry's socketData pointer may still refer
        // to this record; it dangles from here on and must not be dereferenced.
        delete data;
        break;
    }
    default:
        break;
    }
    return 0;
}
/**
 * libcurl timer callback (CURLMOPT_TIMERFUNCTION): arms or disarms the libuv
 * timer that fires uv_timeout_cb.
 *
 * @param curl     Multi handle issuing the request (unused).
 * @param timeout  Requested delay; a negative value stops the timer.
 * @param self     Client registered through CURLMOPT_TIMERDATA.
 * @return         Always 0.
 */
int HttpClient::curl_timer_cb(CURLM *curl, long timeout, HttpClient *self) {
    if (timeout < 0) {
        uv_timer_stop(self->m_curlTimer);
        return 0;
    }

    // A zero delay is bumped to 1 so the timer is always armed with a
    // non-zero timeout; the timer is one-shot (repeat = 0).
    const long delay = (timeout == 0) ? 1 : timeout;
    uv_timer_start(self->m_curlTimer, &HttpClient::uv_timeout_cb, delay, 0);
    return 0;
}
/**
 * libuv poll callback: forwards socket readiness to libcurl and then
 * processes any transfers that finished as a result.
 *
 * @param h       Poll handle whose `data` points at the socket's record.
 * @param status  Poll status reported by libuv (not inspected).
 * @param events  Bitmask of UV_READABLE / UV_WRITABLE.
 */
void HttpClient::uv_socket_cb(uv_poll_t *h, int status, int events) {
    (void)status;
    auto *data = static_cast<CurlSocketData_*>(h->data);
    // curl_socket_cb clears h->data when it retires the socket, so check the
    // record before reading anything through it.
    if (!data) return;

    // Keep our own copy of the client: the socket record can be deleted by
    // curl_socket_cb while curl_multi_socket_action is running.
    HttpClient *client = data->client;
    client->m_logger->debug("socket {}", events & UV_READABLE ? (events & UV_WRITABLE ? "readable and writable" : "readable") : "writable");

    int flags = 0;
    if (events & UV_READABLE) flags |= CURL_CSELECT_IN;
    if (events & UV_WRITABLE) flags |= CURL_CSELECT_OUT;

    int runningHandles = 0;
    curl_multi_socket_action(client->m_curlMulti, data->curlSocket, flags, &runningHandles);
    // `data` must not be touched past this point.
    client->check_curl_messages();
}
/**
 * libuv timer callback: tells libcurl its timeout elapsed and then processes
 * any transfers that finished as a result.
 *
 * @param h  Timer handle whose `data` points at the owning HttpClient.
 */
void HttpClient::uv_timeout_cb(uv_timer_t *h) {
    auto *self = static_cast<HttpClient*>(h->data);
    // Check the owner before using it (the logger lives on the client).
    if (!self) return;

    self->m_logger->debug("curl timeout");
    int runningHandles = 0;
    curl_multi_socket_action(self->m_curlMulti, CURL_SOCKET_TIMEOUT, 0, &runningHandles);
    self->check_curl_messages();
}
void HttpClient::check_curl_messages() {
CURLMsg *msg;
int pending;
while ((msg = curl_multi_info_read(m_curlMulti, &pending))) {
switch (msg->msg) {
case CURLMSG_DONE: {
CURLcode r = msg->data.result;
CURL *curl = msg->easy_handle;
auto &request = m_requests.at(curl);
if (r == CURLE_OK) {
m_logger->debug("curl transfer done");
long statusCode = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &statusCode);
request.response->status = statusCode;
request.callback(std::move(request.response), CURLE_OK);
} else {
m_logger->error("curl transfer error: {}", (int)r);
request.callback(nullptr, r);
}
curl_multi_remove_handle(m_curlMulti, curl);
curl_easy_cleanup(curl);
if (request.requestHeaders)
curl_slist_free_all(request.requestHeaders);
m_logger->debug("removing request");
m_requests.erase(curl);
break;
}
default:
break;
}
}
}
size_t HttpClient::curl_data_cb(char *ptr, size_t size, size_t nmemb, CURL *userdata) {
HttpClient *self;
curl_easy_getinfo(userdata, CURLINFO_PRIVATE, &self);
HttpRequestData_ &req = self->m_requests.at(userdata);
self->m_logger->debug("received {} bytes", nmemb);
req.response->body.append(ptr, nmemb);
return nmemb;
}