File transmitter.cpp¶
File List > connection > transmitter.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* transmitter.cpp
*/
#include "system_parameter.h"
#include "connection_defs.h"
#include "connection_statistics.hpp"
#include "transmitter.hpp"
#include "error_manager.h"
#include "span.hpp"
#include "object_primitive.h"
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#if 0
#define er_log_conn(...) er_log_debug (__VA_ARGS__)
#else
#define er_log_conn(...)
#endif
#define NEXT_STATE(x) do { \
_er_log_debug (__FILE__, __LINE__, "transmitter state %d -> state = %d\n", m_state, state::x); \
(m_state = state::x); \
} while (0)
namespace cubconn
{
transmitter::transmitter (statistics::metrics<statistics::context> *stats) :
m_buf (IOV_MAX),
m_stats (stats)
{
m_deleter.reserve (IOV_MAX);
}
transmitter::transmitter ()
{
}
transmitter::~transmitter ()
{
}
result transmitter::fill (int fd, int limit)
{
struct ::msghdr *msg;
std::size_t advance;
ssize_t bytes, consumption;
msg = &m_buf.get_msghdr ();
consumption = 0;
while (msg->msg_iovlen)
{
if (limit > 0 && consumption >= limit)
{
m_stats->add (statistics::context::SEND_BUDGET_HIT, 1);
return result::BudgetExhausted;
}
bytes = ::sendmsg (fd, msg, MSG_NOSIGNAL);
er_log_conn (__FILE__, __LINE__, "transmitter->fill: sendmsg returned fd = %d, bytes = %u\n", fd, bytes);
if (bytes > 0)
{
m_stats->add (statistics::context::BYTES_OUT_TOTAL, bytes);
consumption += bytes;
advance = static_cast<std::size_t> (bytes);
while (advance && msg->msg_iovlen)
{
if (advance < msg->msg_iov->iov_len)
{
msg->msg_iov->iov_base = static_cast<std::byte *> (msg->msg_iov->iov_base) + advance;
msg->msg_iov->iov_len -= advance;
advance = 0;
}
else
{
advance -= msg->msg_iov->iov_len;
++msg->msg_iov;
--msg->msg_iovlen;
}
}
continue;
}
if (__builtin_expect (bytes == 0, 0))
{
return result::PeerReset;
}
switch (errno)
{
case EINTR:
/* retry */
continue;
case EAGAIN:
#if defined (EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return result::Pending;
case EPIPE:
case ECONNRESET:
return result::PeerReset;
default:
return result::Error;
}
}
return result::Ok;
}
void transmitter::push_for_deleter (std::function<void ()> &&deleter)
{
if (!deleter)
{
return;
}
m_deleter.push_back (std::move (deleter));
}
void transmitter::stamp ()
{
m_buf.stamp_msghdr ();
}
bool transmitter::empty ()
{
return m_buf.get_msghdr ().msg_iovlen == 0;
}
void transmitter::clear ()
{
for (auto &deleter : m_deleter)
{
if (deleter)
{
deleter ();
}
}
m_deleter.clear ();
m_buf.clear ();
}
}