File buffer.hpp
File List > connection > buffer.hpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* buffer.hpp
*/
#ifndef _CONNECTION_BUFFER_HPP_
#define _CONNECTION_BUFFER_HPP_
#include "packet_buffer.hpp"
#include "error_manager.h"
#include <tuple>
#include <array>
#include <cstring>
#include <cstddef>
#include <type_traits>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
namespace cubconn
{
/*
 * BUFFER_SIZE - capacity, in bytes, of the inline storage array held by cubconn::buffer
 *
 * Declared inline so that every translation unit including this header refers to one
 * single entity instead of a per-TU internal-linkage copy.
 */
inline constexpr std::size_t BUFFER_SIZE = 1024;
/*
 * result - outcome codes for the connection I/O helpers
 *
 * Enumerator order is significant: the underlying values are assigned implicitly.
 * Only Ok, Pending, Error and PeerReset are produced by the helpers visible in this header;
 * the meaning of the remaining enumerators is defined by code outside of it.
 */
enum class result
{
  Ok,                /* the requested transfer finished */
  Partial,           /* not produced in this header */
  Error,             /* unexpected failure, or no usable buffer space */
  Reset,             /* not produced in this header */
  Pending,           /* socket would block, or the buffer is not complete yet */
  BudgetExhausted,   /* not produced in this header */
  PeerReset,         /* EPIPE/ECONNRESET on send, or a transfer call returned 0 */
  RefuseConnection,  /* not produced in this header */
  ClosedConnection,  /* not produced in this header */
  Aborted,           /* not produced in this header */
  Skewed             /* not produced in this header */
};
/*
 * buffer - fixed-capacity byte buffer tracked by a position (m_pos) and a size (m_size)
 *
 * The storage is an inline array of BUFFER_SIZE bytes. Only the template members are defined
 * in this header; every other member is declared here and defined elsewhere, so their exact
 * semantics are not visible from this file.
 */
class buffer
{
  public:
    /* Members below are defined outside this header. */
    bool set_data (const void *data, size_t size) noexcept;
    std::pair<const std::byte *, std::size_t> remaining_data () const noexcept;
    std::pair<std::byte *, std::size_t> remaining_space () noexcept;
    void advance (std::size_t bytes) noexcept;
    bool is_complete () const noexcept;
    bool has_data () const noexcept;
    bool has_space () const noexcept;
    std::size_t position () const noexcept;
    std::size_t total_size () const noexcept;
    void mark_consumed ();
    void set_target_size (size_t target);
    void reset ();

    /*
     * set_data - copy the object representation of a trivially copyable value into the buffer
     *
     * return    : false (after logging) when sizeof (T) exceeds the storage capacity, true otherwise
     * data (in) : value to copy
     *
     * On success the position is rewound to 0 and the size becomes sizeof (T).
     */
    template<typename T>
    bool set_data (const T &data) noexcept
    {
      static_assert (std::is_trivially_copyable_v<T>);

      if (sizeof (T) > m_data.size ())
        {
          _er_log_debug (__FILE__, __LINE__, "master_buffer->set_data: data too large for buffer.");
          return false;
        }

      std::memcpy (m_data.data (), &data, sizeof (T));
      m_pos = 0;
      m_size = sizeof (T);
      return true;
    }

    /*
     * data_as - view the start of the storage as a T
     *
     * return : nullptr while the position is below sizeof (T), otherwise a pointer to the
     *          first byte of the storage reinterpreted as const T
     */
    template<typename T>
    const T *data_as () const noexcept
    {
      static_assert (std::is_trivially_copyable_v<T>);

      if (m_pos < sizeof (T))
        {
          return nullptr;
        }
      return reinterpret_cast<const T *> (m_data.data ());
    }

    /*
     * is_ready_for - true once the position has reached at least sizeof (T)
     */
    template<typename T>
    bool is_ready_for () const noexcept
    {
      return m_pos >= sizeof (T);
    }

  private:
    /*
     * data_as<T> hands out the storage as a T pointer, so the array must be aligned for T.
     * A plain std::byte array only guarantees alignment 1; request the strictest fundamental
     * alignment explicitly instead of relying on member order and neighbouring members.
     */
    alignas (std::max_align_t) std::array<std::byte, BUFFER_SIZE> m_data;
    std::size_t m_pos = 0;
    std::size_t m_size = 0;
};
class buffered_socket
{
public:
[[gnu::hot]]
static result send_partial (int fd, cubbase::packet_buffer &buffer) noexcept
{
struct ::msghdr *msg;
std::size_t advance;
ssize_t bytes;
msg = &buffer.get_msghdr ();
while (msg->msg_iovlen)
{
bytes = ::sendmsg (fd, msg, MSG_NOSIGNAL);
if (bytes < 0 && errno == EINTR)
{
/* interrupted by signal, retry */
continue;
}
if (bytes > 0)
{
advance = static_cast<std::size_t> (bytes);
while (advance && msg->msg_iovlen)
{
if (advance < msg->msg_iov->iov_len)
{
msg->msg_iov->iov_base = static_cast<std::byte *> (msg->msg_iov->iov_base) + advance;
msg->msg_iov->iov_len -= advance;
advance = 0;
}
else
{
advance -= msg->msg_iov->iov_len;
++msg->msg_iov;
--msg->msg_iovlen;
}
}
continue;
}
if (__builtin_expect (bytes == 0, 0))
{
_er_log_debug (__FILE__, __LINE__, "socket_io->send_partial: send returned 0 - error: %s", strerror (errno));
return result::PeerReset;
}
switch (errno)
{
case EPIPE:
case ECONNRESET:
_er_log_debug (__FILE__, __LINE__, "socket_io->send_partial: client disconnected: %s", strerror (errno));
return result::PeerReset;
case EAGAIN:
#if defined (EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return result::Pending;
default:
_er_log_debug (__FILE__, __LINE__, "socket_io->send_partial: unexpected send error: %s", strerror (errno));
return result::Error;
}
}
return result::Ok;
}
[[gnu::hot]]
static result recv_partial (int fd, buffer &buffer)
{
std::byte *space;
std::size_t available;
ssize_t n;
if (!buffer.has_space ())
{
_er_log_debug (__FILE__, __LINE__, "socket_io->recv_partial: out of buffer");
return result::Error;
}
std::tie (space, available) = buffer.remaining_space ();
if (!space)
{
_er_log_debug (__FILE__, __LINE__, "socket_io->recv_partial: out of buffer");
return result::Error;
}
assert (buffer.total_size () - buffer.position () <= available);
n = ::recv (fd, space, buffer.total_size () - buffer.position (), 0);
if (n > 0)
{
buffer.advance (n);
}
else if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
return result::Pending;
}
else
{
_er_log_debug (__FILE__, __LINE__, "socket_io->recv_partial: recv error: %s", strerror (errno));
return result::Error;
}
}
else
{
_er_log_debug (__FILE__, __LINE__, "socket_io->recv_partial: recv returned 0 - error: %s", strerror (errno));
return result::PeerReset;
}
return buffer.is_complete () ? result::Ok : result::Pending;
}
/* recv helper */
template<typename T>
[[gnu::hot]]
static std::tuple<result, const T *> read_fixed_size (int fd, buffer &buffer)
{
result status;
static_assert (std::is_trivially_copyable_v<T>);
if (buffer.total_size () != sizeof (T))
{
buffer.set_target_size (sizeof (T));
}
status = recv_partial (fd, buffer);
if (status == result::Ok)
{
assert_release (buffer.position () == sizeof (T));
}
return { status, buffer.data_as<T> () };
}
};
}
#endif