Files
thehub/libs/api/APIServer.cpp
T

449 lines
16 KiB
C++
Raw Permalink Normal View History

2016-08-16 16:51:22 +02:00
/*
2017-11-09 19:34:51 +01:00
* This file is part of the Flowee project
2021-02-16 18:36:40 +01:00
* Copyright (C) 2016-2021 Tom Zander <tom@flowee.org>
2016-08-16 16:51:22 +02:00
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2018-02-15 19:50:12 +01:00
#include "APIServer.h"
#include "APIRPCBinding.h"
#include "APIProtocol.h"
2016-08-16 16:51:22 +02:00
#include "streaming/MessageBuilder.h"
#include "streaming/MessageParser.h"
2017-03-08 15:31:28 -08:00
#include "chainparamsbase.h"
#include "netbase.h"
2016-08-16 16:51:22 +02:00
#include "util.h"
#include "utilstrencodings.h"
#include "random.h"
#include "rpcserver.h"
2020-10-26 15:11:28 +01:00
#include "init.h"
2016-08-16 16:51:22 +02:00
#include <fstream>
#include <functional>
2020-09-14 22:48:03 +02:00
// The number of seconds after which we disconnect incoming connections that have not done anything yet.
#define INTRODUCTION_TIMEOUT 4
2016-08-16 16:51:22 +02:00
2019-08-12 23:48:46 +02:00
#ifdef __linux__
#ifndef _GNU_SOURCE
# define _GNU_SOURCE /* To get defns of NI_MAXSERV and NI_MAXHOST */
#endif
namespace {
/**
 * List the numeric host address of every IPv4 and IPv6 interface address
 * that getifaddrs() reports.
 *
 * Entries without an address, or with an address family other than
 * AF_INET / AF_INET6, are skipped. Entries for which getnameinfo() fails
 * are skipped as well.
 *
 * @return the addresses in the order getifaddrs() listed them; empty when
 *         getifaddrs() itself fails.
 */
std::deque<std::string> allInterfaces()
{
    std::deque<std::string> addresses;
    struct ifaddrs *head = nullptr;
    if (getifaddrs(&head) == -1)
        return addresses;

    for (const struct ifaddrs *entry = head; entry != nullptr; entry = entry->ifa_next) {
        const struct sockaddr *addr = entry->ifa_addr;
        if (addr == nullptr)
            continue;

        // getnameinfo() needs the size of the concrete sockaddr type.
        socklen_t addrLength;
        switch (addr->sa_family) {
        case AF_INET:
            addrLength = sizeof(struct sockaddr_in);
            break;
        case AF_INET6:
            addrLength = sizeof(struct sockaddr_in6);
            break;
        default:
            continue; // not an IP address, move on to the next entry
        }

        char numericHost[NI_MAXHOST];
        const int rc = getnameinfo(addr, addrLength,
                numericHost, NI_MAXHOST,
                nullptr, 0, NI_NUMERICHOST);
        if (rc == 0)
            addresses.push_back(std::string(numericHost));
    }
    freeifaddrs(head);
    return addresses;
}
}
#endif
2018-02-15 19:50:12 +01:00
/**
 * Create the API server and bind it to its listening endpoints.
 *
 * When one or more "-apilisten" arguments are given, each is split into a
 * host and port (the port defaults to BaseParams().ApiServerPort()):
 *  - "localhost" adds both the IPv4 and the IPv6 loopback address,
 *  - on Linux "0.0.0.0" adds one endpoint per address from allInterfaces(),
 *  - an empty host is treated as "127.0.0.1",
 *  - anything else is parsed as a literal IP address.
 * Without "-apilisten" the server listens on the IPv4 and IPv6 loopback
 * addresses at the default port.
 *
 * Each endpoint that binds successfully also has its address added to the
 * whitelist of m_netProtect. Failure to bind is logged and otherwise ignored.
 */
Api::Server::Server(boost::asio::io_service &service)
    : m_networkManager(service),
    m_netProtect(100),
    m_timerRunning(false),
    m_newConnectionTimeout(service)
{
    using boost::asio::ip::tcp;
    namespace ip = boost::asio::ip;

    uint16_t defaultPort = BaseParams().ApiServerPort();
    std::list<tcp::endpoint> endpoints;

    // "localhost" means both the v4 and the v6 loopback device.
    const auto addLoopback = [&endpoints](uint16_t port) {
        endpoints.push_back(tcp::endpoint(ip::address_v4::loopback(), port));
        endpoints.push_back(tcp::endpoint(ip::address_v6::loopback(), port));
    };

    if (!mapArgs.count("-apilisten")) {
        addLoopback(defaultPort);
    } else {
        for (auto &strAddress : mapMultiArgs["-apilisten"]) {
            uint16_t port = defaultPort;
            std::string host;
            SplitHostPort(strAddress, port, host);

            if (host == "localhost") {
                addLoopback(port);
                continue;
            }
#ifdef __linux__
            if (host == "0.0.0.0") {
                // listen on each interface address individually.
                for (auto &iface : allInterfaces()) {
                    endpoints.push_back(tcp::endpoint(ip::address::from_string(iface), port));
                }
                continue;
            }
#endif
            if (host.empty())
                host = "127.0.0.1";

            try {
                endpoints.push_back(tcp::endpoint(ip::address::from_string(host), port));
            } catch (const std::runtime_error &e) {
                logCritical(Log::ApiServer) << "Bind port needs to be an API address. Parsing failed with" << e;
            }
        }
    }

    for (auto &endpoint : endpoints) {
        try {
            m_networkManager.bind(endpoint, std::bind(&Api::Server::newConnection, this, std::placeholders::_1));
            logCritical(Log::ApiServer) << "Api Server listening on" << endpoint;
            m_netProtect.addWhitelistedAddress(endpoint.address());
        } catch (const std::exception &e) {
            logCritical(Log::ApiServer) << "Api Server failed to listen on" << endpoint << "due to:" << e;
        }
    }
}
2020-09-29 17:06:57 +02:00
/// Nothing to clean up explicitly; members are destroyed in the usual order.
Api::Server::~Server() = default;
2018-02-17 14:27:49 +01:00
/**
 * Register a NetworkService by forwarding it to the network manager
 * this server owns.
 */
void Api::Server::addService(NetworkService *service)
{
    m_networkManager.addService(service);
}
2018-02-15 19:50:12 +01:00
/**
 * Callback for every new incoming connection on one of our bound endpoints.
 *
 * The connection is rejected (by returning without calling accept()) when:
 *  - "-api_disallow_v6" is set and the peer uses an IPv6 address,
 *  - m_netProtect.shouldAccept() says no,
 *  - "-api_connection_per_ip" is positive and the peer IP already has that
 *    many open connections.
 *
 * Accepted connections are parked in m_newConnections and the timer that
 * runs checkConnections() is started if it was not yet running.
 */
void Api::Server::newConnection(NetworkConnection &connection)
{
    std::lock_guard<std::mutex> guard(m_mutex);
    logDebug() << "server newConnection";

    NewConnection pending;
    pending.initialConnectionTime = time(nullptr);

    // If we want to rate-limit by IP, we can't allow ipv6 addresses or we need a MUCH more
    // sophisticated limiting system. Which may be nice should ipv6 start to actually see usage.
    const bool refuseV6 = GetBoolArg("-api_disallow_v6", false);
    if (refuseV6 && connection.endPoint().ipAddress.is_v6()) {
        logInfo() << "Rejecting incoming connection because its IPv6";
        return;
    }

    const bool allowed = m_netProtect.shouldAccept(connection, pending.initialConnectionTime);
    if (!allowed)
        return; // we don't accept

    // read once; later changes of the argument are not picked up.
    static const int maxIncoming = GetArg("-api_connection_per_ip", -1);
    if (maxIncoming > 0) {
        const auto existing = m_networkManager.connectionsFrom(connection.endPoint().ipAddress);
        if (static_cast<int>(existing.size()) >= maxIncoming) {
            logInfo() << "Rejecting incoming connection from IP, we already have:" << existing.size();
            return; // we don't accept more than N open connections from a single IP
        }
    }

    connection.setOnIncomingMessage(std::bind(&Api::Server::incomingMessage, this, std::placeholders::_1));
    connection.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
    connection.accept();

    pending.connection = std::move(connection);
    m_newConnections.push_back(std::move(pending));

    if (m_timerRunning)
        return;
    m_timerRunning = true;
    m_newConnectionTimeout.expires_from_now(boost::posix_time::seconds(INTRODUCTION_TIMEOUT));
    m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
}
2018-02-15 19:50:12 +01:00
void Api::Server::connectionRemoved(const EndPoint &endPoint)
2016-08-16 16:51:22 +02:00
{
2021-02-16 22:08:16 +01:00
std::unique_lock<std::mutex> lock(m_mutex);
2016-08-16 16:51:22 +02:00
auto iter = m_newConnections.begin();
while (iter != m_newConnections.end()) {
if (iter->connection.connectionId() == endPoint.connectionId) {
m_newConnections.erase(iter);
break;
}
++iter;
}
auto conIter = m_connections.begin();
while (conIter != m_connections.end()) {
if ((*conIter)->m_connection.connectionId() == endPoint.connectionId) {
delete *conIter;
2020-09-02 13:49:09 +02:00
m_connections.erase(conIter);
2016-08-16 16:51:22 +02:00
break;
}
++conIter;
}
}
void Api::Server::incomingMessage(const Message &message)
2016-08-16 16:51:22 +02:00
{
2019-06-10 17:06:27 +02:00
logDebug() << "incomingMessage";
Connection *handler;
{
2021-02-16 22:08:16 +01:00
std::unique_lock<std::mutex> lock(m_mutex);
2019-06-10 17:06:27 +02:00
bool found = false;
auto iter = m_newConnections.begin();
while (iter != m_newConnections.end()) {
if (iter->connection.connectionId() == message.remote) {
m_newConnections.erase(iter);
2019-06-10 17:06:27 +02:00
found = true;
break;
}
++iter;
2016-08-16 16:51:22 +02:00
}
2019-06-10 17:06:27 +02:00
if (!found)
return;
NetworkConnection con(&m_networkManager, message.remote);
assert(con.isValid());
con.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
2020-09-29 17:06:57 +02:00
handler = new Connection(this, std::move(con));
2019-06-10 17:06:27 +02:00
m_connections.push_back(handler);
2016-08-16 16:51:22 +02:00
}
handler->incomingMessage(message);
2016-08-16 16:51:22 +02:00
}
2018-02-15 19:50:12 +01:00
/**
 * Timer callback that disconnects peers which connected but did not send
 * anything within INTRODUCTION_TIMEOUT seconds.
 *
 * @param error the result of the timer wait; an aborted wait is ignored.
 *
 * Connections that are still younger than the timeout stay in
 * m_newConnections and the timer is re-armed to check again in one second.
 * When nothing is left to watch the timer is marked as no longer running,
 * newConnection() starts it again when needed.
 */
void Api::Server::checkConnections(boost::system::error_code error)
{
    if (error.value() == boost::asio::error::operation_aborted)
        return;
    std::unique_lock<std::mutex> lock(m_mutex);

    // Anything that connected at or before this moment has used up its
    // introduction time. This has to lie in the past: a cutoff in the future
    // would match every pending connection, including brand new ones.
    const auto disconnectTime = time(nullptr) - INTRODUCTION_TIMEOUT;
    auto iter = m_newConnections.begin();
    while (iter != m_newConnections.end()) {
        if (iter->initialConnectionTime <= disconnectTime) {
            logDebug() << "Calling disconnect on connection" << iter->connection.connectionId() << "now";
            iter->connection.disconnect();
            iter = m_newConnections.erase(iter);
        } else {
            ++iter;
        }
    }

    // restart timer if there is still something left.
    if (!m_newConnections.empty()) {
        m_timerRunning = true;
        m_newConnectionTimeout.expires_from_now(boost::posix_time::seconds(1));
        m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
    } else {
        m_timerRunning = false;
    }
}
2020-09-29 17:06:57 +02:00
// Being thread_local, every thread that calls pool() gets its own instance.
// NOTE(review): 4000000 is presumably the default pool size in bytes -- confirm against BufferPool.
thread_local Streaming::BufferPool m_buffer(4000000);

/**
 * Give access to the calling thread's buffer pool.
 *
 * @param reserveSize passed to BufferPool::reserve() before the pool is returned.
 * @return a reference to the thread-local pool.
 */
Streaming::BufferPool &Api::Server::pool(int reserveSize) const
{
    m_buffer.reserve(reserveSize);
    return m_buffer;
}
/**
 * Create another NetworkConnection object for the same remote as \a orig.
 *
 * The connection-id of \a orig is used to look up its end-point in the
 * network manager, which is then asked for a connection to that end-point.
 */
NetworkConnection Api::Server::copyConnection(const NetworkConnection &orig)
{
    return m_networkManager.connection(
            m_networkManager.endPoint(orig.connectionId()));
}
/**
 * Build the Meta::CommandFailed message that answers \a origin.
 *
 * @param origin the message that could not be handled; its service-id and
 *        message-id are included in the body of the reply.
 * @param failReason human readable description of the failure.
 * @return the reply, carrying a copy of every header of \a origin whose
 *         key is RequestId or higher.
 */
Message Api::Server::createFailedMessage(const Message &origin, const std::string &failReason) const
{
    // space for the reason plus 40 bytes for the remaining fields.
    Streaming::MessageBuilder builder(pool(failReason.size() + 40));
    builder.add(Meta::FailedReason, failReason);
    builder.add(Meta::FailedCommandServiceId, origin.serviceId());
    builder.add(Meta::FailedCommandId, origin.messageId());

    Message answer = builder.message(APIService, Meta::CommandFailed);
    for (const auto &header : origin.headerData()) {
        if (header.first < RequestId) // anything below is not allowed to be used by users.
            continue;
        answer.setHeaderInt(header.first, header.second);
    }
    return answer;
}
/**
 * Take ownership of \a connection and route its incoming messages to
 * this object's incomingMessage().
 */
Api::Server::Connection::Connection(Server *parent, NetworkConnection && connection)
    : m_connection(std::move(connection)),
    m_parent(parent)
{
    auto onMessage = std::bind(&Api::Server::Connection::incomingMessage, this, std::placeholders::_1);
    m_connection.setOnIncomingMessage(onMessage);
}
2019-03-09 16:36:13 +01:00
/// Deletes the session-data objects stored in m_properties.
Api::Server::Connection::~Connection()
{
    for (auto &property : m_properties)
        delete property.second;
}
2018-02-15 19:50:12 +01:00
void Api::Server::Connection::incomingMessage(const Message &message)
2016-08-16 16:51:22 +02:00
{
2019-03-27 18:44:26 +01:00
if (message.serviceId() >= 16) // not a service we handle
return;
if (message.serviceId() == APIService && message.messageId() == Meta::Version) {
2020-09-29 17:06:57 +02:00
Streaming::MessageBuilder builder(m_parent->pool(50));
std::ostringstream ss;
2019-12-12 17:48:48 +01:00
ss << "Flowee:" << HUB_SERIES << " (" << CLIENT_VERSION_MAJOR << "-";
2019-12-31 18:05:24 +01:00
ss.width(2);
ss.fill('0');
ss << CLIENT_VERSION_MINOR << ")";
builder.add(Meta::GenericByteData, ss.str());
2019-06-22 23:24:00 +02:00
m_connection.send(builder.reply(message));
return;
}
2019-03-07 18:38:03 +01:00
std::unique_ptr<Api::Parser> parser;
2016-08-16 16:51:22 +02:00
try {
2019-03-07 18:38:03 +01:00
parser.reset(Api::createParser(message));
2016-08-16 16:51:22 +02:00
assert(parser.get()); // createParser should never return a nullptr
} catch (const std::exception &e) {
2018-03-19 22:40:50 +01:00
logWarning(Log::ApiServer) << e;
2016-08-16 16:51:22 +02:00
sendFailedMessage(message, e.what());
return;
}
assert(parser.get());
2019-03-09 16:36:13 +01:00
assert(message.serviceId() < 0xFFFF);
assert(message.messageId() < 0xFFFF);
2019-12-12 17:48:48 +01:00
const uint32_t sessionDataId = (static_cast<uint32_t>(message.serviceId()) << 16) + static_cast<uint32_t>(message.messageId());
2019-03-09 16:36:13 +01:00
parser.get()->setSessionData(&m_properties[sessionDataId]);
2016-08-16 16:51:22 +02:00
2020-09-29 17:06:57 +02:00
switch (parser->type()) {
case Parser::WrapsRPCCall:
2020-10-26 18:30:39 +01:00
handleRpcParser(parser, message);
2020-09-29 17:06:57 +02:00
break;
case Parser::IncludesHandler:
2020-10-26 18:30:39 +01:00
handleMainParser(std::move(parser), message);
2020-09-29 17:06:57 +02:00
break;
case Parser::ASyncParser:
2020-10-26 18:30:39 +01:00
startASyncParser(std::move(parser));
2020-09-29 17:06:57 +02:00
break;
}
}
2020-10-26 18:30:39 +01:00
void Api::Server::Connection::handleRpcParser(const std::unique_ptr<Parser> &parser, const Message &message)
2020-09-29 17:06:57 +02:00
{
2020-10-26 18:30:39 +01:00
auto *rpcParser = dynamic_cast<Api::RpcParser*>(parser.get());
2020-09-29 17:06:57 +02:00
assert(rpcParser);
assert(!rpcParser->method().empty());
try {
UniValue request(UniValue::VOBJ);
rpcParser->createRequest(message, request);
UniValue result;
2016-08-16 16:51:22 +02:00
try {
2020-09-29 17:06:57 +02:00
logInfo(Log::ApiServer) << rpcParser->method() << message.serviceId() << '/' << message.messageId();
result = tableRPC.execute(rpcParser->method(), request);
} catch (UniValue& objError) {
const std::string error = find_value(objError, "message").get_str();
logWarning(Log::ApiServer) << error;
2017-10-06 20:18:09 +02:00
sendFailedMessage(message, error);
2020-09-29 17:06:57 +02:00
return;
} catch(const std::exception &e) {
logWarning(Log::ApiServer) << e;
sendFailedMessage(message, std::string(e.what()));
return;
2016-08-16 16:51:22 +02:00
}
2020-09-29 17:06:57 +02:00
int reserveSize = rpcParser->messageSize(result);
Streaming::MessageBuilder builder(m_parent->pool(reserveSize));
rpcParser->buildReply(builder, result);
Message reply = builder.reply(message, rpcParser->replyMessageId());
if (reserveSize < reply.body().size())
logDebug(Log::ApiServer) << "Generated message larger than space reserved."
<< message.serviceId() << message.messageId()
<< "reserved:" << reserveSize << "built:" << reply.body().size();
assert(reply.body().size() <= reserveSize); // fail fast.
m_connection.send(reply);
} catch (const ParserException &e) {
logWarning(Log::ApiServer) << e;
sendFailedMessage(message, e.what());
return;
} catch (const std::exception &e) {
std::string error = "Interal Error " + std::string(e.what());
logCritical(Log::ApiServer) << "ApiServer internal error in parsing" << rpcParser->method() << e;
m_parent->pool(0).commit(); // make sure the partial message is discarded
sendFailedMessage(message, error);
}
}
2020-10-26 18:30:39 +01:00
void Api::Server::Connection::handleMainParser(const std::unique_ptr<Parser> &parser, const Message &message)
2020-09-29 17:06:57 +02:00
{
2020-10-26 18:30:39 +01:00
auto *directParser = dynamic_cast<Api::DirectParser*>(parser.get());
2020-09-29 17:06:57 +02:00
assert(directParser);
int reserveSize = 0;
try {
reserveSize = directParser->calculateMessageSize(message);
} catch (const ParserException &e) {
logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
sendFailedMessage(message, e.what());
2017-10-06 20:18:09 +02:00
return;
2021-02-16 18:36:40 +01:00
} catch (const std::exception &e) {
logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
sendFailedMessage(message, "unknown error");
return;
2017-10-06 20:18:09 +02:00
}
2020-09-29 17:06:57 +02:00
logInfo(Log::ApiServer) << message.serviceId() << '/' << message.messageId();
Streaming::MessageBuilder builder(m_parent->pool(reserveSize));
try {
directParser->buildReply(message, builder);
Message reply = builder.reply(message, directParser->replyMessageId());
if (reserveSize < reply.body().size())
logDebug(Log::ApiServer) << "Generated message larger than space reserved."
<< message.serviceId() << message.messageId()
<< "reserved:" << reserveSize << "built:" << reply.body().size();
assert(reply.body().size() <= reserveSize); // fail fast.
m_connection.send(reply);
} catch (const ParserException &e) {
2021-02-16 18:36:40 +01:00
logWarning(Log::ApiServer) << "buildReply() threw:" << e;
2020-09-29 17:06:57 +02:00
sendFailedMessage(message, e.what());
return;
2021-02-16 18:36:40 +01:00
} catch (const std::exception &e) {
logWarning(Log::ApiServer) << "buildReply() threw:" << e;
sendFailedMessage(message, "unknown error");
return;
2016-08-16 16:51:22 +02:00
}
}
2020-10-26 18:30:39 +01:00
void Api::Server::Connection::startASyncParser(std::unique_ptr<Parser> &&parser)
2020-09-29 17:06:57 +02:00
{
2020-10-26 18:30:39 +01:00
auto *asyncParser = dynamic_cast<Api::ASyncParser*>(parser.get());
2020-09-29 17:06:57 +02:00
assert(asyncParser);
2020-10-26 15:11:28 +01:00
while (!ShutdownRequested()) {
2020-09-29 17:06:57 +02:00
for (size_t i = 0; i < m_runningParsers.size(); ++i) {
2020-10-26 15:11:28 +01:00
auto &token = m_runningParsers[i];
if (!token.exchange(true)) {
asyncParser->start(&token,
m_parent->copyConnection(m_connection), m_parent);
2020-10-26 18:30:39 +01:00
parser.release(); // avoid double delete
2020-10-26 15:11:28 +01:00
return;
2020-09-29 17:06:57 +02:00
}
}
// if 'full' avoid burning CPU while waiting for some thread to finish.
struct timespec tim, tim2;
tim.tv_sec = 0;
tim.tv_nsec = 500;
nanosleep(&tim , &tim2);
}
}
2018-02-15 19:50:12 +01:00
void Api::Server::Connection::sendFailedMessage(const Message &origin, const std::string &failReason)
2016-08-16 16:51:22 +02:00
{
2020-09-29 17:06:57 +02:00
m_connection.send(m_parent->createFailedMessage(origin, failReason));
2016-08-16 16:51:22 +02:00
}
2019-03-09 16:36:13 +01:00
/// Out-of-line destructor; SessionData itself has nothing to clean up here.
Api::SessionData::~SessionData() = default;