449 lines
16 KiB
C++
449 lines
16 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2016-2026 Tom Zander <tom@flowee.org>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "APIServer.h"
|
|
#include "APIRPCBinding.h"
|
|
#include "APIProtocol.h"
|
|
|
|
#include "streaming/MessageBuilder.h"
|
|
#include "streaming/MessageParser.h"
|
|
#include "streaming/BufferPools.h"
|
|
|
|
#include "chainparamsbase.h"
|
|
#include "netbase.h"
|
|
#include "util.h"
|
|
#include "utilstrencodings.h"
|
|
#include "random.h"
|
|
#include "rpcserver.h"
|
|
#include "init.h"
|
|
|
|
#include <fstream>
|
|
#include <functional>
|
|
|
|
// the amount of seconds after which we disconnect incoming connections that have not done anything yet.
|
|
#define INTRODUCTION_TIMEOUT 4
|
|
|
|
#ifdef __linux__
|
|
#ifndef _GNU_SOURCE
|
|
# define _GNU_SOURCE /* To get defns of NI_MAXSERV and NI_MAXHOST */
|
|
#endif
|
|
|
|
namespace {
|
|
std::deque<std::string> allInterfaces() {
|
|
std::deque<std::string> answer;
|
|
|
|
struct ifaddrs *ifaddr;
|
|
if (getifaddrs(&ifaddr) == -1)
|
|
return answer;
|
|
|
|
for (struct ifaddrs *ifa = ifaddr; ifa; ifa = ifa->ifa_next) {
|
|
if (ifa->ifa_addr == nullptr)
|
|
continue;
|
|
int family = ifa->ifa_addr->sa_family;
|
|
if (family != AF_INET && family != AF_INET6)
|
|
continue;
|
|
char host[NI_MAXHOST];
|
|
int s = getnameinfo(ifa->ifa_addr,
|
|
(family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6),
|
|
host, NI_MAXHOST,
|
|
nullptr, 0, NI_NUMERICHOST);
|
|
if (s == 0)
|
|
answer.push_back(std::string(host));
|
|
}
|
|
freeifaddrs(ifaddr);
|
|
return answer;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
Api::Server::Server(boost::asio::io_context &context)
|
|
: m_networkManager(context),
|
|
m_netProtect(100),
|
|
m_timerRunning(false),
|
|
m_newConnectionTimeout(context)
|
|
{
|
|
uint16_t defaultPort = BaseParams().ApiServerPort();
|
|
using boost::asio::ip::tcp;
|
|
std::list<tcp::endpoint> endpoints;
|
|
|
|
if (mapArgs.count("-apibind")) {
|
|
for (auto &strAddress : mapMultiArgs["-apibind"]) {
|
|
uint16_t port = defaultPort;
|
|
std::string host;
|
|
SplitHostPort(strAddress, port, host);
|
|
if (host.empty()) {
|
|
host = "127.0.0.1";
|
|
} else if (host == "localhost") {
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v4::loopback(), port));
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v6::loopback(), port));
|
|
continue;
|
|
#ifdef __linux__
|
|
} else if (host == "0.0.0.0") {
|
|
for (auto &iface : allInterfaces()) {
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::make_address(iface), port));
|
|
}
|
|
continue;
|
|
#endif
|
|
}
|
|
try {
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::make_address(host), port));
|
|
} catch (std::runtime_error &e) {
|
|
logCritical(Log::ApiServer) << "Bind port needs to be an API address. Parsing failed with" << e;
|
|
}
|
|
}
|
|
} else {
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v4::loopback(), defaultPort));
|
|
endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v6::loopback(), defaultPort));
|
|
}
|
|
|
|
for (auto &endpoint : endpoints) {
|
|
try {
|
|
m_networkManager.bind(endpoint, std::bind(&Api::Server::newConnection, this, std::placeholders::_1));
|
|
logCritical(Log::ApiServer) << "Api Server listening on" << endpoint;
|
|
m_netProtect.addWhitelistedAddress(endpoint.address());
|
|
} catch (const std::exception &e) {
|
|
logCritical(Log::ApiServer) << "Api Server failed to listen on" << endpoint << "due to:" << e;
|
|
}
|
|
}
|
|
}
|
|
|
|
Api::Server::~Server()
|
|
{
|
|
// Make sure we get the callbacks about being disconnected while we're still alive.
|
|
m_newConnections.clear();
|
|
for (auto i = m_connections.begin(); i != m_connections.end(); ++i) {
|
|
delete *i;
|
|
}
|
|
m_connections.clear();
|
|
}
|
|
|
|
void Api::Server::addService(NetworkService *service)
|
|
{
|
|
m_networkManager.addService(service);
|
|
}
|
|
|
|
void Api::Server::newConnection(NetworkConnection &connection)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
logDebug() << "server newConnection";
|
|
NewConnection con;
|
|
con.initialConnectionTime = time(nullptr);
|
|
|
|
// If we want to rate-limit by IP, we can't allow ipv6 addresses or we need a MUCH more
|
|
// sophisticated limiting system. Which may be nice should ipv6 start to actually see usage.
|
|
if (GetBoolArg("-api_disallow_v6", false) && connection.endPoint().ipAddress.is_v6()) {
|
|
logInfo() << "Rejecting incoming connection because its IPv6";
|
|
return;
|
|
}
|
|
|
|
if (!m_netProtect.shouldAccept(connection, con.initialConnectionTime))
|
|
return; // we don't accept
|
|
|
|
static const int maxIncoming = GetArg("-api_connection_per_ip", -1);
|
|
if (maxIncoming > 0) {
|
|
auto list = m_networkManager.connectionsFrom(connection.endPoint().ipAddress);
|
|
if (static_cast<int>(list.size()) >= maxIncoming) {
|
|
logInfo() << "Rejecting incoming connection from IP, we already have:" << list.size();
|
|
return; // we don't accept more than N open connections from a single IP
|
|
}
|
|
}
|
|
|
|
connection.setOnIncomingMessage(std::bind(&Api::Server::incomingMessage, this, std::placeholders::_1));
|
|
connection.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
|
|
connection.accept();
|
|
con.connection = std::move(connection);
|
|
m_newConnections.push_back(std::move(con));
|
|
|
|
if (!m_timerRunning) {
|
|
m_timerRunning = true;
|
|
m_newConnectionTimeout.expires_after(std::chrono::seconds(INTRODUCTION_TIMEOUT));
|
|
m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
|
|
}
|
|
}
|
|
|
|
void Api::Server::connectionRemoved(const EndPoint &endPoint)
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
auto iter = m_newConnections.begin();
|
|
while (iter != m_newConnections.end()) {
|
|
if (iter->connection.connectionId() == endPoint.connectionId) {
|
|
m_newConnections.erase(iter);
|
|
break;
|
|
}
|
|
++iter;
|
|
}
|
|
|
|
auto conIter = m_connections.begin();
|
|
while (conIter != m_connections.end()) {
|
|
if ((*conIter)->m_connection.connectionId() == endPoint.connectionId) {
|
|
delete *conIter;
|
|
m_connections.erase(conIter);
|
|
break;
|
|
}
|
|
++conIter;
|
|
}
|
|
}
|
|
|
|
void Api::Server::incomingMessage(const Message &message)
|
|
{
|
|
logDebug() << "incomingMessage";
|
|
Connection *handler;
|
|
{
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
bool found = false;
|
|
auto iter = m_newConnections.begin();
|
|
while (iter != m_newConnections.end()) {
|
|
if (iter->connection.connectionId() == message.remote) {
|
|
m_newConnections.erase(iter);
|
|
found = true;
|
|
break;
|
|
}
|
|
++iter;
|
|
}
|
|
|
|
if (!found)
|
|
return;
|
|
NetworkConnection con(&m_networkManager, message.remote);
|
|
assert(con.isValid());
|
|
con.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
|
|
|
|
handler = new Connection(this, std::move(con));
|
|
m_connections.push_back(handler);
|
|
}
|
|
handler->incomingMessage(message);
|
|
}
|
|
|
|
void Api::Server::checkConnections(boost::system::error_code error)
|
|
{
|
|
if (error.value() == boost::asio::error::operation_aborted)
|
|
return;
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
const auto disconnectTime = time(nullptr) + INTRODUCTION_TIMEOUT;
|
|
auto iter = m_newConnections.begin();
|
|
while (iter != m_newConnections.end()) {
|
|
if (iter->initialConnectionTime <= disconnectTime) {
|
|
logDebug() << "Calling disconnect on connection" << iter->connection.connectionId() << "now";
|
|
iter->connection.disconnect();
|
|
iter = m_newConnections.erase(iter);
|
|
} else {
|
|
++iter;
|
|
}
|
|
}
|
|
|
|
// restart timer if there is still something left.
|
|
if (!m_newConnections.empty()) {
|
|
m_timerRunning = true;
|
|
m_newConnectionTimeout.expires_after(std::chrono::seconds(1));
|
|
m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
|
|
} else {
|
|
m_timerRunning = false;
|
|
}
|
|
}
|
|
|
|
NetworkConnection Api::Server::copyConnection(const NetworkConnection &orig)
|
|
{
|
|
return m_networkManager.connection(m_networkManager.endPoint(orig.connectionId()));
|
|
}
|
|
|
|
Message Api::Server::createFailedMessage(const Message &origin, const std::string &failReason) const
|
|
{
|
|
Streaming::MessageBuilder builder(Streaming::pool(failReason.size() + 40));
|
|
builder.add(Meta::FailedReason, failReason);
|
|
builder.add(Meta::FailedCommandServiceId, origin.serviceId());
|
|
builder.add(Meta::FailedCommandId, origin.messageId());
|
|
Message answer = builder.message(APIService, Meta::CommandFailed);
|
|
for (auto header : origin.headerData()) {
|
|
if (header.first >= RequestId) // anything below is not allowed to be used by users.
|
|
answer.setHeaderInt(header.first, header.second);
|
|
}
|
|
return answer;
|
|
}
|
|
|
|
|
|
Api::Server::Connection::Connection(Server *parent, NetworkConnection && connection)
|
|
: m_connection(std::move(connection)),
|
|
m_parent(parent)
|
|
{
|
|
m_connection.setOnIncomingMessage(std::bind(&Api::Server::Connection::incomingMessage, this, std::placeholders::_1));
|
|
}
|
|
|
|
Api::Server::Connection::~Connection()
|
|
{
|
|
for (auto iter = m_properties.begin(); iter != m_properties.end(); ++iter) {
|
|
delete iter->second;
|
|
}
|
|
}
|
|
|
|
void Api::Server::Connection::incomingMessage(const Message &message)
|
|
{
|
|
if (message.serviceId() >= 16) // not a service we handle
|
|
return;
|
|
if (message.serviceId() == APIService && message.messageId() == Meta::Version) {
|
|
Streaming::MessageBuilder builder(Streaming::pool(50));
|
|
std::ostringstream ss;
|
|
ss << "Flowee:" << HUB_SERIES << " (" << CLIENT_VERSION_MAJOR << "-";
|
|
ss.width(2);
|
|
ss.fill('0');
|
|
ss << CLIENT_VERSION_MINOR << ")";
|
|
builder.add(Meta::GenericByteData, ss.str());
|
|
m_connection.send(builder.reply(message));
|
|
return;
|
|
}
|
|
|
|
std::unique_ptr<Api::Parser> parser;
|
|
try {
|
|
parser.reset(Api::createParser(message));
|
|
assert(parser.get()); // createParser should never return a nullptr
|
|
} catch (const std::exception &e) {
|
|
logWarning(Log::ApiServer) << e;
|
|
sendFailedMessage(message, e.what());
|
|
return;
|
|
}
|
|
|
|
assert(parser.get());
|
|
assert(message.serviceId() < 0xFFFF);
|
|
assert(message.messageId() < 0xFFFF);
|
|
const uint32_t sessionDataId = (static_cast<uint32_t>(message.serviceId()) << 16) + static_cast<uint32_t>(message.messageId());
|
|
parser.get()->setSessionData(&m_properties[sessionDataId]);
|
|
|
|
switch (parser->type()) {
|
|
case Parser::WrapsRPCCall:
|
|
handleRpcParser(parser, message);
|
|
break;
|
|
case Parser::IncludesHandler:
|
|
handleMainParser(std::move(parser), message);
|
|
break;
|
|
case Parser::ASyncParser:
|
|
startASyncParser(std::move(parser));
|
|
break;
|
|
}
|
|
}
|
|
|
|
void Api::Server::Connection::handleRpcParser(const std::unique_ptr<Parser> &parser, const Message &message)
|
|
{
|
|
auto *rpcParser = dynamic_cast<Api::RpcParser*>(parser.get());
|
|
assert(rpcParser);
|
|
assert(!rpcParser->method().empty());
|
|
try {
|
|
UniValue request(UniValue::VOBJ);
|
|
rpcParser->createRequest(message, request);
|
|
UniValue result;
|
|
try {
|
|
logInfo(Log::ApiServer) << rpcParser->method() << message.serviceId() << '/' << message.messageId();
|
|
result = tableRPC.execute(rpcParser->method(), request);
|
|
} catch (UniValue& objError) {
|
|
const std::string error = find_value(objError, "message").get_str();
|
|
logWarning(Log::ApiServer) << error;
|
|
sendFailedMessage(message, error);
|
|
return;
|
|
} catch(const std::exception &e) {
|
|
logWarning(Log::ApiServer) << e;
|
|
sendFailedMessage(message, std::string(e.what()));
|
|
return;
|
|
}
|
|
int reserveSize = rpcParser->messageSize(result);
|
|
Streaming::MessageBuilder builder(Streaming::pool(reserveSize));
|
|
rpcParser->buildReply(builder, result);
|
|
Message reply = builder.reply(message, rpcParser->replyMessageId());
|
|
if (reserveSize < reply.body().size())
|
|
logDebug(Log::ApiServer) << "Generated message larger than space reserved."
|
|
<< message.serviceId() << message.messageId()
|
|
<< "reserved:" << reserveSize << "built:" << reply.body().size();
|
|
assert(reply.body().size() <= reserveSize); // fail fast.
|
|
m_connection.send(reply);
|
|
} catch (const ParserException &e) {
|
|
logWarning(Log::ApiServer) << e;
|
|
sendFailedMessage(message, e.what());
|
|
return;
|
|
} catch (const std::exception &e) {
|
|
std::string error = "Interal Error " + std::string(e.what());
|
|
logCritical(Log::ApiServer) << "ApiServer internal error in parsing" << rpcParser->method() << e;
|
|
Streaming::pool(0)->commit(); // make sure the partial message is discarded
|
|
sendFailedMessage(message, error);
|
|
}
|
|
}
|
|
|
|
void Api::Server::Connection::handleMainParser(const std::unique_ptr<Parser> &parser, const Message &message)
|
|
{
|
|
auto *directParser = dynamic_cast<Api::DirectParser*>(parser.get());
|
|
assert(directParser);
|
|
int reserveSize = 0;
|
|
try {
|
|
reserveSize = directParser->calculateMessageSize(message);
|
|
} catch (const ParserException &e) {
|
|
logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
|
|
sendFailedMessage(message, e.what());
|
|
return;
|
|
} catch (const std::exception &e) {
|
|
logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
|
|
sendFailedMessage(message, "unknown error");
|
|
return;
|
|
}
|
|
logInfo(Log::ApiServer) << message.serviceId() << '/' << message.messageId();
|
|
Streaming::MessageBuilder builder(Streaming::pool(reserveSize));
|
|
try {
|
|
directParser->buildReply(message, builder);
|
|
Message reply = builder.reply(message, directParser->replyMessageId());
|
|
if (reserveSize < reply.body().size())
|
|
logDebug(Log::ApiServer) << "Generated message larger than space reserved."
|
|
<< message.serviceId() << message.messageId()
|
|
<< "reserved:" << reserveSize << "built:" << reply.body().size();
|
|
assert(reply.body().size() <= reserveSize); // fail fast.
|
|
m_connection.send(reply);
|
|
} catch (const ParserException &e) {
|
|
logWarning(Log::ApiServer) << "buildReply() threw:" << e;
|
|
sendFailedMessage(message, e.what());
|
|
return;
|
|
} catch (const std::exception &e) {
|
|
logWarning(Log::ApiServer) << "buildReply() threw:" << e;
|
|
sendFailedMessage(message, "unknown error");
|
|
return;
|
|
}
|
|
}
|
|
|
|
void Api::Server::Connection::startASyncParser(std::unique_ptr<Parser> &&parser)
|
|
{
|
|
auto *asyncParser = dynamic_cast<Api::ASyncParser*>(parser.get());
|
|
assert(asyncParser);
|
|
while (!ShutdownRequested()) {
|
|
for (size_t i = 0; i < m_runningParsers.size(); ++i) {
|
|
auto &token = m_runningParsers[i];
|
|
if (!token.exchange(true)) {
|
|
asyncParser->start(&token,
|
|
m_parent->copyConnection(m_connection), m_parent);
|
|
parser.release(); // avoid double delete
|
|
return;
|
|
}
|
|
}
|
|
// if 'full' avoid burning CPU while waiting for some thread to finish.
|
|
struct timespec tim, tim2;
|
|
tim.tv_sec = 0;
|
|
tim.tv_nsec = 500;
|
|
nanosleep(&tim , &tim2);
|
|
}
|
|
}
|
|
|
|
void Api::Server::Connection::sendFailedMessage(const Message &origin, const std::string &failReason)
|
|
{
|
|
m_connection.send(m_parent->createFailedMessage(origin, failReason));
|
|
}
|
|
|
|
|
|
Api::SessionData::~SessionData()
|
|
{
|
|
}
|