1320 lines
50 KiB
C++
1320 lines
50 KiB
C++
/*
|
|
* This file is part of the Flowee project
|
|
* Copyright (C) 2016,2019-2020 Tom Zander <tomz@freedommail.ch>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "NetworkManager.h"
|
|
#include "NetworkManager_p.h"
|
|
#include "NetworkQueueFullError.h"
|
|
#include "NetworkServiceBase.h"
|
|
#include <NetworkEnums.h>
|
|
#include <APIProtocol.h>
|
|
|
|
#include <utils/hash.h>
|
|
#include <utils/streaming/MessageBuilder.h>
|
|
#include <utils/streaming/MessageParser.h>
|
|
#include <boost/lexical_cast.hpp>
|
|
#include <boost/make_shared.hpp>
|
|
|
|
#include <fstream>
|
|
|
|
// #define DEBUG_CONNECTIONS
|
|
|
|
constexpr int RECEIVE_STREAM_SIZE = 200000;
|
|
constexpr int CHUNK_SIZE = 8000;
|
|
constexpr int MAX_MESSAGE_SIZE = 9000;
|
|
constexpr int LEGACY_HEADER_SIZE = 24;
|
|
|
|
namespace {
|
|
|
|
thread_local Streaming::BufferPool m_buffer(10240); // used only really for message-headers
|
|
inline Streaming::BufferPool &pool(int reserveSize) {
|
|
m_buffer.reserve(reserveSize);
|
|
return m_buffer;
|
|
}
|
|
|
|
int reconnectTimeoutForStep(short step) {
|
|
if (step < 5)
|
|
return step*step*step / 2;
|
|
return 44;
|
|
}
|
|
|
|
Message buildPingMessage(bool outgoingConnection) {
|
|
Streaming::MessageBuilder builder(Streaming::HeaderOnly, 10);
|
|
builder.add(Network::ServiceId, Network::SystemServiceId);
|
|
if (outgoingConnection) // outgoing connections ping
|
|
builder.add(Network::Ping, true);
|
|
else
|
|
builder.add(Network::Pong, true);
|
|
builder.add(Network::HeaderEnd, true);
|
|
return builder.message();
|
|
}
|
|
|
|
}
|
|
|
|
|
|
NetworkManager::NetworkManager(boost::asio::io_service &service)
|
|
: d(std::make_shared<NetworkManagerPrivate>(service))
|
|
{
|
|
}
|
|
|
|
NetworkManager::~NetworkManager()
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
d->isClosingDown = true;
|
|
for (auto server : d->servers) {
|
|
server->shutdown();
|
|
}
|
|
for (auto it = d->connections.begin(); it != d->connections.end(); ++it) {
|
|
it->second->shutdown();
|
|
}
|
|
d->connections.clear(); // invalidate NetworkConnection references
|
|
for (auto service : d->services) {
|
|
service->setManager(nullptr);
|
|
}
|
|
d->services.clear();
|
|
d->unusedConnections.clear();
|
|
}
|
|
|
|
NetworkConnection NetworkManager::connection(const EndPoint &remote, ConnectionEnum connect)
|
|
{
|
|
const bool hasHostname = (!remote.ipAddress.is_unspecified() || !remote.hostname.empty()) && remote.announcePort > 0;
|
|
|
|
if (hasHostname) {
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
for (auto iter1 = d->connections.begin(); iter1 != d->connections.end(); ++iter1) {
|
|
EndPoint endPoint = iter1->second->endPoint();
|
|
if (!remote.hostname.empty() && endPoint.hostname != remote.hostname)
|
|
continue;
|
|
if (!remote.ipAddress.is_unspecified() && endPoint.ipAddress != remote.ipAddress)
|
|
continue;
|
|
if (!(endPoint.announcePort == 0 || endPoint.announcePort == remote.announcePort || remote.announcePort == 0))
|
|
continue;
|
|
return NetworkConnection(this, iter1->first);
|
|
}
|
|
|
|
if (connect == AutoCreate) {
|
|
EndPoint ep(remote);
|
|
if (ep.ipAddress.is_unspecified()) // try to see if hostname is an IP. If so, bypass DNS lookup
|
|
try { ep.ipAddress = boost::asio::ip::address::from_string(ep.hostname); } catch (...) {}
|
|
ep.peerPort = ep.announcePort; // outgoing connections always have those the same.
|
|
|
|
ep.connectionId = ++d->lastConnectionId;
|
|
if (d->unusedConnections.empty()) {
|
|
d->connections.insert(std::make_pair(ep.connectionId, std::make_shared<NetworkManagerConnection>(d, ep)));
|
|
} else {
|
|
auto con = d->unusedConnections.front();
|
|
d->unusedConnections.pop_front();
|
|
con->setEndPoint(ep);
|
|
d->connections.insert(std::make_pair(ep.connectionId, con));
|
|
}
|
|
|
|
return NetworkConnection(this, ep.connectionId);
|
|
}
|
|
}
|
|
return NetworkConnection();
|
|
}
|
|
|
|
EndPoint NetworkManager::endPoint(int remoteId) const
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
NetworkManagerConnection *con = d->connections.at(remoteId).get();
|
|
if (con)
|
|
return con->endPoint();
|
|
return EndPoint();
|
|
}
|
|
|
|
void NetworkManager::punishNode(int remoteId, int punishment)
|
|
{
|
|
d->punishNode(remoteId, punishment);
|
|
}
|
|
|
|
void NetworkManager::bind(const tcp::endpoint &endpoint, const std::function<void(NetworkConnection&)> &callback)
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
try {
|
|
NetworkManagerServer *server = new NetworkManagerServer(d, endpoint, callback);
|
|
d->servers.push_back(server);
|
|
} catch (std::exception &ex) {
|
|
logWarning(Log::NWM) << "Creating NetworkMangerServer failed with" << ex.what();
|
|
throw std::runtime_error("Failed to bind to endpoint");
|
|
}
|
|
|
|
if (d->servers.size() == 1) // start cron
|
|
d->cronHourly(boost::system::error_code());
|
|
}
|
|
|
|
void NetworkManager::bind(const tcp::endpoint &endpoint)
|
|
{
|
|
bind(endpoint, std::bind(&NetworkManagerPrivate::alwaysConnectingNewConnectionHandler, d, std::placeholders::_1));
|
|
}
|
|
|
|
void NetworkManager::addService(NetworkServiceBase *service)
|
|
{
|
|
assert(service);
|
|
if (!service) return;
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
d->services.push_back(service);
|
|
service->setManager(this);
|
|
}
|
|
|
|
void NetworkManager::removeService(NetworkServiceBase *service)
|
|
{
|
|
assert(service);
|
|
if (!service) return;
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
d->services.remove(service);
|
|
service->setManager(nullptr);
|
|
}
|
|
|
|
void NetworkManager::setMessageIdLookup(const std::map<int, std::string> &table)
|
|
{
|
|
d->messageIds = table;
|
|
d->messageIdsReverse.clear();
|
|
for (auto iter = table.begin(); iter != table.end(); ++iter) {
|
|
d->messageIdsReverse.insert(std::make_pair(iter->second, iter->first));
|
|
}
|
|
}
|
|
|
|
void NetworkManager::setLegacyNetworkId(const std::vector<uint8_t> &magic)
|
|
{
|
|
assert(magic.size() == 4);
|
|
for (size_t i = 0; i < 4; ++i)
|
|
d->networkId[i] = magic[i];
|
|
}
|
|
|
|
std::weak_ptr<NetworkManagerPrivate> NetworkManager::priv()
|
|
{
|
|
return d;
|
|
}
|
|
|
|
|
|
/////////////////////////////////////
|
|
|
|
|
|
NetworkManagerPrivate::NetworkManagerPrivate(boost::asio::io_service &service)
|
|
: ioService(service),
|
|
lastConnectionId(0),
|
|
isClosingDown(false),
|
|
m_cronHourly(service)
|
|
{
|
|
}
|
|
|
|
NetworkManagerPrivate::~NetworkManagerPrivate()
|
|
{
|
|
m_cronHourly.cancel();
|
|
}
|
|
|
|
void NetworkManagerPrivate::punishNode(int connectionId, int punishScore)
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lock(mutex);
|
|
auto con = connections.find(connectionId);
|
|
if (con == connections.end())
|
|
return;
|
|
con->second->m_punishment += punishScore;
|
|
|
|
if (con->second->m_punishment >= 1000) {
|
|
BannedNode bn;
|
|
bn.endPoint = con->second->endPoint();
|
|
bn.banTimeout = boost::posix_time::second_clock::universal_time() + boost::posix_time::hours(24);
|
|
logInfo(Log::NWM) << "Banned node for 24 hours due to excessive bad behavior" << bn.endPoint.hostname;
|
|
banned.push_back(bn);
|
|
connections.erase(connectionId);
|
|
con->second->shutdown();
|
|
}
|
|
}
|
|
|
|
void NetworkManagerPrivate::cronHourly(const boost::system::error_code &error)
|
|
{
|
|
if (error)
|
|
return;
|
|
|
|
logDebug(Log::NWM) << "cronHourly";
|
|
std::lock_guard<std::recursive_mutex> lock(mutex);
|
|
if (isClosingDown)
|
|
return;
|
|
|
|
const auto now = boost::posix_time::second_clock::universal_time();
|
|
std::list<BannedNode>::iterator bannedNode = banned.begin();
|
|
// clean out banned nodes
|
|
while (bannedNode != banned.end()) {
|
|
if (bannedNode->banTimeout < now)
|
|
bannedNode = banned.erase(bannedNode);
|
|
else
|
|
++bannedNode;
|
|
}
|
|
for (auto connection : connections) {
|
|
connection.second->m_punishment = std::max<short>(0, connection.second->m_punishment - 100);
|
|
// logDebug(Log::NWM) << "peer ban scrore;" << connection.second->m_punishment;
|
|
}
|
|
m_cronHourly.expires_from_now(boost::posix_time::hours(1));
|
|
m_cronHourly.async_wait(std::bind(&NetworkManagerPrivate::cronHourly, this, std::placeholders::_1));
|
|
}
|
|
|
|
|
|
/////////////////////////////////////
|
|
|
|
NetworkManagerConnection::NetworkManagerConnection(const std::shared_ptr<NetworkManagerPrivate> &parent, tcp::socket socket, int connectionId)
|
|
: m_strand(parent->ioService),
|
|
d(parent),
|
|
m_socket(std::move(socket)),
|
|
m_resolver(parent->ioService),
|
|
m_lastCallbackId(1),
|
|
m_isClosingDown(false),
|
|
m_isConnected(true),
|
|
m_reconnectDelay(parent->ioService),
|
|
m_pingTimer(parent->ioService),
|
|
m_sendTimer(parent->ioService),
|
|
m_chunkedMessageBuffer(0)
|
|
{
|
|
m_remote.ipAddress = m_socket.remote_endpoint().address();
|
|
m_remote.announcePort = m_socket.remote_endpoint().port();
|
|
m_remote.hostname = m_remote.ipAddress.to_string();
|
|
m_remote.peerPort = 0;
|
|
m_remote.connectionId = connectionId;
|
|
}
|
|
|
|
NetworkManagerConnection::NetworkManagerConnection(const std::shared_ptr<NetworkManagerPrivate> &parent, const EndPoint &remote)
|
|
: m_strand(parent->ioService),
|
|
d(parent),
|
|
m_remote(remote),
|
|
m_socket(parent->ioService),
|
|
m_resolver(parent->ioService),
|
|
m_receiveStream(RECEIVE_STREAM_SIZE),
|
|
m_lastCallbackId(1),
|
|
m_isClosingDown(false),
|
|
m_isConnected(false),
|
|
m_reconnectDelay(parent->ioService),
|
|
m_pingTimer(parent->ioService),
|
|
m_sendTimer(parent->ioService)
|
|
{
|
|
if (m_remote.peerPort == 0)
|
|
m_remote.peerPort = m_remote.announcePort;
|
|
}
|
|
|
|
void NetworkManagerConnection::connect()
|
|
{
|
|
m_isClosingDown.store(false);
|
|
runOnStrand(std::bind(&NetworkManagerConnection::connect_priv, shared_from_this()));
|
|
}
|
|
|
|
void NetworkManagerConnection::connect_priv()
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
assert(m_remote.announcePort == m_remote.peerPort); // its an outgoing connection
|
|
if (m_isConnecting)
|
|
return;
|
|
if (m_isClosingDown)
|
|
return;
|
|
m_isConnecting = true;
|
|
allocateBuffers();
|
|
|
|
if (m_remote.ipAddress.is_unspecified()) {
|
|
tcp::resolver::query query(m_remote.hostname, boost::lexical_cast<std::string>(m_remote.announcePort));
|
|
m_resolver.async_resolve(query, m_strand.wrap(std::bind(&NetworkManagerConnection::onAddressResolveComplete,
|
|
shared_from_this(), std::placeholders::_1, std::placeholders::_2)));
|
|
} else {
|
|
if (m_remote.hostname.empty())
|
|
m_remote.hostname = m_remote.ipAddress.to_string();
|
|
boost::asio::ip::tcp::endpoint endpoint(m_remote.ipAddress, m_remote.announcePort);
|
|
std::lock_guard<std::mutex> lock(d->connectionMutex);
|
|
m_socket = boost::asio::ip::tcp::socket(d->ioService);
|
|
m_socket.async_connect(endpoint, m_strand.wrap(
|
|
std::bind(&NetworkManagerConnection::onConnectComplete, shared_from_this(), std::placeholders::_1)));
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::onAddressResolveComplete(const boost::system::error_code &error, tcp::resolver::iterator iterator)
|
|
{
|
|
if (m_isClosingDown)
|
|
return;
|
|
if (error) {
|
|
logWarning(Log::NWM).nospace() << "connect[" << m_remote << "] " << error.message() << " (" << error.value() << ")";
|
|
m_isConnecting = false;
|
|
m_reconnectDelay.expires_from_now(boost::posix_time::seconds(45));
|
|
m_reconnectDelay.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::reconnectWithCheck,
|
|
shared_from_this(), std::placeholders::_1)));
|
|
errorDetected(error);
|
|
return;
|
|
}
|
|
assert(m_strand.running_in_this_thread());
|
|
// Notice that we always only use the first reported DNS entry. Which is likely Ok.
|
|
m_remote.ipAddress = iterator->endpoint().address();
|
|
logInfo(Log::NWM) << "Outgoing connection to" << m_remote.hostname << "resolved to:" << m_remote.ipAddress.to_string();
|
|
|
|
std::lock_guard<std::mutex> lock(d->connectionMutex);
|
|
m_socket = boost::asio::ip::tcp::socket(d->ioService);
|
|
m_socket.async_connect(*iterator, m_strand.wrap(
|
|
std::bind(&NetworkManagerConnection::onConnectComplete, shared_from_this(), std::placeholders::_1)));
|
|
}
|
|
|
|
void NetworkManagerConnection::onConnectComplete(const boost::system::error_code& error)
|
|
{
|
|
if (m_isClosingDown)
|
|
return;
|
|
m_isConnecting = false;
|
|
if (error) {
|
|
if (error == boost::asio::error::operation_aborted) return;
|
|
logWarning(Log::NWM).nospace() << "connect[" << m_remote.hostname.c_str() << ":" << m_remote.announcePort
|
|
<< "] (" << error.message() << ")";
|
|
if (m_remote.peerPort != m_remote.announcePort) // incoming connection
|
|
return;
|
|
m_reconnectDelay.expires_from_now(boost::posix_time::seconds(reconnectTimeoutForStep(++m_reconnectStep)));
|
|
m_reconnectDelay.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::reconnectWithCheck,
|
|
shared_from_this(), std::placeholders::_1)));
|
|
errorDetected(error);
|
|
return;
|
|
}
|
|
m_isConnected = true;
|
|
assert(m_strand.running_in_this_thread());
|
|
logInfo(Log::NWM) << "Successfully made TCP connection to" << m_remote.hostname.c_str() << m_remote.announcePort;
|
|
|
|
for (auto it = m_onConnectedCallbacks.begin(); it != m_onConnectedCallbacks.end(); ++it) {
|
|
try {
|
|
it->second(m_remote);
|
|
} catch (const std::exception &ex) {
|
|
logWarning(Log::NWM) << "onConnected threw exception, ignoring:" << ex.what();
|
|
}
|
|
}
|
|
|
|
runMessageQueue();
|
|
requestMoreBytes(); // setup a callback for receiving.
|
|
|
|
// for outgoing connections, ping. Notice that I don't care if they pong, as long as the TCP connection stays open
|
|
if (m_messageHeaderType == FloweeNative) {
|
|
m_pingTimer.expires_from_now(boost::posix_time::seconds(90));
|
|
m_pingTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::sendPing, shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
}
|
|
|
|
Streaming::ConstBuffer NetworkManagerConnection::createHeader(const Message &message)
|
|
{
|
|
assert(message.serviceId() >= 0);
|
|
if (message.serviceId() == Api::LegacyP2P) {
|
|
const auto body = message.body();
|
|
|
|
auto &sendHelperBuffer = pool(4 + 12 + 4 + 4);
|
|
memcpy(sendHelperBuffer.data(), d->networkId, 4);
|
|
sendHelperBuffer.markUsed(4);
|
|
auto m = d->messageIds.find(message.messageId());
|
|
std::string messageId;
|
|
if (m != d->messageIds.end())
|
|
messageId = m->second;
|
|
else
|
|
logCritical() << "createHeader[legacy]: P2P message Id unknown:" << message.messageId();
|
|
assert(messageId.size() <= 12);
|
|
memcpy(sendHelperBuffer.data(), messageId.c_str(), messageId.size());
|
|
for (size_t i = messageId.size(); i < 12; ++i) { // rest of version is zero-filled
|
|
sendHelperBuffer.data()[i] = 0;
|
|
}
|
|
sendHelperBuffer.markUsed(12);
|
|
const uint32_t messageSize = body.size();
|
|
WriteLE32(reinterpret_cast<uint8_t*>(sendHelperBuffer.data()), messageSize);
|
|
sendHelperBuffer.markUsed(4);
|
|
|
|
uint256 hash = Hash(body.begin(), body.end());
|
|
unsigned int checksum = 0;
|
|
memcpy(&checksum, &hash, 4);
|
|
WriteLE32(reinterpret_cast<uint8_t*>(sendHelperBuffer.data()), checksum);
|
|
return sendHelperBuffer.commit(4);
|
|
}
|
|
else {
|
|
const auto map = message.headerData();
|
|
auto &sendHelperBuffer = pool(10 * static_cast<int>(map.size()));
|
|
Streaming::MessageBuilder builder(sendHelperBuffer, Streaming::HeaderOnly);
|
|
auto iter = map.begin();
|
|
while (iter != map.end()) {
|
|
assert(iter->first >= 0);
|
|
builder.add(static_cast<uint32_t>(iter->first), iter->second);
|
|
++iter;
|
|
}
|
|
builder.add(Network::HeaderEnd, true);
|
|
assert(sendHelperBuffer.size() + message.size() < MAX_MESSAGE_SIZE);
|
|
builder.setMessageSize(sendHelperBuffer.size() + message.size());
|
|
logDebug(Log::NWM) << "createHeader of message of length;" << sendHelperBuffer.size() << '+' << message.size();
|
|
return builder.buffer();
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::errorDetected(const boost::system::error_code &error)
|
|
{
|
|
if (error == boost::asio::error::operation_aborted || !error) // no need to push those up the stack
|
|
return;
|
|
std::vector<std::function<void(int,const boost::system::error_code& error)> > callbacks;
|
|
callbacks.reserve(m_onErrorCallbacks.size());
|
|
for (auto it = m_onErrorCallbacks.begin(); it != m_onErrorCallbacks.end(); ++it) {
|
|
callbacks.push_back(it->second);
|
|
}
|
|
for (auto callback : callbacks) {
|
|
try {
|
|
callback(m_remote.connectionId, error);
|
|
} catch (const std::exception &e) {
|
|
logCritical(Log::NWM) << "Callback 'onError' threw with" << e;
|
|
}
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::runMessageQueue()
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
if (m_sendingInProgress || (m_messageQueue->isRead() && m_priorityMessageQueue->isRead()) || !isConnected())
|
|
return;
|
|
|
|
m_sendingInProgress = true;
|
|
|
|
/*
|
|
* This method will schedule sending of data.
|
|
* The data to send is pushed async to the network stack and the callback will come in essentially
|
|
* the moment the network stack has accepted the data. This is not at all any confirmation that
|
|
* the other side accepted it!
|
|
* But at the same time, the network stack has limited buffers and will only push to the network
|
|
* an amount based on the TCP window size. So at minimum we know that the speed with which we
|
|
* send stuff is indicative of the throughput.
|
|
*
|
|
* The idea here is to send a maximum amount of 250KB at a time. Which should be enough to avoid
|
|
* delays. The speed limiter here mean we still allow messages that were pushed to the front of the
|
|
* queue to be handled at a good speed.
|
|
*/
|
|
int bytesLeft = 250*1024;
|
|
std::vector<Streaming::ConstBuffer> socketQueue; // the stuff we will send over the socket
|
|
|
|
while (m_priorityMessageQueue->hasUnread()) {
|
|
const Message &message = m_priorityMessageQueue->unreadTip();
|
|
int headerSize = message.header().size();
|
|
if (m_sendQHeaders->isFull())
|
|
break;
|
|
if (!message.hasHeader()) { // build a simple header
|
|
const Streaming::ConstBuffer constBuf = createHeader(message);
|
|
headerSize = constBuf.size();
|
|
bytesLeft -= headerSize;
|
|
socketQueue.push_back(constBuf);
|
|
m_sendQHeaders->append(constBuf);
|
|
}
|
|
assert(message.body().size() + headerSize < MAX_MESSAGE_SIZE);
|
|
socketQueue.push_back(message.rawData());
|
|
bytesLeft -= message.rawData().size();
|
|
m_priorityMessageQueue->markRead();
|
|
if (bytesLeft <= 0)
|
|
break;
|
|
}
|
|
|
|
while (m_messageQueue->hasUnread()) {
|
|
if (bytesLeft <= 0)
|
|
break;
|
|
if (m_sendQHeaders->isFull())
|
|
break;
|
|
const Message &message = m_messageQueue->unreadTip();
|
|
if (message.rawData().size() > CHUNK_SIZE && message.serviceId() != Api::LegacyP2P) {
|
|
assert(!message.hasHeader()); // should have been blocked from entering in queueMessage();
|
|
|
|
/*
|
|
* The maximum size of a message is 9KB. This helps a lot with memory allocations and zero-copy ;)
|
|
* A large message is then split into smaller ones and send with individual headers
|
|
* to the other side where they can be re-connected.
|
|
*/
|
|
Streaming::ConstBuffer body(message.body());
|
|
const char *begin = body.begin() + m_messageBytesSend;
|
|
const char *end = body.end();
|
|
Streaming::ConstBuffer chunkHeader;// the first and last are different, but all the ones in the middle are duplicates.
|
|
bool first = m_messageBytesSend == 0;
|
|
while (begin < end) {
|
|
const char *p = begin + CHUNK_SIZE;
|
|
if (p > end)
|
|
p = end;
|
|
m_messageBytesSend += p - begin;
|
|
Streaming::ConstBuffer bodyChunk(body.internal_buffer(), begin, p);
|
|
begin = p;
|
|
|
|
Streaming::ConstBuffer header;
|
|
if (first || begin == end || !chunkHeader.isValid()) {
|
|
const auto headerData = message.headerData();
|
|
auto &sendHelperBuffer = pool(20 + 8 * headerData.size());
|
|
Streaming::MessageBuilder headerBuilder(sendHelperBuffer, Streaming::HeaderOnly);
|
|
headerBuilder.add(Network::ServiceId, message.serviceId());
|
|
if (first) {
|
|
for (auto iter = headerData.begin(); iter != headerData.end(); ++iter) {
|
|
if (iter->first == Network::ServiceId) // forced to be first.
|
|
continue;
|
|
headerBuilder.add(iter->first, iter->second);
|
|
}
|
|
headerBuilder.add(Network::SequenceStart, body.size());
|
|
} else if (message.messageId() >= 0) {
|
|
headerBuilder.add(Network::MessageId, message.messageId());
|
|
}
|
|
headerBuilder.add(Network::LastInSequence, (begin == end));
|
|
headerBuilder.add(Network::HeaderEnd, true);
|
|
assert(sendHelperBuffer.size() + bodyChunk.size() < MAX_MESSAGE_SIZE);
|
|
headerBuilder.setMessageSize(sendHelperBuffer.size() + bodyChunk.size());
|
|
|
|
header = headerBuilder.buffer();
|
|
if (!first)
|
|
chunkHeader = header;
|
|
first = false;
|
|
} else {
|
|
header = chunkHeader;
|
|
}
|
|
bytesLeft -= header.size();
|
|
socketQueue.push_back(header);
|
|
m_sendQHeaders->append(header);
|
|
|
|
socketQueue.push_back(bodyChunk);
|
|
bytesLeft -= bodyChunk.size();
|
|
|
|
if (bytesLeft <= 0)
|
|
break;
|
|
}
|
|
if (begin >= end) { // done with message.
|
|
m_messageBytesSend = 0;
|
|
m_messageQueue->markRead();
|
|
}
|
|
}
|
|
else {
|
|
if (!message.hasHeader()) { // build a simple header
|
|
const Streaming::ConstBuffer constBuf = createHeader(message);
|
|
bytesLeft -= constBuf.size();
|
|
socketQueue.push_back(constBuf);
|
|
m_sendQHeaders->append(constBuf);
|
|
}
|
|
socketQueue.push_back(message.rawData());
|
|
bytesLeft -= message.rawData().size();
|
|
m_messageQueue->markRead();
|
|
}
|
|
}
|
|
assert(m_messageBytesSend >= 0);
|
|
|
|
boost::asio::async_write(m_socket, socketQueue,
|
|
m_strand.wrap(std::bind(&NetworkManagerConnection::sentSomeBytes, shared_from_this(),
|
|
std::placeholders::_1, std::placeholders::_2)));
|
|
}
|
|
|
|
void NetworkManagerConnection::sentSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred)
|
|
{
|
|
if (m_isClosingDown)
|
|
return;
|
|
|
|
m_sendingInProgress = false;
|
|
if (error) {
|
|
logWarning(Log::NWM) << "send received error" << error.message();
|
|
m_messageBytesSend = 0;
|
|
m_messageBytesSent = 0;
|
|
m_sendQHeaders->clear();
|
|
m_messageQueue->markAllUnread();
|
|
m_priorityMessageQueue->markAllUnread();
|
|
runOnStrand(std::bind(&NetworkManagerConnection::connect, shared_from_this()));
|
|
return;
|
|
}
|
|
assert(m_strand.running_in_this_thread());
|
|
if (!m_socket.is_open())
|
|
return;
|
|
logDebug(Log::NWM) << "Managed to send" << bytes_transferred << "bytes";
|
|
m_reconnectStep = 0;
|
|
|
|
m_messageQueue->removeAllRead();
|
|
m_priorityMessageQueue->removeAllRead();
|
|
m_sendQHeaders->clear();
|
|
|
|
runMessageQueue();
|
|
}
|
|
|
|
void NetworkManagerConnection::receivedSomeBytes(const boost::system::error_code& error, std::size_t bytes_transferred)
|
|
{
|
|
if (m_isClosingDown)
|
|
return;
|
|
if (error) {
|
|
logDebug(Log::NWM) << "receivedSomeBytes errored:" << error.message();
|
|
// first copy to avoid problems if a callback removes its callback or closes the connection.
|
|
std::vector<std::function<void(const EndPoint&)> > callbacks;
|
|
callbacks.reserve(m_onDisConnectedCallbacks.size());
|
|
for (auto it = m_onDisConnectedCallbacks.begin(); it != m_onDisConnectedCallbacks.end(); ++it) {
|
|
callbacks.push_back(it->second);
|
|
}
|
|
|
|
for (auto callback : callbacks) {
|
|
try {
|
|
callback(m_remote);
|
|
} catch (const std::exception &ex) {
|
|
logInfo(Log::NWM) << "onDisconnected caused exception, ignoring:" << ex;
|
|
}
|
|
}
|
|
close();
|
|
return;
|
|
}
|
|
assert(m_strand.running_in_this_thread());
|
|
m_receiveStream.markUsed(static_cast<int>(bytes_transferred)); // move write pointer
|
|
|
|
while (true) { // get all packets out
|
|
const size_t blockSize = static_cast<size_t>(m_receiveStream.size());
|
|
if (blockSize < 4) // need more data
|
|
break;
|
|
Streaming::ConstBuffer data = m_receiveStream.createBufferSlice(m_receiveStream.begin(), m_receiveStream.end());
|
|
|
|
if (m_firstPacket) {
|
|
m_firstPacket = false;
|
|
if (m_messageHeaderType == FloweeNative) {
|
|
if (data.begin()[2] != 8) { // Positive integer (0) and Network::ServiceId (1 << 3)
|
|
logWarning(Log::NWM) << "receive; Data error from remote - this is NOT an NWM server. Disconnecting" << m_remote.hostname;
|
|
disconnect();
|
|
return;
|
|
}
|
|
} else {
|
|
assert(m_messageHeaderType == LegacyP2P);
|
|
bool ok = true;
|
|
for (size_t i = 0; ok && i < 4; ++i)
|
|
ok |= data.begin()[i] == d->networkId[i];
|
|
if (!ok) {
|
|
logWarning(Log::NWM) << "receive; Data error from remote - this is NOT an P@P server. Disconnecting" << m_remote.hostname;
|
|
disconnect();
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
if(m_messageHeaderType == LegacyP2P) {
|
|
if (data.size() < LEGACY_HEADER_SIZE) // wait for entire header
|
|
break;
|
|
|
|
const uint32_t bodyLength = ReadLE32(reinterpret_cast<const uint8_t*>(data.begin() + 16));
|
|
if (bodyLength > 32000000) {
|
|
logWarning(Log::NWM).nospace() << "receive; Data error from server - stream is corrupt ("
|
|
<< "bl=" << bodyLength << ")";
|
|
close(false);
|
|
return;
|
|
}
|
|
if (data.size() < LEGACY_HEADER_SIZE + static_cast<int>(bodyLength)) // do we have all data for this one?
|
|
break;
|
|
|
|
if (!processLegacyPacket(m_receiveStream.internal_buffer(), data.begin()))
|
|
return;
|
|
m_receiveStream.forget(bodyLength + LEGACY_HEADER_SIZE);
|
|
}
|
|
else {
|
|
const unsigned int rawHeader = *(reinterpret_cast<const unsigned int*>(data.begin()));
|
|
const int packetLength = (rawHeader & 0xFFFF);
|
|
logDebug(Log::NWM) << "Processing incoming packet. Size" << packetLength;
|
|
if (packetLength > MAX_MESSAGE_SIZE) {
|
|
logWarning(Log::NWM).nospace() << "receive; Data error from server - stream is corrupt ("
|
|
<< "pl=" << packetLength << ")";
|
|
close();
|
|
return;
|
|
}
|
|
if (data.size() < packetLength) // do we have all data for this one?
|
|
break;
|
|
if (!processPacket(m_receiveStream.internal_buffer(), data.begin()))
|
|
return;
|
|
m_receiveStream.forget(packetLength);
|
|
}
|
|
}
|
|
requestMoreBytes_callback(boost::system::error_code());
|
|
}
|
|
|
|
/*
|
|
* when we generate more messages than can be sent, we start throttling the incoming
|
|
* message flow. The basic thought is that more incoming messages means more outgoing
|
|
* messages will be generated.
|
|
* As such it makes sense to start slowing down what we sent in order to avoid memory buffers
|
|
* for send-queues growing out of proportion.
|
|
*/
|
|
void NetworkManagerConnection::requestMoreBytes_callback(const boost::system::error_code &error)
|
|
{
|
|
if (error)
|
|
return;
|
|
|
|
const int backlog = m_messageQueue->size() + m_priorityMessageQueue->size();
|
|
if (backlog < 1000)
|
|
requestMoreBytes();
|
|
else if (isConnected()) {
|
|
int wait = 2;
|
|
if (backlog > 2000)
|
|
wait = 30;
|
|
else if (backlog > 1500)
|
|
wait = 10;
|
|
m_sendTimer.expires_from_now(boost::posix_time::milliseconds(wait));
|
|
m_sendTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::requestMoreBytes_callback,
|
|
shared_from_this(), std::placeholders::_1)));
|
|
runMessageQueue();
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::requestMoreBytes()
|
|
{
|
|
m_receiveStream.reserve(MAX_MESSAGE_SIZE);
|
|
assert(m_receiveStream.capacity() > 0);
|
|
m_socket.async_receive(boost::asio::buffer(m_receiveStream.data(), static_cast<size_t>(m_receiveStream.capacity())),
|
|
m_strand.wrap(std::bind(&NetworkManagerConnection::receivedSomeBytes, shared_from_this(),
|
|
std::placeholders::_1, std::placeholders::_2)));
|
|
}
|
|
|
|
bool NetworkManagerConnection::processPacket(const std::shared_ptr<char> &buffer, const char *data)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
const unsigned int rawHeader = *(reinterpret_cast<const unsigned int*>(data));
|
|
const int packetLength = (rawHeader & 0xFFFF);
|
|
logDebug(Log::NWM) << "Receive packet length" << packetLength;
|
|
|
|
const char *messageStart = data + 2;
|
|
Streaming::MessageParser parser(Streaming::ConstBuffer(buffer, messageStart, messageStart + packetLength));
|
|
Streaming::ParsedType type = parser.next();
|
|
int headerSize = 0;
|
|
int messageId = -1;
|
|
int serviceId = -1;
|
|
int lastInSequence = -1;
|
|
int sequenceSize = -1;
|
|
bool isPing = false;
|
|
// TODO have a variable on the NetworkManger that indicates the maximum allowed combined message-size.
|
|
|
|
std::map<int, int> messageHeaderData;
|
|
bool inHeader = true;
|
|
while (inHeader && type == Streaming::FoundTag) {
|
|
switch (parser.tag()) {
|
|
case Network::HeaderEnd:
|
|
headerSize = parser.consumed();
|
|
inHeader = false;
|
|
break;
|
|
case Network::MessageId:
|
|
if (!parser.isInt()) {
|
|
close();
|
|
return false;
|
|
}
|
|
messageId = parser.intData();
|
|
break;
|
|
case Network::ServiceId:
|
|
if (!parser.isInt()) {
|
|
close();
|
|
return false;
|
|
}
|
|
serviceId = parser.intData();
|
|
break;
|
|
case Network::LastInSequence:
|
|
if (!parser.isBool()) {
|
|
close();
|
|
return false;
|
|
}
|
|
lastInSequence = parser.boolData() ? 1 : 0;
|
|
break;
|
|
case Network::SequenceStart:
|
|
if (!parser.isInt()) {
|
|
close();
|
|
return false;
|
|
}
|
|
sequenceSize = parser.intData();
|
|
break;
|
|
case Network::Ping:
|
|
isPing = true;
|
|
break;
|
|
default:
|
|
if (parser.isInt() && parser.tag() < 0xFFFFFF) {
|
|
if (parser.tag() <= 10) { // illegal header tag for users.
|
|
logInfo(Log::NWM) << " header uses illegal tag. Malformed: re-connecting";
|
|
close();
|
|
return false;
|
|
}
|
|
messageHeaderData.insert(std::make_pair(static_cast<int>(parser.tag()), parser.intData()));
|
|
}
|
|
break;
|
|
}
|
|
|
|
type = parser.next();
|
|
}
|
|
if (inHeader) {
|
|
logInfo(Log::NWM) << " header malformed, re-connecting";
|
|
close();
|
|
return false;
|
|
}
|
|
|
|
if (serviceId == -1) { // an obligatory field
|
|
logWarning(Log::NWM) << "peer sent message without serviceId";
|
|
close();
|
|
return false;
|
|
}
|
|
|
|
if (serviceId == Network::SystemServiceId) { // Handle System level messages
|
|
if (isPing) {
|
|
if (m_remote.peerPort == m_remote.announcePort) {
|
|
// we should never get pings from a remote when we initiated the connection.
|
|
disconnect();
|
|
return false;
|
|
}
|
|
m_pingTimer.cancel();
|
|
if (!m_messageQueue->isFull()) {
|
|
queueMessage(m_pingMessage, NetworkConnection::NormalPriority);
|
|
m_pingTimer.expires_from_now(boost::posix_time::seconds(120));
|
|
m_pingTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::pingTimeout, this, std::placeholders::_1)));
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
Message message;
|
|
// we assume they are in sequence (which is Ok with TCP sockets), but we don't assume that
|
|
// each packet is part of the sequence.
|
|
if (lastInSequence != -1) {
|
|
if (sequenceSize != -1) {
|
|
if (m_chunkedMessageId != -1 || m_chunkedServiceId != -1) { // Didn't finish another. Thats illegal.
|
|
logWarning(Log::NWM) << "peer sent sequenced message with wrong combination of headers";
|
|
close();
|
|
return false;
|
|
}
|
|
m_chunkedMessageId = messageId;
|
|
m_chunkedServiceId = serviceId;
|
|
m_chunkedMessageBuffer = Streaming::BufferPool(sequenceSize);
|
|
m_chunkedHeaderData = messageHeaderData;
|
|
}
|
|
else if (m_chunkedMessageId != messageId || m_chunkedServiceId != serviceId) { // Changed. Thats illegal.
|
|
close();
|
|
logWarning(Log::NWM) << "peer sent sequenced message with inconsistent service/messageId";
|
|
return false;
|
|
}
|
|
const int bodyLength = packetLength - headerSize - 2;
|
|
if (m_chunkedMessageBuffer.capacity() < bodyLength) {
|
|
logWarning(Log::NWM) << "peer sent sequenced message with too much data";
|
|
return false;
|
|
}
|
|
|
|
logDebug(Log::NWM) << "Message received as part of sequence; last:" << lastInSequence
|
|
<< "total-size:" << sequenceSize;
|
|
std::copy(data + headerSize + 2, data + packetLength, m_chunkedMessageBuffer.data());
|
|
m_chunkedMessageBuffer.markUsed(bodyLength);
|
|
if (lastInSequence == 0)
|
|
return true;
|
|
|
|
message = Message(m_chunkedMessageBuffer.commit(), m_chunkedServiceId);
|
|
messageHeaderData = m_chunkedHeaderData;
|
|
m_chunkedMessageId = -1;
|
|
m_chunkedServiceId = -1;
|
|
m_chunkedMessageBuffer.clear();
|
|
}
|
|
else {
|
|
message = Message(buffer, data + 2, data + 2 + headerSize, data + packetLength);
|
|
}
|
|
message.setMessageId(messageId);
|
|
message.setServiceId(serviceId);
|
|
for (auto iter = messageHeaderData.begin(); iter != messageHeaderData.end(); ++iter) {
|
|
message.setHeaderInt(iter->first, iter->second);
|
|
}
|
|
message.remote = m_remote.connectionId;
|
|
|
|
// first copy to avoid problems if a callback removes its callback or closes the connection.
|
|
std::vector<std::function<void(const Message&)> > callbacks;
|
|
callbacks.reserve(m_onIncomingMessageCallbacks.size());
|
|
for (auto it = m_onIncomingMessageCallbacks.begin(); it != m_onIncomingMessageCallbacks.end(); ++it) {
|
|
callbacks.push_back(it->second);
|
|
}
|
|
|
|
for (auto callback : callbacks) {
|
|
try {
|
|
callback(message);
|
|
} catch (const NetworkQueueFullError &e) {
|
|
logDebug(Log::NWM) << "connection::onIncomingMessage tried to send, but failed (and didn't catch exception) dropping message" << e;
|
|
} catch (const std::exception &ex) {
|
|
logWarning(Log::NWM) << "connection::onIncomingMessage threw exception, ignoring:" << ex;
|
|
}
|
|
if (!m_socket.is_open())
|
|
break;
|
|
}
|
|
std::list<NetworkServiceBase*> servicesCopy;
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
servicesCopy = d->services;
|
|
}
|
|
for (auto service : servicesCopy) {
|
|
if (!m_socket.is_open())
|
|
break;
|
|
if (service->id() == serviceId) {
|
|
try {
|
|
service->onIncomingMessage(message, m_remote);
|
|
} catch (const std::exception &ex) {
|
|
logWarning(Log::NWM) << "service::onIncomingMessage threw exception, ignoring:" << ex;
|
|
}
|
|
}
|
|
}
|
|
|
|
return m_socket.is_open(); // if the user called disconnect, then stop processing packages
|
|
}
|
|
|
|
bool NetworkManagerConnection::processLegacyPacket(const std::shared_ptr<char> &buffer, const char *data)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
const int bodyLength = ReadLE32(reinterpret_cast<const uint8_t*>(data + 16));
|
|
logDebug(Log::NWM) << "Receive legacy-packet Body-length:" << bodyLength;
|
|
|
|
char buf[13];
|
|
memcpy(buf, data + 4, 12);
|
|
buf[12] = 0;
|
|
auto m = d->messageIdsReverse.find(std::string(buf));
|
|
if (m == d->messageIdsReverse.end()) {
|
|
logWarning(Log::NWM) << "Incoming message has unknown type:" << std::string(data + 4, 12);
|
|
return true; // skip
|
|
}
|
|
Message message(buffer, data, data + LEGACY_HEADER_SIZE, data + LEGACY_HEADER_SIZE + bodyLength);
|
|
|
|
message.setMessageId(m->second);
|
|
message.setServiceId(Api::LegacyP2P);
|
|
message.remote = m_remote.connectionId;
|
|
|
|
// first copy to avoid problems if a callback removes its callback or closes the connection.
|
|
std::vector<std::function<void(const Message&)> > callbacks;
|
|
callbacks.reserve(m_onIncomingMessageCallbacks.size());
|
|
for (auto it = m_onIncomingMessageCallbacks.begin(); it != m_onIncomingMessageCallbacks.end(); ++it) {
|
|
callbacks.push_back(it->second);
|
|
}
|
|
|
|
for (auto callback : callbacks) {
|
|
try {
|
|
callback(message);
|
|
} catch (const NetworkQueueFullError &e) {
|
|
logDebug(Log::NWM) << "connection::onIncomingMessage tried to send, but failed (and didn't catch exception) dropping message" << e;
|
|
} catch (const std::exception &ex) {
|
|
logWarning(Log::NWM) << "connection::onIncomingMessage (LegacyP2P) threw exception, ignoring:" << ex;
|
|
}
|
|
if (!m_socket.is_open())
|
|
break;
|
|
}
|
|
|
|
return m_socket.is_open(); // if the user called disconnect, then stop processing packages
|
|
}
|
|
|
|
void NetworkManagerConnection::addOnConnectedCallback(int id, std::function<void(const EndPoint&)> callback)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onConnectedCallbacks.insert(std::make_pair(id, callback));
|
|
}
|
|
|
|
void NetworkManagerConnection::addOnDisconnectedCallback(int id, std::function<void(const EndPoint&)> callback)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onDisConnectedCallbacks.insert(std::make_pair(id, callback));
|
|
}
|
|
|
|
void NetworkManagerConnection::addOnIncomingMessageCallback(int id, std::function<void(const Message&)> callback)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onIncomingMessageCallbacks.insert(std::make_pair(id, callback));
|
|
}
|
|
|
|
void NetworkManagerConnection::addOnError(int id, std::function<void (int, const boost::system::error_code &)> callback)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onErrorCallbacks.insert(std::make_pair(id, callback));
|
|
}
|
|
|
|
void NetworkManagerConnection::queueMessage(const Message &message, NetworkConnection::MessagePriority priority)
|
|
{
|
|
if (!message.hasHeader() && message.serviceId() == -1)
|
|
throw NetworkException("queueMessage: Can't deliver a message with unset service ID");
|
|
if (message.hasHeader() && message.body().size() > CHUNK_SIZE)
|
|
throw NetworkException("queueMessage: Can't send large message and can't auto-chunk because it already has a header");
|
|
if (priority != NetworkConnection::NormalPriority && message.rawData().size() > CHUNK_SIZE)
|
|
throw NetworkException("queueMessage: Can't send large message in the priority queue");
|
|
|
|
// we have a chunk size of 8K and a max message size of 9K. The 1000 bytes is for headers and worse case is around
|
|
// 10 bytes per item plus some extra stuff. So we reject any messages with more than 95 header items.
|
|
if (message.headerData().size() > 95)
|
|
NetworkException("queueMessage: Can't send message with too much header items");
|
|
|
|
if (m_strand.running_in_this_thread()) {
|
|
allocateBuffers();
|
|
if (priority == NetworkConnection::NormalPriority) {
|
|
if (m_messageQueue->isFull())
|
|
throw NetworkQueueFullError("MessageQueue full");
|
|
m_messageQueue->append(message);
|
|
} else {
|
|
if (m_priorityMessageQueue->isFull())
|
|
throw NetworkQueueFullError("PriorityMessageQueue full");
|
|
m_priorityMessageQueue->append(message);
|
|
}
|
|
if (isConnected())
|
|
runMessageQueue();
|
|
else
|
|
connect_priv();
|
|
} else {
|
|
runOnStrand(std::bind(&NetworkManagerConnection::queueMessage, this, message, priority));
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::close(bool reconnect)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
if (!isOutgoing()) {
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex);
|
|
shutdown();
|
|
d->connections.erase(m_remote.connectionId);
|
|
return;
|
|
}
|
|
if (!reconnect)
|
|
m_isClosingDown = true;
|
|
|
|
m_receiveStream.clear();
|
|
m_chunkedMessageBuffer.clear();
|
|
m_chunkedMessageId = -1;
|
|
m_chunkedServiceId = -1;
|
|
m_chunkedHeaderData.clear();
|
|
m_messageBytesSend = 0;
|
|
m_messageBytesSent = 0;
|
|
m_reconnectDelay.cancel();
|
|
m_resolver.cancel();
|
|
m_sendQHeaders->clear();
|
|
if (m_isConnected)
|
|
m_socket.close();
|
|
m_pingTimer.cancel();
|
|
m_firstPacket = true;
|
|
m_isConnected = false;
|
|
m_isConnecting = false;
|
|
if (reconnect && !m_isClosingDown) { // auto reconnect.
|
|
if (m_firstPacket) { // this means the network is there, someone is listening. They just don't speak our language.
|
|
// slow down reconnect due to bad peer.
|
|
m_reconnectDelay.expires_from_now(boost::posix_time::seconds(15));
|
|
m_reconnectDelay.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::reconnectWithCheck, this, std::placeholders::_1)));
|
|
} else {
|
|
connect_priv();
|
|
}
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::sendPing(const boost::system::error_code &error)
|
|
{
|
|
if (error) return;
|
|
logDebug(Log::NWM) << "ping";
|
|
|
|
if (m_isClosingDown)
|
|
return;
|
|
assert (m_messageHeaderType != LegacyP2P);
|
|
assert(m_strand.running_in_this_thread());
|
|
if (!m_socket.is_open())
|
|
return;
|
|
int time = 90;
|
|
if (m_messageQueue->isFull()) {
|
|
if (m_priorityMessageQueue->isFull())
|
|
time = 2; // delay sending ping
|
|
else
|
|
queueMessage(m_pingMessage, NetworkConnection::HighPriority);
|
|
} else {
|
|
queueMessage(m_pingMessage, NetworkConnection::NormalPriority);
|
|
}
|
|
m_pingTimer.expires_from_now(boost::posix_time::seconds(time));
|
|
m_pingTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::sendPing, this, std::placeholders::_1)));
|
|
}
|
|
|
|
void NetworkManagerConnection::pingTimeout(const boost::system::error_code &error)
|
|
{
|
|
// note that this is only for incoming connections.
|
|
if (!error) {
|
|
logWarning(Log::NWM) << "Didn't receive a ping from peer for too long, disconnecting dead connection";
|
|
disconnect();
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::allocateBuffers()
|
|
{
|
|
if (m_messageQueue.get() == nullptr || m_messageQueue->reserved() != m_queueSizeMain) {
|
|
m_messageQueue.reset(new RingBuffer<Message>(m_queueSizeMain));
|
|
m_priorityMessageQueue.reset(new RingBuffer<Message>(m_priorityQueueSize));
|
|
m_sendQHeaders.reset(new RingBuffer<Streaming::ConstBuffer>(m_queueSizeMain));
|
|
|
|
m_pingMessage = buildPingMessage(m_remote.peerPort == m_remote.announcePort);
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::reconnectWithCheck(const boost::system::error_code& error)
|
|
{
|
|
if (!error) {
|
|
m_socket.close();
|
|
connect_priv();
|
|
}
|
|
}
|
|
|
|
int NetworkManagerConnection::nextCallbackId()
|
|
{
|
|
return m_lastCallbackId.fetch_add(1);
|
|
}
|
|
|
|
void NetworkManagerConnection::removeAllCallbacksFor(int id)
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onConnectedCallbacks.erase(id);
|
|
m_onDisConnectedCallbacks.erase(id);
|
|
m_onIncomingMessageCallbacks.erase(id);
|
|
m_onErrorCallbacks.erase(id);
|
|
}
|
|
|
|
void NetworkManagerConnection::shutdown()
|
|
{
|
|
m_isClosingDown = true;
|
|
if (m_strand.running_in_this_thread()) {
|
|
m_onConnectedCallbacks.clear();
|
|
m_onDisConnectedCallbacks.clear();
|
|
m_onIncomingMessageCallbacks.clear();
|
|
m_onErrorCallbacks.clear();
|
|
if (isConnected())
|
|
m_socket.close();
|
|
m_resolver.cancel();
|
|
m_reconnectDelay.cancel();
|
|
m_strand.post(std::bind(&NetworkManagerConnection::finalShutdown, shared_from_this()));
|
|
} else {
|
|
m_strand.post(std::bind(&NetworkManagerConnection::shutdown, shared_from_this()));
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::accept()
|
|
{
|
|
if (m_acceptedConnection)
|
|
return;
|
|
m_acceptedConnection = true;
|
|
allocateBuffers();
|
|
|
|
// setup a callback for receiving.
|
|
m_socket.async_receive(boost::asio::buffer(m_receiveStream.data(), static_cast<size_t>(m_receiveStream.capacity())),
|
|
m_strand.wrap(std::bind(&NetworkManagerConnection::receivedSomeBytes, shared_from_this(),
|
|
std::placeholders::_1, std::placeholders::_2)));
|
|
|
|
// for incoming connections, take action when no ping comes in.
|
|
m_pingTimer.expires_from_now(boost::posix_time::seconds(120));
|
|
m_pingTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::pingTimeout, this, std::placeholders::_1)));
|
|
}
|
|
|
|
void NetworkManagerConnection::recycleConnection()
|
|
{
|
|
assert(m_strand.running_in_this_thread());
|
|
m_onConnectedCallbacks.clear();
|
|
m_onDisConnectedCallbacks.clear();
|
|
m_onIncomingMessageCallbacks.clear();
|
|
m_onErrorCallbacks.clear();
|
|
setMessageQueueSizes(2000, 20); // set back to defaults.
|
|
m_punishment = 0;
|
|
close(false);
|
|
std::lock_guard<std::recursive_mutex> lock(d->mutex); // protects connections maps
|
|
if (d->connections.erase(m_remote.connectionId))
|
|
d->unusedConnections.push_back(shared_from_this());
|
|
}
|
|
|
|
void NetworkManagerConnection::runOnStrand(const std::function<void()> &function)
|
|
{
|
|
if (m_isClosingDown)
|
|
return;
|
|
m_strand.post(function);
|
|
}
|
|
|
|
void NetworkManagerConnection::punish(int amount)
|
|
{
|
|
d->punishNode(m_remote.connectionId, amount);
|
|
}
|
|
|
|
void NetworkManagerConnection::setMessageHeaderType(MessageHeaderType messageHeaderType)
|
|
{
|
|
if (m_messageHeaderType == messageHeaderType)
|
|
return;
|
|
m_messageHeaderType = messageHeaderType;
|
|
switch (m_messageHeaderType) {
|
|
case FloweeNative:
|
|
if (isOutgoing()) {
|
|
m_pingTimer.expires_from_now(boost::posix_time::seconds(30));
|
|
m_pingTimer.async_wait(m_strand.wrap(std::bind(&NetworkManagerConnection::sendPing, this, std::placeholders::_1)));
|
|
}
|
|
break;
|
|
case LegacyP2P:
|
|
m_pingTimer.cancel();
|
|
break;
|
|
default:
|
|
assert(false);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void NetworkManagerConnection::finalShutdown()
|
|
{
|
|
}
|
|
|
|
NetworkManagerServer::NetworkManagerServer(const std::shared_ptr<NetworkManagerPrivate> &parent, tcp::endpoint endpoint, const std::function<void(NetworkConnection&)> &callback)
|
|
: d(parent),
|
|
m_acceptor(parent->ioService, endpoint),
|
|
m_socket(parent->ioService),
|
|
onIncomingConnection(callback)
|
|
{
|
|
setupCallback();
|
|
}
|
|
|
|
void NetworkManagerServer::shutdown()
|
|
{
|
|
m_socket.close();
|
|
}
|
|
|
|
void NetworkManagerServer::setupCallback()
|
|
{
|
|
m_acceptor.async_accept(m_socket, std::bind(&NetworkManagerServer::acceptConnection, this, std::placeholders::_1));
|
|
}
|
|
|
|
void NetworkManagerServer::acceptConnection(boost::system::error_code error)
|
|
{
|
|
if (error.value() == boost::asio::error::operation_aborted)
|
|
return;
|
|
logDebug(Log::NWM) << "acceptTcpConnection" << error.message();
|
|
if (error) {
|
|
setupCallback();
|
|
return;
|
|
}
|
|
std::shared_ptr<NetworkManagerPrivate> priv = d.lock();
|
|
if (!priv.get())
|
|
return;
|
|
|
|
std::lock_guard<std::recursive_mutex> lock(priv->mutex);
|
|
if (priv->isClosingDown)
|
|
return;
|
|
|
|
struct RIAA {
|
|
RIAA(NetworkManagerServer *p) : parent(p) {}
|
|
~RIAA() {
|
|
parent->setupCallback();
|
|
}
|
|
NetworkManagerServer *parent;
|
|
};
|
|
RIAA riaa(this);
|
|
|
|
try {
|
|
// catch ENOTCONN (Transport endpoint is not connected) which remote_endpoint() may throw
|
|
const boost::asio::ip::address peerAddress = m_socket.remote_endpoint().address();
|
|
for (const BannedNode &bn : priv->banned) {
|
|
if (bn.endPoint.ipAddress == peerAddress) {
|
|
if (bn.banTimeout > boost::posix_time::second_clock::universal_time()) { // incoming connection is banned.
|
|
logInfo(Log::NWM) << "acceptTcpConnection; closing incoming connection (banned)"
|
|
<< bn.endPoint.hostname;
|
|
m_socket.close();
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
|
|
const int conId = ++priv->lastConnectionId;
|
|
logDebug(Log::NWM) << "acceptTcpConnection; creating new connection object" << conId;
|
|
// Never do a setupCallback until we do a 'std::move' (or disconnect) to avoid an "Already open" error
|
|
std::shared_ptr<NetworkManagerConnection> connection = std::make_shared<NetworkManagerConnection>(priv, std::move(m_socket), conId);
|
|
priv->connections.insert(std::make_pair(conId, connection));
|
|
logDebug(Log::NWM) << "Total connections now;" << priv->connections.size();
|
|
|
|
NetworkConnection con(connection, conId);
|
|
try {
|
|
onIncomingConnection(con);
|
|
} catch (const std::exception &e) {
|
|
logCritical(Log::NWM) << "subsystem handling onIncomingConnection threw. Ignoring" << e;
|
|
}
|
|
|
|
// someone needs to call accept(), if they didn't we shall disconnect
|
|
if (!connection->acceptedConnection())
|
|
connection->m_strand.post(std::bind(&NetworkManagerConnection::disconnect, connection));
|
|
|
|
} catch (...) {
|
|
logInfo(Log::NWM) << "AcceptConnection found that peer closed before we could handle it.";
|
|
try { m_socket.close(); } catch (...) {} // TODO do we need this?
|
|
}
|
|
}
|