Skip to content

File receiver.cpp

File List > connection > receiver.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * receiver.cpp
 */

#include "system_parameter.h"
#include "connection_defs.h"
#include "receiver.hpp"
#include "error_manager.h"
#include "span.hpp"
#include "object_primitive.h"

#include <algorithm>
#include <unistd.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#if 0
#define er_log_conn(...) er_log_debug (__VA_ARGS__)
#else
#define er_log_conn(...)
#endif

#define NEXT_STATE(x) do { \
    er_log_conn (__FILE__, __LINE__, "receiver (%p) state %d -> state = %d\n", this, m_state, state::x); \
    (m_state = state::x); \
} while (0)

constexpr std::size_t SIZE_HEADER = sizeof (int);
constexpr std::size_t SIZE_HEADER_PADDING = sizeof (int);

namespace cubconn
{
  receiver::receiver (std::size_t capacity, statistics::metrics<statistics::context> *stats) :
    m_stats (stats),
    m_buf (capacity)
  {
    m_result.reserve (8);
    this->reset ();
  }

  receiver::receiver () :
    m_buf ()
  {
  }

  receiver::~receiver ()
  {
#if !defined (NDEBUG)
    if (m_allocated.size () > 0)
      {
    er_log_conn (ARG_FILE_LINE, "receiver: found unreleased memory first = %p\n", m_allocated[0]);
    assert (false);
      }
#endif
  }

  void receiver::reset ()
  {
    cubbase::span<std::byte> buffer;

    m_state = state::Recv;

    m_received = 0;
    m_size = 0;

    m_result.clear ();
    m_allocated.clear ();

    /* if m_buf is already in use, it may be corrupted by subsequent reception. */
    m_buf.reset ();

    buffer = m_buf.buffer ();
    m_bufptr = buffer.data ();
    m_bufsize = buffer.size ();
    m_tmpsize = 0;
  }

  result receiver::to_result (ssize_t bytes, int errid)
  {
    if (__builtin_expect (bytes == 0, 0))
      {
    return result::PeerReset;
      }

    switch (errid)
      {
      case EINTR:
    /* retry */
    return result::Partial;
      case EAGAIN:
#if defined (EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
#endif
    return result::Pending;
      case EPIPE:
      case ECONNRESET:
    return result::PeerReset;
      default:
    return result::Error;
      }

    return result::Error;
  }

  void receiver::parse_packet (bool is_buffer)
  {
    cubbase::span<std::byte> buffer;
    std::uint32_t aligned;
    int padding;

    assert (m_size > 0);

    while (m_received >= SIZE_HEADER + m_size)
      {
    er_log_conn (ARG_FILE_LINE, "receiver->parse_packet: m_bufptr = %p, m_received = %d, m_size = %d, after parse = %d\n",
             m_bufptr + SIZE_HEADER + SIZE_HEADER_PADDING, m_received, m_size, m_received - SIZE_HEADER - m_size);

    assert ((reinterpret_cast<uintptr_t> (m_bufptr) & 7) == 0);

    std::memcpy (&aligned, m_bufptr + SIZE_HEADER, sizeof (std::uint32_t));
    padding = ntohl (aligned);
    assert_release (padding >= 0 && padding < 8);

    *reinterpret_cast<int *> (m_bufptr + SIZE_HEADER) = padding;

    er_log_conn (ARG_FILE_LINE, "receiver->parse_packet: packet size = %d, padding = %d, push size = %d.\n", m_size,
             SIZE_HEADER_PADDING + padding, m_size - SIZE_HEADER_PADDING - padding);

    m_result.emplace_back (m_bufptr + SIZE_HEADER + SIZE_HEADER_PADDING, m_size - SIZE_HEADER_PADDING - padding);
    m_received -= SIZE_HEADER + m_size;
    if (is_buffer)
      {
        m_buf.commit (SIZE_HEADER + m_size);
      }

    buffer = m_buf.buffer ();
    m_bufptr = buffer.data ();
    m_bufsize = buffer.size ();

    if (m_received < SIZE_HEADER)
      {
        break;
      }
    std::memcpy (&aligned, m_bufptr, sizeof (std::uint32_t));
    m_size = ntohl (aligned);
      }
    er_log_conn (ARG_FILE_LINE, "receiver->parse_packet: remains = %d.\n", m_received);
  }

  result receiver::parse_size_in_tmpsize (size_t consumption, size_t limit)
  {
    cubbase::span<std::byte> mem, buffer;
    std::byte *ptr;

    assert (m_received >= SIZE_HEADER);

    m_size = ntohl (m_tmpsize);
    if (m_size == 0)
      {
    er_log_conn (ARG_FILE_LINE, "receiver->parse_size_in_tmpsize: zero-length frame: peer sent a size of 0.\n");
    assert_release (false);
      }

    ptr = new std::byte[SIZE_HEADER + m_size];
    if (!ptr)
      {
    return result::Error;
      }
    er_log_conn (ARG_FILE_LINE,
         "receiver->parse_size_in_tmpsize: allocate new memory for packet. m_bufsize = %d, m_size = %d\n",
         m_bufsize, m_size);

    assert (m_received == SIZE_HEADER);
    std::memcpy (ptr, &m_tmpsize, m_received);

    m_allocated.push_back (ptr);

    m_bufptr = ptr;
    m_bufsize = SIZE_HEADER + m_size;
    NEXT_STATE (RecvInAllocated);

    if (limit > 0 && consumption >= limit)
      {
    er_log_conn (ARG_FILE_LINE, "receiver->parse_size_in_tmpsize: budget exhausted.\n");
    return result::BudgetExhausted;
      }
    return result::Partial;
  }

  result receiver::parse_size (size_t consumption, size_t limit)
  {
    cubbase::span<std::byte> mem, buffer;
    std::byte *ptr;
    std::uint32_t aligned;

    assert (m_received >= SIZE_HEADER);

    std::memcpy (&aligned, m_bufptr, sizeof (std::uint32_t));
    m_size = ntohl (aligned);
    if (m_size == 0)
      {
    er_log_conn (ARG_FILE_LINE, "receiver->parse_size: zero-length frame: peer sent a size of 0.\n");
    assert_release (false);
      }

    if (m_received >= SIZE_HEADER + m_size)
      {
    this->parse_packet (true);

    if (m_received >= SIZE_HEADER)
      {
        NEXT_STATE (ParseSize);

        if (limit > 0 && consumption >= limit)
          {
        er_log_conn (ARG_FILE_LINE, "receiver->parse_size: budget exhausted.\n");
        return result::BudgetExhausted;
          }
        return result::Partial;
      }

    /* m_received < SIZE_HEADER */

    if (m_bufsize < SIZE_HEADER + SIZE_HEADER_PADDING + sizeof (NET_HEADER))
      {
        /* need to recv the size header but the buffer was not large enough */
        std::memcpy (&m_tmpsize, m_bufptr, m_received);
        NEXT_STATE (RecvSizeInTmp);
        return result::Partial;
      }

    /* m_received < SIZE_HEADER */
    /* m_bufsize >= SIZE_HEADER + SIZE_HEADER_PADDING + sizeof (NET_HEADER) */

    NEXT_STATE (Recv);

    if (limit > 0 && consumption >= limit)
      {
        er_log_conn (ARG_FILE_LINE, "receiver->parse_size: budget exhausted.\n");
        return result::BudgetExhausted;
      }
    return result::Partial;
      }

    /* m_received < SIZE_HEADER + m_size */

    if (SIZE_HEADER + m_size > m_bufsize)
      {
    ptr = new std::byte[SIZE_HEADER + m_size];
    if (!ptr)
      {
        return result::Error;
      }
    er_log_conn (ARG_FILE_LINE, "receiver->parse_size: allocate new memory for packet. m_bufsize = %d, m_size = %d\n",
             m_bufsize, m_size);
    std::memcpy (ptr, m_bufptr, m_received);

    m_allocated.push_back (ptr);
#if !defined (NDEBUG)
    std::memcpy (&aligned, ptr, sizeof (std::uint32_t));
    assert (m_size == ntohl (aligned));
#endif
    m_bufptr = ptr;
    m_bufsize = SIZE_HEADER + m_size;
    NEXT_STATE (RecvInAllocated);

    if (limit > 0 && consumption >= limit)
      {
        er_log_conn (ARG_FILE_LINE, "receiver->parse_size: budget exhausted.\n");
        return result::BudgetExhausted;
      }
    return result::Partial;
      }

    assert (m_bufsize - m_received != 0);

    /* m_bufsize >= SIZE_HEADER + m_size */

    NEXT_STATE (Recv);

    if (limit > 0 && consumption >= limit)
      {
    er_log_conn (ARG_FILE_LINE, "receiver->parse_size: budget exhausted.\n");
    return result::BudgetExhausted;
      }
    return result::Partial;
  }

  result receiver::receive_in_allocated (int fd, size_t &consumption, size_t limit)
  {
    ssize_t bytes;

    /* receive data from socket */
    assert (m_bufsize - m_received > 0);
    if (limit > 0)
      {
    bytes = ::recv (fd, reinterpret_cast<char *> (m_bufptr) + m_received, std::min (m_bufsize - m_received, limit), 0);
      }
    else
      {
    bytes = ::recv (fd, reinterpret_cast<char *> (m_bufptr) + m_received, m_bufsize - m_received, 0);
      }
    if (bytes > 0)
      {
    m_received += bytes;
    consumption += bytes;

    m_stats->add (statistics::context::BYTES_IN_TOTAL, bytes);
    er_log_conn (ARG_FILE_LINE, "receiver->receive_in_allocated: state = %d, received = %d, accumulated = %d\n",
             (int) m_state,
             (int) bytes, (int) m_received);

    if (m_received < SIZE_HEADER + m_size)
      {
        return result::Partial;
      }
    assert (m_received == SIZE_HEADER + m_size);
    this->parse_packet (false);
    assert (m_received == 0);

    assert (m_bufptr == m_buf.buffer ().data ());
    assert (m_bufsize == m_buf.buffer ().size ());

    if (m_bufsize < SIZE_HEADER + SIZE_HEADER_PADDING + sizeof (NET_HEADER))
      {
        NEXT_STATE (RecvSizeInTmp);
      }
    else
      {
        NEXT_STATE (Recv);
      }

    if (limit > 0 && consumption >= limit)
      {
        er_log_conn (ARG_FILE_LINE, "receiver->receive_in_allocated: budget exhausted.\n");
        return result::BudgetExhausted;
      }
    return result::Ok;
      }

    return this->to_result (bytes, errno);
  }

  result receiver::receive_in_tmpsize (int fd, size_t &consumption)
  {
    ssize_t bytes;

    /* receive data from socket */
    assert (SIZE_HEADER - m_received > 0);
    bytes = ::recv (fd, reinterpret_cast<char *> (&m_tmpsize) + m_received, SIZE_HEADER - m_received, 0);
    if (bytes > 0)
      {
    m_received += bytes;
    consumption += bytes;

    m_stats->add (statistics::context::BYTES_IN_TOTAL, bytes);
    er_log_conn (ARG_FILE_LINE, "receiver->receive_in_tmpsize: state = %d, received = %d, accumulated = %d\n",
             (int) m_state,
             (int) bytes, (int) m_received);

    if (m_received < SIZE_HEADER)
      {
        return result::Partial;
      }
    NEXT_STATE (ParseSizeInTmp);
    return result::Ok;
      }

    return this->to_result (bytes, errno);
  }

  result receiver::receive (int fd, size_t &consumption, size_t limit)
  {
    ssize_t bytes;

    er_log_conn (__FILE__, __LINE__, "receiver (%p) drain from fd = %d\n", this, fd);

    /* receive data from socket */
    assert (m_bufsize - m_received > 0);
    if (limit > 0)
      {
    bytes = ::recv (fd, reinterpret_cast<char *> (m_bufptr) + m_received, std::min (m_bufsize - m_received, limit), 0);
      }
    else
      {
    bytes = ::recv (fd, reinterpret_cast<char *> (m_bufptr) + m_received, m_bufsize - m_received, 0);
      }
    if (bytes > 0)
      {
    m_received += bytes;
    consumption += bytes;

    m_stats->add (statistics::context::BYTES_IN_TOTAL, bytes);
    er_log_conn (ARG_FILE_LINE, "receiver->receive: state = %d, received = %d, accumulated = %d\n", (int) m_state,
             (int) bytes, (int) m_received);
    if (m_received < SIZE_HEADER)
      {
        return result::Partial;
      }
    NEXT_STATE (ParseSize);

    return result::Ok;
      }

    return this->to_result (bytes, errno);
  }

  result receiver::drain (int fd, size_t limit)
  {
    result status;
    size_t consumption;

    consumption = 0;
    m_result.clear ();
    while (true)
      {
    switch (m_state)
      {
      case state::Recv:
        status = this->receive (fd, consumption, limit);
        break;

      case state::RecvSizeInTmp:
        status = this->receive_in_tmpsize (fd, consumption);
        break;

      case state::RecvInAllocated:
        status = this->receive_in_allocated (fd, consumption, limit);
        break;

      case state::ParseSize:
        status = this->parse_size (consumption, limit);
        break;

      case state::ParseSizeInTmp:
        status = this->parse_size_in_tmpsize (consumption, limit);
        break;

      default:
        status = result::Error;
        er_log_conn (ARG_FILE_LINE, "receiver->drain: unknown state\n");
        assert_release (false);
        break;
      }

    switch (status)
      {
      case result::Partial:
      case result::Ok:
        break;

      case result::BudgetExhausted:
        m_stats->add (statistics::context::RECV_BUDGET_HIT, 1);
        [[fallthrough]];

      case result::Pending:
      case result::Error:
      case result::PeerReset:
        return status;

      default:
        er_log_conn (ARG_FILE_LINE, "receiver->drain: unknown state\n");
        assert_release (false);
        break;
      }
      }

    return result::Error;
  }

  void receiver::release (std::byte *ptr)
  {
    cubbase::span<std::byte> source;
    int size;

    if (m_buf.is_in (ptr))
      {
    size = ntohl (*reinterpret_cast<int *> (ptr - (SIZE_HEADER + SIZE_HEADER_PADDING)));
    source = cubbase::span<std::byte> (ptr - (SIZE_HEADER + SIZE_HEADER_PADDING), size + SIZE_HEADER);

    m_buf.restore (source);
      }
    else
      {
    auto it = std::find (m_allocated.begin (), m_allocated.end (), ptr - (SIZE_HEADER + SIZE_HEADER_PADDING));
    if (it == m_allocated.end ())
      {
        er_log_conn (ARG_FILE_LINE, "receiver: memory = %p does not belong to this receiver\n",
             ptr - (SIZE_HEADER + SIZE_HEADER_PADDING));
        assert_release (false);
        return;
      }
    m_allocated.erase (it);

    delete[] (ptr - (SIZE_HEADER + SIZE_HEADER_PADDING));
      }
  }

  std::vector<cubbase::span<std::byte>> *receiver::get_result ()
  {
    return &m_result;
  }
}