#include "http.h"

#include "curl/curl.h"
#include "curl/easy.h"
#include "curl/multi.h"

// NOTE(review): two angle-bracket #include directives at this position had
// lost their header names in the reviewed copy of this file. Restore them from
// version control if they named anything other than the standard headers
// below, which cover the std:: names this file uses directly.
#include <memory>
#include <string>
#include <utility>

using namespace http;

namespace {

/// uv_close() callback for poll handles created with `new uv_poll_t`.
/// Deletes through the allocated type instead of through uv_handle_t.
void delete_poll_handle(uv_handle_t *handle)
{
    delete reinterpret_cast<uv_poll_t *>(handle);
}

/// uv_close() callback for timer handles created with `new uv_timer_t`.
void delete_timer_handle(uv_handle_t *handle)
{
    delete reinterpret_cast<uv_timer_t *>(handle);
}

/// Tears down one per-socket record: clears every request that still points
/// at it, detaches it from libcurl, stops and closes its poll handle and
/// frees the record. Passing a null `data` is a no-op.
///
/// Written as a template so the helper does not need to name the request and
/// socket-data types declared in http.h.
template <typename Requests, typename SocketData>
void release_socket_data(CURLM *multi, Requests &requests, SocketData *data)
{
    if (!data)
        return;

    // No request may keep a pointer to memory that is about to be freed.
    for (auto &entry : requests) {
        if (entry.second.socketData == data)
            entry.second.socketData = nullptr;
    }

    // libcurl must not hand this pointer back in a later socket callback.
    curl_multi_assign(multi, data->curlSocket, nullptr);

    if (uv_poll_t *pollHandle = data->pollHandle) {
        data->pollHandle = nullptr;
        uv_poll_stop(pollHandle);
        pollHandle->data = nullptr;
        // The handle memory must stay alive until libuv runs the close callback.
        uv_close(reinterpret_cast<uv_handle_t *>(pollHandle), &delete_poll_handle);
    }

    delete data;
}

/// Aborts and fully releases the request at `it`: removes the easy handle
/// from the multi handle, releases socket state still attached to the
/// request, erases the map entry and frees the easy handle and header list.
/// The response callback is NOT invoked.
template <typename Requests>
void discard_request(CURLM *multi, Requests &requests, typename Requests::iterator it)
{
    CURL *easy = it->first;

    // Removing the handle can re-enter curl_socket_cb with CURL_POLL_REMOVE,
    // which releases the socket record and nulls it->second.socketData.
    curl_multi_remove_handle(multi, easy);

    // Fallback for the case where libcurl did not report the socket removal.
    release_socket_data(multi, requests, it->second.socketData);

    curl_slist *headers = it->second.requestHeaders;
    requests.erase(it);

    curl_easy_cleanup(easy);
    // The header list has to outlive the easy handle that references it.
    if (headers)
        curl_slist_free_all(headers);
}

} // namespace

/// Creates the curl multi handle, wires its socket/timer callbacks to this
/// object and initialises the libuv timer that drives curl timeouts on `loop`.
HttpClient::HttpClient(uv_loop_t *loop):
    m_eventLoop(loop)
{
    m_logger = spdlog::get("httpclient");
    if (!m_logger) {
        m_logger = spdlog::stdout_color_mt("httpclient");
        m_logger->set_level(spdlog::level::info);
    }

    m_curlMulti = curl_multi_init();
    if (!m_curlMulti)
        m_logger->error("curl_multi_init failed");

    curl_multi_setopt(m_curlMulti, CURLMOPT_SOCKETFUNCTION, &HttpClient::curl_socket_cb);
    curl_multi_setopt(m_curlMulti, CURLMOPT_SOCKETDATA, this);
    curl_multi_setopt(m_curlMulti, CURLMOPT_TIMERFUNCTION, &HttpClient::curl_timer_cb);
    curl_multi_setopt(m_curlMulti, CURLMOPT_TIMERDATA, this);

    // Heap-allocated because libuv requires the handle to outlive uv_close().
    m_curlTimer = new uv_timer_t;
    uv_timer_init(loop, m_curlTimer);
    m_curlTimer->data = this;
}

/// Cancels every outstanding request without invoking its callback, then
/// releases the multi handle and schedules the timer handle for deletion.
HttpClient::~HttpClient()
{
    while (!m_requests.empty()) {
        m_logger->warn("canceling request while destructing");
        m_logger->debug("removing request handle");
        discard_request(m_curlMulti, m_requests, m_requests.begin());
    }

    // Clean up the multi handle while the timer is still open: libcurl may
    // invoke the socket/timer callbacks during cleanup.
    if (m_curlMulti) {
        curl_multi_cleanup(m_curlMulti);
        m_curlMulti = nullptr;
    }

    m_logger->debug("closing timer handle");
    uv_timer_stop(m_curlTimer);
    m_curlTimer->data = nullptr;
    uv_close(reinterpret_cast<uv_handle_t *>(m_curlTimer), &delete_timer_handle);
}

/// Aborts the request identified by `id` and releases all of its resources.
/// The response callback is not invoked. Unknown ids are logged and ignored.
void HttpClient::cancel_request(request_id id)
{
    CURL *requestHandle = reinterpret_cast<CURL *>(id);
    auto request = m_requests.find(requestHandle);
    if (request == m_requests.end()) {
        m_logger->warn("cancel_request: not found");
        return;
    }

    discard_request(m_curlMulti, m_requests, request);
}

/// Starts an asynchronous HTTP request.
///
/// @param method  HTTP method, passed to libcurl as CURLOPT_CUSTOMREQUEST.
/// @param url     Target URL.
/// @param opts    Optional headers (sent as "name:value" lines) and body.
/// @param cb      Invoked from check_curl_messages() once the transfer ends.
/// @return An id usable with cancel_request(), or nullptr when the request
///         could not be set up (in which case `cb` is never called).
request_id HttpClient::send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb)
{
    m_logger->debug("send request {} {}", method, url);

    CURL *requestHandle = curl_easy_init();
    if (!requestHandle) {
        m_logger->error("send_request: curl_easy_init failed");
        return nullptr;
    }

    auto insertResult = m_requests.emplace(requestHandle, this);
    if (!insertResult.second) {
        curl_easy_cleanup(requestHandle);
        return nullptr;
    }

    auto requestData = insertResult.first;
    requestData->second.callback = std::move(cb);
    requestData->second.curl = requestHandle;
    // The cleanup paths rely on these two being null until they are set.
    requestData->second.socketData = nullptr;
    requestData->second.requestHeaders = nullptr;
    // Element type is taken from the member so this file does not have to
    // repeat the response type's name.
    using Response = decltype(requestData->second.response)::element_type;
    requestData->second.response = std::make_unique<Response>();

    curl_easy_setopt(requestHandle, CURLOPT_WRITEFUNCTION, &HttpClient::curl_data_cb);
    curl_easy_setopt(requestHandle, CURLOPT_WRITEDATA, requestHandle);
    curl_easy_setopt(requestHandle, CURLOPT_PRIVATE, this);
    curl_easy_setopt(requestHandle, CURLOPT_FOLLOWLOCATION, 1L); // option takes a long
    curl_easy_setopt(requestHandle, CURLOPT_URL, url.c_str());
    curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, method.c_str());

    if (opts.headers) {
        curl_slist *headerList = nullptr;
        bool headersOk = true;
        for (const auto &header : *opts.headers) {
            std::string headerLine;
            headerLine.reserve(header.first.size() + 1 + header.second.size());
            headerLine += header.first;
            headerLine += ':';
            headerLine += header.second;

            // curl_slist_append returns the (possibly new) list head, or
            // null on failure while leaving the existing list untouched.
            curl_slist *grown = curl_slist_append(headerList, headerLine.c_str());
            if (!grown) {
                headersOk = false;
                break;
            }
            headerList = grown;
        }

        // Stored before any early return so discard_request() frees it.
        requestData->second.requestHeaders = headerList;

        if (!headersOk) {
            m_logger->error("send_request: failed to build header list");
            discard_request(m_curlMulti, m_requests, requestData);
            return nullptr;
        }
        if (headerList)
            curl_easy_setopt(requestHandle, CURLOPT_HTTPHEADER, headerList);
    }

    if (opts.body) {
        // `opts` dies when this function returns, so libcurl must take its own
        // copy of the body; the size is set first so embedded NULs survive.
        curl_easy_setopt(requestHandle, CURLOPT_POSTFIELDSIZE_LARGE,
                         static_cast<curl_off_t>(opts.body->size()));
        curl_easy_setopt(requestHandle, CURLOPT_COPYPOSTFIELDS, opts.body->c_str());
    }

    const CURLMcode addResult = curl_multi_add_handle(m_curlMulti, requestHandle);
    if (addResult != CURLM_OK) {
        m_logger->error("send_request: curl_multi_add_handle failed: {}", static_cast<int>(addResult));
        discard_request(m_curlMulti, m_requests, requestData);
        return nullptr;
    }

    return reinterpret_cast<request_id>(requestHandle);
}

/// libcurl CURLMOPT_SOCKETFUNCTION callback: starts, updates or stops libuv
/// polling of `curlSocket` according to `action`.
///
/// @param curl       Easy handle the socket currently belongs to.
/// @param self       The HttpClient registered via CURLMOPT_SOCKETDATA.
/// @param socketPtr  Per-socket record previously set with curl_multi_assign,
///                   or null the first time a socket is seen.
/// @return Always 0.
int HttpClient::curl_socket_cb(CURL *curl, curl_socket_t curlSocket, int action, HttpClient *self, void *socketPtr)
{
    auto *data = static_cast<CurlSocketData_ *>(socketPtr);

    switch (action) {
    case CURL_POLL_IN:
    case CURL_POLL_INOUT:
    case CURL_POLL_OUT: {
        self->m_logger->debug("polling socket {}", curlSocket);

        if (!data) {
            uv_poll_t *pollHandle = new uv_poll_t;
            const int initResult = uv_poll_init(self->m_eventLoop, pollHandle, curlSocket);
            if (initResult != 0) {
                // The handle was never registered with the loop, so it can be
                // freed directly; without polling the transfer ends through
                // libcurl's own timeouts.
                self->m_logger->error("uv_poll_init failed for socket {}: {}", curlSocket, initResult);
                delete pollHandle;
                return 0;
            }

            data = new CurlSocketData_;
            data->curlSocket = curlSocket;
            data->client = self;
            data->pollHandle = pollHandle;
            pollHandle->data = data;
            curl_multi_assign(self->m_curlMulti, curlSocket, data);

            // find() rather than at(): an exception must never propagate
            // through libcurl's C frames.
            auto request = self->m_requests.find(curl);
            if (request != self->m_requests.end())
                request->second.socketData = data;
        }

        int pollFlags = 0;
        if (action != CURL_POLL_OUT)
            pollFlags |= UV_READABLE;
        if (action != CURL_POLL_IN)
            pollFlags |= UV_WRITABLE;

        uv_poll_start(data->pollHandle, pollFlags, &HttpClient::uv_socket_cb);
        break;
    }
    case CURL_POLL_REMOVE:
        if (data) {
            self->m_logger->debug("removing socket {}", curlSocket);
            release_socket_data(self->m_curlMulti, self->m_requests, data);
        }
        break;
    default:
        break;
    }

    return 0;
}

/// libcurl CURLMOPT_TIMERFUNCTION callback: (re)arms or stops the libuv timer.
/// A negative `timeout` stops the timer; 0 is rounded up to 1 ms.
int HttpClient::curl_timer_cb(CURLM *curl, long timeout, HttpClient *self)
{
    (void)curl;

    if (timeout < 0) {
        uv_timer_stop(self->m_curlTimer);
        return 0;
    }

    if (timeout == 0)
        timeout = 1;
    uv_timer_start(self->m_curlTimer, &HttpClient::uv_timeout_cb, timeout, 0);
    return 0;
}

/// libuv poll callback: forwards socket readiness (or a poll error) to
/// libcurl and then dispatches any transfers that finished as a result.
void HttpClient::uv_socket_cb(uv_poll_t *h, int status, int events)
{
    auto *data = static_cast<CurlSocketData_ *>(h->data);
    if (!data)
        return;

    // Cached up front: `data` may be freed by a CURL_POLL_REMOVE that fires
    // inside curl_multi_socket_action().
    HttpClient *client = data->client;

    client->m_logger->debug("socket {}", events & UV_READABLE
        ? (events & UV_WRITABLE ? "readable and writable" : "readable")
        : "writable");

    int flags = 0;
    if (status < 0)
        flags |= CURL_CSELECT_ERR;
    if (events & UV_READABLE)
        flags |= CURL_CSELECT_IN;
    if (events & UV_WRITABLE)
        flags |= CURL_CSELECT_OUT;

    int runningHandles = 0;
    curl_multi_socket_action(client->m_curlMulti, data->curlSocket, flags, &runningHandles);
    client->check_curl_messages();
}

/// libuv timer callback: tells libcurl its timeout elapsed and dispatches any
/// transfers that finished as a result.
void HttpClient::uv_timeout_cb(uv_timer_t *h)
{
    auto *self = static_cast<HttpClient *>(h->data);
    if (!self)
        return;

    self->m_logger->debug("curl timeout");

    int runningHandles = 0;
    curl_multi_socket_action(self->m_curlMulti, CURL_SOCKET_TIMEOUT, 0, &runningHandles);
    self->check_curl_messages();
}

/// Drains libcurl's message queue. For every finished transfer the request is
/// unregistered and its curl resources are released first; only then is the
/// response callback invoked, so a callback that calls cancel_request() or
/// send_request() cannot touch a half-destroyed request.
void HttpClient::check_curl_messages()
{
    CURLMsg *msg = nullptr;
    int pending = 0;

    while ((msg = curl_multi_info_read(m_curlMulti, &pending))) {
        if (msg->msg != CURLMSG_DONE)
            continue;

        // Copy what is needed now: `msg` is invalidated by the remove below.
        const CURLcode result = msg->data.result;
        CURL *curl = msg->easy_handle;

        long statusCode = 0;
        if (result == CURLE_OK) {
            m_logger->debug("curl transfer done");
            curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &statusCode);
        } else {
            m_logger->error("curl transfer error: {}", static_cast<int>(result));
        }

        curl_multi_remove_handle(m_curlMulti, curl);

        auto request = m_requests.find(curl);
        if (request == m_requests.end()) {
            m_logger->warn("finished transfer has no matching request");
            curl_easy_cleanup(curl);
            continue;
        }

        auto callback = std::move(request->second.callback);
        auto response = std::move(request->second.response);
        curl_slist *headers = request->second.requestHeaders;

        m_logger->debug("removing request");
        m_requests.erase(request);
        curl_easy_cleanup(curl);
        if (headers)
            curl_slist_free_all(headers);

        if (result == CURLE_OK) {
            response->status = statusCode;
            callback(std::move(response), CURLE_OK);
        } else {
            callback(nullptr, result);
        }
    }
}

/// libcurl CURLOPT_WRITEFUNCTION callback: appends the received bytes to the
/// body of the response that belongs to the easy handle `userdata`.
///
/// @return The number of bytes consumed (`size * nmemb`), or 0 when the
///         owning client/request cannot be resolved; for a non-empty chunk
///         that mismatch makes libcurl abort the transfer.
size_t HttpClient::curl_data_cb(char *ptr, size_t size, size_t nmemb, CURL *userdata)
{
    const size_t total = size * nmemb;

    // CURLINFO_PRIVATE is documented as a char* output; convert afterwards.
    char *privateData = nullptr;
    if (curl_easy_getinfo(userdata, CURLINFO_PRIVATE, &privateData) != CURLE_OK || !privateData)
        return 0;
    HttpClient *self = reinterpret_cast<HttpClient *>(privateData);

    // find() rather than at(): an exception must never propagate through
    // libcurl's C frames.
    auto request = self->m_requests.find(userdata);
    if (request == self->m_requests.end())
        return 0;

    self->m_logger->debug("received {} bytes", total);
    request->second.response->body.append(ptr, total);
    return total;
}