/*
 * This file is part of the Flowee project
 * Copyright (C) 2016-2026 Tom Zander
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "APIServer.h"
#include "APIRPCBinding.h"
#include "APIProtocol.h"

#include "streaming/MessageBuilder.h"
#include "streaming/MessageParser.h"
#include "streaming/BufferPools.h"

#include "chainparamsbase.h"
#include "netbase.h"
#include "util.h"
#include "utilstrencodings.h"
#include "random.h"
#include "rpcserver.h"
#include "init.h"

// NOTE(review): the names of the two system headers originally included here
// were lost; the list below covers the standard library names this file uses
// directly. Please confirm against version control.
#include <ctime>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>

// the amount of seconds after which we disconnect incoming connections that have not done anything yet.
#define INTRODUCTION_TIMEOUT 4

#ifdef __linux__
#ifndef _GNU_SOURCE
# define _GNU_SOURCE     /* To get defns of NI_MAXSERV and NI_MAXHOST */
#endif
#include <ifaddrs.h>
#include <netdb.h>

namespace {
/**
 * Lists the addresses of the local network interfaces.
 *
 * Walks the list produced by getifaddrs() and converts every IPv4 and IPv6
 * address into its numeric string form.
 * Entries without an address, entries of another address family and entries
 * for which getnameinfo() fails are skipped.
 *
 * @return the numeric host strings; empty when getifaddrs() fails.
 */
std::deque<std::string> allInterfaces()
{
    std::deque<std::string> answer;
    struct ifaddrs *ifaddr;
    if (getifaddrs(&ifaddr) == -1)
        return answer;

    for (struct ifaddrs *ifa = ifaddr; ifa; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr == nullptr)
            continue;
        const int family = ifa->ifa_addr->sa_family;
        if (family != AF_INET && family != AF_INET6)
            continue;

        char host[NI_MAXHOST];
        const int s = getnameinfo(ifa->ifa_addr,
                (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6),
                host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST);
        if (s == 0)
            answer.push_back(std::string(host));
    }
    freeifaddrs(ifaddr);
    return answer;
}
}
#endif

/**
 * Creates the API server and starts listening.
 *
 * The endpoints to listen on are taken from the "-apibind" arguments, each
 * of which is split into a host and a port (the port defaults to
 * BaseParams().ApiServerPort()):
 *  - an empty host is treated as "127.0.0.1",
 *  - "localhost" adds both the IPv4 and the IPv6 loopback address,
 *  - on Linux "0.0.0.0" adds every address returned by allInterfaces(),
 *  - anything else is parsed as a literal address.
 * Without any "-apibind" argument both loopback addresses are used on the
 * default port.
 *
 * Every endpoint that was bound successfully has its address whitelisted in
 * m_netProtect. A failure to bind is logged and does not stop the remaining
 * endpoints from being tried.
 */
Api::Server::Server(boost::asio::io_context &context)
    : m_networkManager(context),
    m_netProtect(100),
    m_timerRunning(false),
    m_newConnectionTimeout(context)
{
    uint16_t defaultPort = BaseParams().ApiServerPort();
    using boost::asio::ip::tcp;
    std::list<tcp::endpoint> endpoints;

    if (mapArgs.count("-apibind")) {
        for (auto &strAddress : mapMultiArgs["-apibind"]) {
            uint16_t port = defaultPort;
            std::string host;
            SplitHostPort(strAddress, port, host);
            if (host.empty()) {
                host = "127.0.0.1";
            } else if (host == "localhost") {
                endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v4::loopback(), port));
                endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v6::loopback(), port));
                continue;
#ifdef __linux__
            } else if (host == "0.0.0.0") {
                for (auto &iface : allInterfaces()) {
                    endpoints.push_back(tcp::endpoint(boost::asio::ip::make_address(iface), port));
                }
                continue;
#endif
            }
            try {
                endpoints.push_back(tcp::endpoint(boost::asio::ip::make_address(host), port));
            } catch (const std::runtime_error &e) {
                logCritical(Log::ApiServer) << "Bind port needs to be an API address. Parsing failed with" << e;
            }
        }
    } else {
        endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v4::loopback(), defaultPort));
        endpoints.push_back(tcp::endpoint(boost::asio::ip::address_v6::loopback(), defaultPort));
    }

    for (auto &endpoint : endpoints) {
        try {
            m_networkManager.bind(endpoint, std::bind(&Api::Server::newConnection, this, std::placeholders::_1));
            logCritical(Log::ApiServer) << "Api Server listening on" << endpoint;
            m_netProtect.addWhitelistedAddress(endpoint.address());
        } catch (const std::exception &e) {
            logCritical(Log::ApiServer) << "Api Server failed to listen on" << endpoint << "due to:" << e;
        }
    }
}

/// Drops all pending connections and deletes every Connection handler we own.
Api::Server::~Server()
{
    // Make sure we get the callbacks about being disconnected while we're still alive.
    m_newConnections.clear();
    for (auto *connection : m_connections) {
        delete connection;
    }
    m_connections.clear();
}

/// Forwards the service to the network manager.
void Api::Server::addService(NetworkService *service)
{
    m_networkManager.addService(service);
}

/**
 * Callback for the network manager, called for each incoming connection.
 *
 * The connection is rejected (by returning without calling accept()) when
 *  - "-api_disallow_v6" is set and the peer uses an IPv6 address,
 *  - m_netProtect refuses it, or
 *  - "-api_connection_per_ip" is positive and the peer address already has
 *    that many connections.
 * An accepted connection is parked in m_newConnections until its first
 * message arrives; the introduction timer is started when it is not running.
 */
void Api::Server::newConnection(NetworkConnection &connection)
{
    // The lock type is deduced from m_mutex (C++17 class template argument deduction).
    std::unique_lock lock(m_mutex);
    logDebug() << "server newConnection";
    NewConnection con;
    con.initialConnectionTime = time(nullptr);

    // If we want to rate-limit by IP, we can't allow ipv6 addresses or we need a MUCH more
    // sophisticated limiting system. Which may be nice should ipv6 start to actually see usage.
    if (GetBoolArg("-api_disallow_v6", false) && connection.endPoint().ipAddress.is_v6()) {
        logInfo() << "Rejecting incoming connection because its IPv6";
        return;
    }

    if (!m_netProtect.shouldAccept(connection, con.initialConnectionTime))
        return; // we don't accept

    // Function-local static: the argument is read once, on the first call.
    static const int maxIncoming = GetArg("-api_connection_per_ip", -1);
    if (maxIncoming > 0) {
        auto list = m_networkManager.connectionsFrom(connection.endPoint().ipAddress);
        if (static_cast<int>(list.size()) >= maxIncoming) {
            logInfo() << "Rejecting incoming connection from IP, we already have:" << list.size();
            return; // we don't accept more than N open connections from a single IP
        }
    }

    connection.setOnIncomingMessage(std::bind(&Api::Server::incomingMessage, this, std::placeholders::_1));
    connection.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
    connection.accept();
    con.connection = std::move(connection);
    m_newConnections.push_back(std::move(con));

    if (!m_timerRunning) {
        m_timerRunning = true;
        m_newConnectionTimeout.expires_after(std::chrono::seconds(INTRODUCTION_TIMEOUT));
        m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
    }
}

/**
 * Disconnect callback; forgets everything we hold for the given end point.
 *
 * Removes the first matching entry from the pending connections and
 * deletes and removes the first matching Connection handler.
 */
void Api::Server::connectionRemoved(const EndPoint &endPoint)
{
    std::unique_lock lock(m_mutex);
    auto iter = m_newConnections.begin();
    while (iter != m_newConnections.end()) {
        if (iter->connection.connectionId() == endPoint.connectionId) {
            m_newConnections.erase(iter);
            break;
        }
        ++iter;
    }

    auto conIter = m_connections.begin();
    while (conIter != m_connections.end()) {
        if ((*conIter)->m_connection.connectionId() == endPoint.connectionId) {
            delete *conIter;
            m_connections.erase(conIter);
            break;
        }
        ++conIter;
    }
}

/**
 * Handles the first message of a pending connection.
 *
 * The pending entry matching message.remote is removed and replaced by a
 * Connection handler, which then processes the message. Messages from a
 * connection that is not pending are ignored here.
 */
void Api::Server::incomingMessage(const Message &message)
{
    logDebug() << "incomingMessage";
    Connection *handler;
    {
        std::unique_lock lock(m_mutex);
        bool found = false;
        auto iter = m_newConnections.begin();
        while (iter != m_newConnections.end()) {
            if (iter->connection.connectionId() == message.remote) {
                m_newConnections.erase(iter);
                found = true;
                break;
            }
            ++iter;
        }
        if (!found)
            return;

        NetworkConnection con(&m_networkManager, message.remote);
        assert(con.isValid());
        con.setOnDisconnected(std::bind(&Api::Server::connectionRemoved, this, std::placeholders::_1));
        handler = new Connection(this, std::move(con));
        m_connections.push_back(handler);
    }
    // Processed after m_mutex has been released.
    handler->incomingMessage(message);
}

/**
 * Timer callback; disconnects pending connections that stayed silent.
 *
 * A pending connection is disconnected once at least INTRODUCTION_TIMEOUT
 * seconds have passed since it was accepted. While pending connections
 * remain the timer is re-armed to fire again in one second.
 *
 * @param error the timer result; an aborted wait is ignored.
 */
void Api::Server::checkConnections(boost::system::error_code error)
{
    if (error.value() == boost::asio::error::operation_aborted)
        return;
    std::unique_lock lock(m_mutex);
    // Connections accepted at or before this moment have used up their time.
    // This has to lie in the past: a cut-off in the future matches every
    // pending connection, including the ones that only just connected.
    const auto disconnectTime = time(nullptr) - INTRODUCTION_TIMEOUT;
    auto iter = m_newConnections.begin();
    while (iter != m_newConnections.end()) {
        if (iter->initialConnectionTime <= disconnectTime) {
            logDebug() << "Calling disconnect on connection" << iter->connection.connectionId() << "now";
            iter->connection.disconnect();
            iter = m_newConnections.erase(iter);
        } else {
            ++iter;
        }
    }

    // restart timer if there is still something left.
    if (!m_newConnections.empty()) {
        m_timerRunning = true;
        m_newConnectionTimeout.expires_after(std::chrono::seconds(1));
        m_newConnectionTimeout.async_wait(std::bind(&Api::Server::checkConnections, this, std::placeholders::_1));
    } else {
        m_timerRunning = false;
    }
}

/// Asks the network manager for a connection to the end point \a orig is connected to.
NetworkConnection Api::Server::copyConnection(const NetworkConnection &orig)
{
    return m_networkManager.connection(m_networkManager.endPoint(orig.connectionId()));
}

/**
 * Builds the Meta::CommandFailed reply for a message we could not handle.
 *
 * The reply body holds the fail reason plus the service-id and message-id
 * of \a origin. Headers of \a origin with an id of RequestId or higher are
 * copied onto the reply.
 */
Message Api::Server::createFailedMessage(const Message &origin, const std::string &failReason) const
{
    Streaming::MessageBuilder builder(Streaming::pool(failReason.size() + 40));
    builder.add(Meta::FailedReason, failReason);
    builder.add(Meta::FailedCommandServiceId, origin.serviceId());
    builder.add(Meta::FailedCommandId, origin.messageId());
    Message answer = builder.message(APIService, Meta::CommandFailed);
    for (const auto &header : origin.headerData()) {
        if (header.first >= RequestId) // anything below is not allowed to be used by users.
            answer.setHeaderInt(header.first, header.second);
    }
    return answer;
}

/// Takes over \a connection and routes its incoming messages to this handler.
Api::Server::Connection::Connection(Server *parent, NetworkConnection && connection)
    : m_connection(std::move(connection)),
    m_parent(parent)
{
    m_connection.setOnIncomingMessage(std::bind(&Api::Server::Connection::incomingMessage, this, std::placeholders::_1));
}

/// Deletes the session data objects stored in m_properties.
Api::Server::Connection::~Connection()
{
    for (auto &property : m_properties) {
        delete property.second;
    }
}

/**
 * Processes one message received on this connection.
 *
 * Messages with a service-id of 16 or higher are ignored. The
 * APIService / Meta::Version request is answered directly. For everything
 * else a parser is created and, depending on its type, run as an RPC
 * wrapper, as a direct handler or started asynchronously. When creating
 * the parser throws, a failed-message carrying the exception text is sent.
 */
void Api::Server::Connection::incomingMessage(const Message &message)
{
    if (message.serviceId() >= 16) // not a service we handle
        return;

    if (message.serviceId() == APIService && message.messageId() == Meta::Version) {
        Streaming::MessageBuilder builder(Streaming::pool(50));
        std::ostringstream ss;
        ss << "Flowee:" << HUB_SERIES << " (" << CLIENT_VERSION_MAJOR << "-";
        ss.width(2);
        ss.fill('0');
        ss << CLIENT_VERSION_MINOR << ")";
        builder.add(Meta::GenericByteData, ss.str());
        m_connection.send(builder.reply(message));
        return;
    }

    // NOTE(review): the element type was lost in extraction; Api::Parser is
    // inferred from Api::createParser() and the Parser:: enum values below.
    std::unique_ptr<Api::Parser> parser;
    try {
        parser.reset(Api::createParser(message));
        assert(parser.get()); // createParser should never return a nullptr
    } catch (const std::exception &e) {
        logWarning(Log::ApiServer) << e;
        sendFailedMessage(message, e.what());
        return;
    }
    assert(parser.get());

    // Session data is stored per (service-id, message-id) pair.
    assert(message.serviceId() < 0xFFFF);
    assert(message.messageId() < 0xFFFF);
    const uint32_t sessionDataId = (static_cast<uint32_t>(message.serviceId()) << 16)
            + static_cast<uint32_t>(message.messageId());
    parser.get()->setSessionData(&m_properties[sessionDataId]);

    switch (parser->type()) {
    case Parser::WrapsRPCCall:
        handleRpcParser(parser, message);
        break;
    case Parser::IncludesHandler:
        handleMainParser(std::move(parser), message);
        break;
    case Parser::ASyncParser:
        startASyncParser(std::move(parser));
        break;
    }
}

/**
 * Runs a parser that wraps an RPC call and sends the reply.
 *
 * The parser turns \a message into an RPC request, which is executed via
 * tableRPC, and then turns the result into the reply message. Any failure
 * is reported to the remote with a failed-message.
 */
void Api::Server::Connection::handleRpcParser(const std::unique_ptr<Api::Parser> &parser, const Message &message)
{
    // NOTE(review): cast target lost in extraction; Api::RpcParser is inferred from the calls below.
    auto *rpcParser = dynamic_cast<Api::RpcParser*>(parser.get());
    assert(rpcParser);
    assert(!rpcParser->method().empty());

    try {
        UniValue request(UniValue::VOBJ);
        rpcParser->createRequest(message, request);
        UniValue result;
        try {
            logInfo(Log::ApiServer) << rpcParser->method() << message.serviceId() << '/' << message.messageId();
            result = tableRPC.execute(rpcParser->method(), request);
        } catch (UniValue& objError) {
            const std::string error = find_value(objError, "message").get_str();
            logWarning(Log::ApiServer) << error;
            sendFailedMessage(message, error);
            return;
        } catch(const std::exception &e) {
            logWarning(Log::ApiServer) << e;
            sendFailedMessage(message, std::string(e.what()));
            return;
        }

        int reserveSize = rpcParser->messageSize(result);
        Streaming::MessageBuilder builder(Streaming::pool(reserveSize));
        rpcParser->buildReply(builder, result);
        Message reply = builder.reply(message, rpcParser->replyMessageId());
        if (reserveSize < reply.body().size())
            logDebug(Log::ApiServer) << "Generated message larger than space reserved."
                                     << message.serviceId() << message.messageId()
                                     << "reserved:" << reserveSize << "built:" << reply.body().size();
        assert(reply.body().size() <= reserveSize); // fail fast.
        m_connection.send(reply);
    } catch (const ParserException &e) {
        logWarning(Log::ApiServer) << e;
        sendFailedMessage(message, e.what());
        return;
    } catch (const std::exception &e) {
        std::string error = "Interal Error " + std::string(e.what());
        logCritical(Log::ApiServer) << "ApiServer internal error in parsing" << rpcParser->method() <<  e;
        Streaming::pool(0)->commit(); // make sure the partial message is discarded
        sendFailedMessage(message, error);
    }
}

/**
 * Runs a parser that builds the reply itself and sends that reply.
 *
 * The parser is first asked for the size to reserve, then to build the
 * reply. A ParserException is reported to the remote with its own text,
 * any other exception as "unknown error".
 */
void Api::Server::Connection::handleMainParser(const std::unique_ptr<Api::Parser> &parser, const Message &message)
{
    // NOTE(review): cast target lost in extraction; Api::DirectParser is inferred from the calls below.
    auto *directParser = dynamic_cast<Api::DirectParser*>(parser.get());
    assert(directParser);

    int reserveSize = 0;
    try {
        reserveSize = directParser->calculateMessageSize(message);
    } catch (const ParserException &e) {
        logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
        sendFailedMessage(message, e.what());
        return;
    } catch (const std::exception &e) {
        logWarning(Log::ApiServer) << "calculateMessageSize() threw:" << e;
        sendFailedMessage(message, "unknown error");
        return;
    }

    logInfo(Log::ApiServer) << message.serviceId() << '/' << message.messageId();
    Streaming::MessageBuilder builder(Streaming::pool(reserveSize));
    try {
        directParser->buildReply(message, builder);
        Message reply = builder.reply(message, directParser->replyMessageId());
        if (reserveSize < reply.body().size())
            logDebug(Log::ApiServer) << "Generated message larger than space reserved."
                                     << message.serviceId() << message.messageId()
                                     << "reserved:" << reserveSize << "built:" << reply.body().size();
        assert(reply.body().size() <= reserveSize); // fail fast.
        m_connection.send(reply);
    } catch (const ParserException &e) {
        logWarning(Log::ApiServer) << "buildReply() threw:" << e;
        sendFailedMessage(message, e.what());
        return;
    } catch (const std::exception &e) {
        logWarning(Log::ApiServer) << "buildReply() threw:" << e;
        sendFailedMessage(message, "unknown error");
        return;
    }
}

/**
 * Starts an asynchronous parser as soon as a slot in m_runningParsers is free.
 *
 * A slot is claimed by flipping its token from false to true. The started
 * parser receives that token, its own copy of the connection and the
 * server. Ownership of \a parser is given up once it has been started.
 * While all slots are taken this method keeps polling until one frees up
 * or a shutdown is requested.
 */
void Api::Server::Connection::startASyncParser(std::unique_ptr<Api::Parser> &&parser)
{
    // NOTE(review): cast target lost in extraction; Api::ASyncParser is inferred from the start() call below.
    auto *asyncParser = dynamic_cast<Api::ASyncParser*>(parser.get());
    assert(asyncParser);

    while (!ShutdownRequested()) {
        for (size_t i = 0; i < m_runningParsers.size(); ++i) {
            auto &token = m_runningParsers[i];
            if (!token.exchange(true)) {
                asyncParser->start(&token, m_parent->copyConnection(m_connection), m_parent);
                parser.release(); // avoid double delete
                return;
            }
        }
        // if 'full' avoid burning CPU while waiting for some thread to finish.
        struct timespec tim, tim2;
        tim.tv_sec = 0;
        tim.tv_nsec = 500;
        nanosleep(&tim , &tim2);
    }
}

/// Sends the failed-message for \a origin, as built by the server, over this connection.
void Api::Server::Connection::sendFailedMessage(const Message &origin, const std::string &failReason)
{
    m_connection.send(m_parent->createFailedMessage(origin, failReason));
}

Api::SessionData::~SessionData()
{
}