Skip to content

File pl_executor.cpp

File List > cubrid > src > sp > pl_executor.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

#include "pl_executor.hpp"

#include "regu_var.hpp"
#include "fetch.h"
#include "memory_alloc.h"

// runtime
#include "dbtype.h"

#include "method_struct_invoke.hpp"
#include "method_struct_query.hpp"
#include "method_struct_value.hpp"
#include "method_struct_oid_info.hpp"
#include "method_struct_parameter_info.hpp"

#include "pl_comm.h"
#include "pl_query_cursor.hpp"
#include "sp_code.hpp"
#include "xserver_interface.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubpl
{
  using namespace cubmethod;

  // Captures everything pack() serializes for one routine invocation.
  //
  // tid: transaction id, stored in tran_id.
  // sig: routine signature; it is dereferenced unconditionally, so it must not be null.
  // tc:  requested transaction-control flag; it is overridden with true when the
  //      routine language equals SP_LANG_PLCSQL.
  invoke_java::invoke_java (int tid, pl_signature *sig, bool tc)
    : tran_id (tid)
  {
    // target is "<target_class_name>.<target_method_name>"
    signature.assign (sig->ext.sp.target_class_name);
    signature.append (".");
    signature.append (sig->ext.sp.target_method_name);

    auth.assign (sig->auth);
    lang = sig->type;
    result_type = sig->result_type;

    // copy the per-argument mode and type out of the signature
    pl_arg &sig_args = sig->arg;
    num_args = sig_args.arg_size;
    arg_mode.resize (num_args);
    arg_type.resize (num_args);

    int pos = 0;
    while (pos < num_args)
      {
        arg_mode[pos] = sig_args.arg_mode[pos];
        arg_type[pos] = sig_args.arg_type[pos];
        ++pos;
      }

    transaction_control = (lang == SP_LANG_PLCSQL) || tc;
  }

  // Serializes the request in this fixed order: tran_id, signature, auth, lang,
  // num_args, one (arg_mode, arg_type) pair per argument, result_type and
  // finally transaction_control. get_packed_size() walks the same sequence.
  void
  invoke_java::pack (cubpacking::packer &serializator) const
  {
    // header
    serializator.pack_int (tran_id);
    serializator.pack_string (signature);
    serializator.pack_string (auth);
    serializator.pack_int (lang);
    serializator.pack_int (num_args);

    // per-argument mode/type pairs
    int pos = 0;
    while (pos < num_args)
      {
        serializator.pack_int (arg_mode[pos]);
        serializator.pack_int (arg_type[pos]);
        ++pos;
      }

    // trailer
    serializator.pack_int (result_type);
    serializator.pack_bool (transaction_control);
  }

  // Deserialization is intentionally not provided for this request type:
  // reaching this function trips the assertion below, and the unpacker is
  // left untouched.
  void
  invoke_java::unpack (cubpacking::unpacker &deserializator)
  {
    // TODO: unpacking is not necessary
    assert (false);
  }

  // Computes the packed size of the request by asking the packer for the size of
  // each field in the same order pack() writes them. The running total is passed
  // to the packer as the current offset of every field after the first.
  size_t
  invoke_java::get_packed_size (cubpacking::packer &serializator, std::size_t start_offset) const
  {
    size_t total = serializator.get_packed_int_size (start_offset);  // tran_id

    total += serializator.get_packed_string_size (signature, total); // signature
    total += serializator.get_packed_string_size (auth, total);      // auth
    total += serializator.get_packed_int_size (total);               // lang
    total += serializator.get_packed_int_size (total);               // num_args

    int pos = 0;
    while (pos < num_args)
      {
        total += serializator.get_packed_int_size (total);           // arg_mode
        total += serializator.get_packed_int_size (total);           // arg_type
        ++pos;
      }

    total += serializator.get_packed_int_size (total);               // result_type
    total += serializator.get_packed_bool_size (total);              // transaction_control

    return total;
  }


  // Binds the executor to a routine signature and, when a session is available,
  // creates and pushes an execution stack for it.
  //
  // sig: signature of the routine to run; kept by reference in m_sig.
  //
  // When get_session () returns null no stack is created. m_stack is explicitly
  // set to nullptr first so that the null checks made by the destructor,
  // fetch_args_peek () and execute () never read an unassigned pointer.
  executor::executor (pl_signature &sig)
    : m_sig (sig)
  {
    m_stack = nullptr;

    session *sess = get_session ();
    if (sess)
      {
        m_stack = sess->create_and_push_stack (nullptr);
      }
  }

  // Clears the collected OUT argument values and deletes the execution stack.
  // The stack is deleted only while get_session () still returns a session.
  executor::~executor ()
  {
    // destroy local resources
    pr_clear_value_vector (m_out_args);

    // exit stack
    const bool can_release_stack = (get_session () != nullptr) && (m_stack != nullptr);
    if (can_release_stack)
      {
        delete m_stack;
      }
  }

  // Peeks the call arguments from a regu-variable list and records a reference
  // to each value in m_args.
  //
  // val_list_p: list of operands to evaluate (walked through ->next).
  // val_desc_p, obj_oid_p, tuple: forwarded unchanged to fetch_peek_dbval ().
  //
  // Returns NO_ERROR on success. When there is no execution stack it returns
  // er_errid (). When fetch_peek_dbval () fails, m_args is cleared and that error
  // is returned. When a value has an unsupported type,
  // ER_SP_NOT_SUPPORTED_ARG_TYPE is set, its message is stored on the stack and
  // er_errid () is returned; values collected before it remain in m_args.
  int
  executor::fetch_args_peek (regu_variable_list_node *val_list_p, VAL_DESCR *val_desc_p, OID *obj_oid_p,
                             QFILE_TUPLE tuple)
  {
    if (m_stack == NULL)
      {
        // Check if the session is in an interrupting state by calling get_session()
        // TODO: Verify the behavior of get_session() and validate this code flow.
        session *current_session = get_session ();
        if (current_session && er_errid () == NO_ERROR)
          {
            // If send_data_to_client() fails, it means the connection with CAS has been disconnected.
            // In this case, get_session() should return NULL.
            // According to the current analysis, the following code should be unreachable.
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          }

        return er_errid ();
      }

    cubthread::entry *thread_p = m_stack->get_thread_entry ();

    // nothing to collect for a routine without arguments
    if (!m_sig.has_args ())
      {
        return NO_ERROR;
      }

    int error = NO_ERROR;
    DB_VALUE *value = NULL;

    for (REGU_VARIABLE_LIST operand = val_list_p; operand != NULL; operand = operand->next)
      {
        error = fetch_peek_dbval (thread_p, &operand->value, val_desc_p, NULL, obj_oid_p, tuple, &value);
        if (error != NO_ERROR)
          {
            m_args.clear ();
            break;
          }

        if (is_supported_dbtype (*value) == false)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_NOT_SUPPORTED_ARG_TYPE, 1,
                    pr_type_name ((DB_TYPE) value->domain.general_info.type));
            m_stack->set_error_message (std::string (er_msg ()));
            error = er_errid ();
            break;
          }

        m_args.emplace_back (std::ref (*value));
      }

    return error;
  }

  // Takes an already evaluated argument list as the call arguments.
  //
  // args: references to the argument values; stored in m_args, which is expected
  //       to be empty on entry (asserted).
  //
  // Returns NO_ERROR when every value has a supported type. Otherwise
  // ER_SP_NOT_SUPPORTED_ARG_TYPE is set and er_errid () is returned; the error
  // message is also stored on the execution stack when one exists.
  int
  executor::fetch_args_peek (std::vector <std::reference_wrapper <DB_VALUE>> args)
  {
    assert (m_args.empty ());

    // args is our own copy (taken by value), so hand its storage over instead of copying again
    m_args = std::move (args);

    for (const DB_VALUE &val : m_args)
      {
        if (is_supported_dbtype (val) == false)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_NOT_SUPPORTED_ARG_TYPE, 1,
                    pr_type_name ((DB_TYPE) val.domain.general_info.type));

            // The stack is absent when no session was available at construction;
            // the other overload and execute () guard against that as well.
            if (m_stack != NULL)
              {
                m_stack->set_error_message (std::string (er_msg ()));
              }
            return er_errid ();
          }
      }

    return NO_ERROR;
  }

  // Tells whether a value's DB type is accepted as a routine argument.
  //
  // Returns true for the types in the first group below and false for the
  // explicitly unsupported ones. Obsolete/internal types and any unlisted type
  // trip an assertion and yield false.
  bool
  executor::is_supported_dbtype (const DB_VALUE &value)
  {
    switch (DB_VALUE_TYPE (&value))
      {
      // supported types
      case DB_TYPE_INTEGER:
      case DB_TYPE_SHORT:
      case DB_TYPE_BIGINT:
      case DB_TYPE_FLOAT:
      case DB_TYPE_DOUBLE:
      case DB_TYPE_MONETARY:
      case DB_TYPE_NUMERIC:
      case DB_TYPE_CHAR:
      case DB_TYPE_STRING:
      case DB_TYPE_DATE:
      case DB_TYPE_TIME:
      case DB_TYPE_TIMESTAMP:
      case DB_TYPE_DATETIME:
      case DB_TYPE_SET:
      case DB_TYPE_MULTISET:
      case DB_TYPE_SEQUENCE:
      case DB_TYPE_OID:
      case DB_TYPE_OBJECT:
      case DB_TYPE_RESULTSET:
      case DB_TYPE_NULL:
        return true;

      // unsupported types
      case DB_TYPE_BIT:
      case DB_TYPE_VARBIT:
      case DB_TYPE_TABLE:
      case DB_TYPE_BLOB:
      case DB_TYPE_CLOB:
      case DB_TYPE_TIMESTAMPTZ:
      case DB_TYPE_TIMESTAMPLTZ:
      case DB_TYPE_DATETIMETZ:
      case DB_TYPE_DATETIMELTZ:
      case DB_TYPE_JSON:
      case DB_TYPE_ENUMERATION:
        return false;

      // obsolete, internal, unused type
      case DB_TYPE_ELO:
      case DB_TYPE_VARIABLE:
      case DB_TYPE_SUB:
      case DB_TYPE_POINTER:
      case DB_TYPE_ERROR:
      case DB_TYPE_VOBJ:
      case DB_TYPE_DB_VALUE:
      case DB_TYPE_MIDXKEY:
      default:
        assert (false);
        return false;
      }
  }

  // Runs the routine: switches execution rights to m_sig.auth, sends the invoke
  // request, processes the responses, then restores the execution rights.
  //
  // value: receives the routine result (filled by response_invoke_command ()).
  //
  // Returns the first error of the three steps, or NO_ERROR. The query handlers
  // of the stack are reset and the rights are restored whether or not a step
  // failed; if restoring fails, er_errid () replaces the returned code. When
  // there is no execution stack nothing is run and er_errid () is returned.
  int
  executor::execute (DB_VALUE &value)
  {
    if (m_stack == NULL)
      {
        // Check if the session is in an interrupting state by calling get_session()
        // TODO: Verify the behavior of get_session() and validate this code flow.
        session *current_session = get_session ();
        if (current_session && er_errid () == NO_ERROR)
          {
            // If send_data_to_client() fails, it means the connection with CAS has been disconnected.
            // In this case, get_session() should return NULL.
            // According to the current analysis, the following code should be unreachable.
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          }

        return er_errid ();
      }

    // execution rights
    assert (m_sig.auth != NULL);
    int error = change_exec_rights (m_sig.auth);

    // each later step runs only while the previous ones succeeded
    if (error == NO_ERROR)
      {
        error = request_invoke_command ();
      }

    if (error == NO_ERROR)
      {
        error = response_invoke_command (value);
      }

    // common exit path
    if (m_stack != NULL)
      {
        m_stack->reset_query_handlers ();
      }

    // restore execution rights
    if (change_exec_rights (NULL) != NO_ERROR)
      {
        error = er_errid ();
      }

    return error;
  }

  // runtime
  int
  executor::change_exec_rights (const char *auth_name)
  {
    int error = NO_ERROR;
    int is_restore = (auth_name == NULL) ? 1 : 0;

    if (is_restore == 0)
      {
    error = m_stack->send_data_to_client (METHOD_CALLBACK_CHANGE_RIGHTS, is_restore, std::string (auth_name));
      }
    else
      {
    error = m_stack->send_data_to_client (METHOD_CALLBACK_CHANGE_RIGHTS, is_restore);
      }

    if (error != NO_ERROR)
      {
    // Check if the session is in an interrupting state by calling get_session()
    // TODO: Verify the behavior of get_session() and validate this code flow.
    session *sess = get_session ();
    if (sess && er_errid () == NO_ERROR)
      {
        // If send_data_to_client() fails, it means the connection with CAS has been disconnected.
        // In this case, get_session() should return NULL.
        // According to the current analysis, the following code should be unreachable.
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      }

    error = er_errid ();
      }

    return error;
  }

  int
  executor::request_invoke_command ()
  {
    int error = NO_ERROR;

    TRANID tid = m_stack->get_tran_id ();

    m_stack->set_java_command (SP_CODE_INVOKE);

    // get changed session parameters
    cubpl::session *sess = get_session ();
    std::vector<sys_param> session_params;
    if (sess)
      {
    session_params = sess->obtain_session_parameters (m_stack->get_connection ());
      }
    // handling 'else' is not required because send_data_to_java will handle the case when sess is not found

    prepare_args prepare_arg ((std::uint64_t) this, tid, METHOD_TYPE_PLCSQL, m_args);
    invoke_java invoke_arg (tid, &m_sig,
                (m_sig.type == PL_TYPE_PLCSQL) ? true : prm_get_bool_value (PRM_ID_PL_TRANSACTION_CONTROL));

    error = m_stack->send_data_to_java (session_params, prepare_arg, invoke_arg);
    return error;
  }

  // If the given value is of type DB_TYPE_RESULTSET, promotes the query it
  // refers to into a session cursor on the execution stack. Values of any other
  // type are ignored.
  void
  executor::handle_type_resultset (DB_VALUE &returnval)
  {
    if (db_value_type (&returnval) != DB_TYPE_RESULTSET)
      {
        return;
      }

    std::uint64_t query_id = db_get_resultset (&returnval);
    // qfile_update_qlist_count (thread_p, m_list_id, -1);
    m_stack->promote_to_session_cursor (query_id);
  }

  // Reads and processes responses from the Java side until the final result
  // (or an error) of the invocation arrives.
  //
  // Each iteration reads one response block, unpacks its leading code, reads the
  // payload through the stack, and then:
  //   - SP_CODE_INTERNAL_JDBC: serves the callback request and keeps looping;
  //   - SP_CODE_RESULT / SP_CODE_ERROR: lets response_result () fill 'value' or
  //     record the error message, which ends the loop;
  //   - anything else: sets ER_SP_NETWORK_ERROR with the unexpected code.
  // After processing, the front of the data queue is popped (if any) and the
  // response block is freed.
  //
  // value: receives the routine result.
  // Returns NO_ERROR or the error that stopped the loop.
  int
  executor::response_invoke_command (DB_VALUE &value)
  {
    int error_code = NO_ERROR;
    int start_code = -1;

    // response loop
    do
      {
        cubmem::block response_blk;
        error_code = m_stack->read_data_from_java (response_blk);
        if (error_code != NO_ERROR)
          {
            break;
          }

        if (!response_blk.is_valid ())
          {
            error_code = ER_SP_NETWORK_ERROR;
            // er_set () is variadic: pass an int, as the other ER_SP_NETWORK_ERROR
            // call in this function does, rather than the size_t that sizeof yields.
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error_code, 1, static_cast<int> (sizeof (int)));
            break;
          }

        cubpacking::unpacker unpacker (response_blk);
        unpacker.unpack_int (start_code);

        (void) m_stack->read_payload_block (unpacker);

        /* processing */
        if (start_code == SP_CODE_INTERNAL_JDBC)
          {
            error_code = response_callback_command ();
          }
        else if (start_code == SP_CODE_RESULT || start_code == SP_CODE_ERROR)
          {
            error_code = response_result (start_code, value);
          }
        else
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_NETWORK_ERROR, 1,
                    start_code);
            error_code = ER_SP_NETWORK_ERROR;
          }

        // drop the payload that was just consumed
        if (m_stack->get_data_queue ().empty() == false)
          {
            m_stack->get_data_queue ().pop ();
          }

        // free response block
        response_blk.freemem ();
      }
    while (error_code == NO_ERROR && start_code == SP_CODE_INTERNAL_JDBC);

    return error_code;
  }

  int
  executor::response_result (int code, DB_VALUE &returnval)
  {
    // check queue
    if (m_stack->get_data_queue().empty() == true)
      {
    return ER_FAILED;
      }

    cubmem::block &blk = m_stack->get_data_queue().front ();
    packing_unpacker unpacker (blk);

    if (code == SP_CODE_RESULT)
      {
    dbvalue_java value_unpacker;
    db_make_null (&returnval);
    value_unpacker.value = &returnval;
    value_unpacker.unpack (unpacker);

    handle_type_resultset (returnval);

    for (int i = 0; i < m_sig.arg.arg_size; i++)
      {
        DB_VALUE out_val;
        db_make_null (&out_val);
        if (m_sig.arg.arg_mode[i] != SP_MODE_IN)
          {
        value_unpacker.value = &out_val;
        value_unpacker.unpack (unpacker);
        m_out_args.emplace_back (out_val);

        handle_type_resultset (out_val);
          }
      }
    return NO_ERROR;
      }
    else if (code == SP_CODE_ERROR)
      {
    std::string error_msg;
    unpacker.unpack_string (error_msg);
    m_stack->set_error_message (error_msg);
    return ER_SP_EXECUTE_ERROR;
      }
    else
      {
    // it is handled in response_invoke_command
    assert (false);
    return ER_FAILED;
      }

    return NO_ERROR;
  }

  int
  executor::response_callback_command ()
  {
    int error_code = NO_ERROR;
    // check queue
    if (m_stack->get_data_queue().empty() == true)
      {
    return ER_FAILED;
      }

    cubmem::block &blk = m_stack->get_data_queue().front ();
    packing_unpacker unpacker (blk);
    cubthread::entry &thread_ref = *m_stack->get_thread_entry ();

    int code;
    unpacker.unpack_int (code);

    switch (code)
      {
      /* NOTE: we don't need to implement it
      case METHOD_CALLBACK_GET_DB_VERSION:
      break;
      */

      case METHOD_CALLBACK_GET_DB_PARAMETER:
    error_code = callback_get_db_parameter (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_QUERY_PREPARE:
    error_code = callback_prepare (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_QUERY_EXECUTE:
    error_code = callback_execute (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_FETCH:
    error_code = callback_fetch (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_OID_GET:
    error_code = callback_oid_get (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_OID_PUT:
    error_code = callback_oid_put (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_OID_CMD:
    error_code = callback_oid_cmd (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_COLLECTION:
    error_code = callback_collection_cmd (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_MAKE_OUT_RS:
    error_code = callback_make_outresult (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_GET_GENERATED_KEYS:
    error_code = callback_get_generated_keys (thread_ref, unpacker);
    break;
      case METHOD_CALLBACK_END_TRANSACTION:
    error_code = callback_end_transaction (thread_ref, unpacker);
    break;
      case METHOD_CALLBACK_GET_CODE_ATTR:
    error_code = callback_get_code_attr (thread_ref, unpacker);
    break;

      case METHOD_CALLBACK_SET_PL_SESSION_PARAM:
    error_code = callback_set_pl_session_param (thread_ref, unpacker);
    break;
      default:
    // TODO: not implemented yet, do we need error handling?
    assert (false);
    error_code = ER_FAILED;
    break;
      }

    return error_code;
  }

  std::vector <DB_VALUE> &
  executor::get_out_args ()
  {
    return m_out_args;
  }

  // Returns the execution stack created for this executor; it is null when no
  // stack was created in the constructor.
  execution_stack *
  executor::get_stack ()
  {
    return this->m_stack;
  }

  // Serves METHOD_CALLBACK_GET_DB_PARAMETER: replies to the Java side with the
  // session's db_parameter_info.
  //
  // The info object is taken from the session; when the session has none yet, a
  // new one is filled from the transaction (isolation, wait msecs, client ids)
  // and handed to the session with set_db_parameter_info ().
  //
  // thread_ref, unpacker: not read by this handler.
  //
  // Returns ER_SES_SESSION_EXPIRED when there is no session, otherwise the result
  // of sending the reply. The send result used to be discarded, which reported
  // success even when the reply never reached the Java side; it is now returned,
  // as in the other handlers that reply directly.
  int
  executor::callback_get_db_parameter (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    cubpl::session *pl_session = cubpl::get_session ();
    if (!pl_session)
      {
        return ER_SES_SESSION_EXPIRED;
      }

    int error = NO_ERROR;

    db_parameter_info *parameter_info = pl_session->get_db_parameter_info ();
    if (parameter_info == nullptr)
      {
        int tran_index = LOG_FIND_THREAD_TRAN_INDEX (m_stack->get_thread_entry());
        parameter_info = new db_parameter_info ();

        parameter_info->tran_isolation = logtb_find_isolation (tran_index);
        parameter_info->wait_msec = logtb_find_wait_msecs (tran_index);
        logtb_get_client_ids (tran_index, &parameter_info->client_ids);

        // the session keeps the pointer for later requests
        pl_session->set_db_parameter_info (parameter_info);
      }

    cubmem::block blk;
    if (parameter_info)
      {
        blk = std::move (pack_data_block (METHOD_RESPONSE_SUCCESS, *parameter_info));
      }
    else
      {
        blk = std::move (pack_data_block (METHOD_RESPONSE_ERROR, ER_FAILED, std::string ("unknown error"),
                                          ARG_FILE_LINE));
      }

    if (blk.is_valid ())
      {
        error = m_stack->send_data_to_java (blk);
        blk.freemem ();
      }

    return error;
  }

  // Serves METHOD_CALLBACK_QUERY_PREPARE: forwards the SQL text and flag to the
  // client together with the stack's transaction id, and relays the client's
  // answer to the Java side.
  //
  // When the answer starts with METHOD_RESPONSE_SUCCESS, the handle id from the
  // unpacked prepare_info is registered on the stack with add_query_handler ().
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (sql, flag).
  //
  // Returns the result of send_data_to_client_recv (). The relay step now
  // reports the result of send_data_to_java () instead of always NO_ERROR, so a
  // failed relay is no longer hidden (callback_execute () already does this).
  int
  executor::callback_prepare (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int error = NO_ERROR;
    int code = METHOD_CALLBACK_QUERY_PREPARE;
    std::string sql;
    int flag;

    unpacker.unpack_all (sql, flag);

    auto get_prepare_info = [&] (const cubmem::block & b)
    {
      packing_unpacker response_unpacker (b.ptr, (size_t) b.dim);

      int res_code;
      response_unpacker.unpack_int (res_code);

      if (res_code == METHOD_RESPONSE_SUCCESS)
        {
          prepare_info info;
          info.unpack (response_unpacker);

          m_stack->add_query_handler (info.handle_id);
        }

      // relay the client's answer unchanged, success or not
      return m_stack->send_data_to_java (b);
    };

    error = m_stack->send_data_to_client_recv (get_prepare_info, code, sql, flag, m_stack->get_tran_id ());
    return error;
  }

  // Serves METHOD_CALLBACK_QUERY_EXECUTE: forwards the execute request to the
  // client (with has_parameter forced to 1) and relays the answer to the Java side.
  //
  // For a successful answer to a CUBRID_STMT_SELECT statement:
  //   - with at least one tuple, a cursor is added to the stack for the query;
  //   - with no tuples, the query's list (if the query entry is found) is closed
  //     and the query is ended right away.
  //
  // thread_ref: thread entry used for the query manager calls.
  // unpacker:   positioned at the request payload.
  //
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_execute (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int code = METHOD_CALLBACK_QUERY_EXECUTE;
    execute_request request;

    unpacker.unpack_all (request);
    request.has_parameter = 1;

    auto get_execute_info = [&] (const cubmem::block & b)
    {
      packing_unpacker response_unpacker (b.ptr, (size_t) b.dim);

      int res_code;
      response_unpacker.unpack_int (res_code);

      if (res_code == METHOD_RESPONSE_SUCCESS)
        {
          execute_info info;
          info.unpack (response_unpacker);

          query_result_info &result = info.qresult_info;
          if (result.stmt_type == CUBRID_STMT_SELECT)
            {
              std::uint64_t qid = result.query_id;
              if (result.tuple_count > 0)
                {
                  int handle_id = info.handle_id;
                  bool oid_included = result.include_oid;
                  (void) m_stack->add_cursor (handle_id, qid, oid_included);
                }
              else
                {
                  QMGR_QUERY_ENTRY *query_entry = qmgr_get_query_entry (&thread_ref, qid, NULL_TRAN_INDEX);
                  if (query_entry)
                    {
                      // Since the list was not created in this thread,
                      // incrementing the count of the list (m_qlist_count) is required
                      // to make the assertion on m_qlist_count in qexec_execute_query() hold
                      qfile_update_qlist_count (&thread_ref, query_entry->list_id, 1);
                      qfile_close_list (&thread_ref, query_entry->list_id);
                    }

                  xqmgr_end_query (&thread_ref, qid);
                }
            }
        }

      // relay the client's answer unchanged, success or not
      return m_stack->send_data_to_java (b);
    };

    int error = m_stack->send_data_to_client_recv (get_execute_info, code, request);
    request.clear ();

    return error;
  }

  // Serves METHOD_CALLBACK_FETCH: reads up to fetch_count rows from the cursor
  // of the requested query and replies to the Java side with them.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (qid, pos, fetch_count, fetch_flag).
  //
  // When the stack has no cursor for the query, or the cursor reports S_ERROR,
  // a METHOD_RESPONSE_ERROR / ER_SP_INVALID_CURSOR reply is sent instead.
  // Returns the result of sending the reply.
  int
  executor::callback_fetch (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int error = NO_ERROR;
    std::uint64_t qid;
    int pos;
    int fetch_count;
    int fetch_flag;

    unpacker.unpack_all (qid, pos, fetch_count, fetch_flag);

    /* find query cursor */
    query_cursor *cursor = m_stack->get_cursor (qid);
    if (cursor == nullptr)
      {
        cubmem::block b = std::move (pack_data_block (METHOD_RESPONSE_ERROR, ER_SP_INVALID_CURSOR,
                                     std::string ("cursor closed"), ARG_FILE_LINE));
        error = m_stack->send_data_to_java (b);
        // release the packed reply, as every other reply path in this file does;
        // it was previously leaked on this early return
        b.freemem ();
        return error;
      }

    if (cursor->get_is_opened () == false)
      {
        cursor->open ();
      }

    cursor->set_fetch_count (fetch_count);

    fetch_info info;

    SCAN_CODE s_code = S_SUCCESS;

    /* Most cases, fetch_count will be the same value
     * To handle an invalid value of fetch_count is set at `cursor->set_fetch_count (fetch_count);`
     * Here, I'm going to get the fetch_count from the getter again.
    */
    fetch_count = cursor->get_fetch_count ();

    int start_index = cursor->get_current_index ();
    while (s_code == S_SUCCESS)
      {
        s_code = cursor->next_row ();
        if (s_code == S_END || s_code == S_ERROR)
          {
            break;
          }

        int tuple_index = cursor->get_current_index ();
        std::vector<DB_VALUE> tuple_values = cursor->get_current_tuple ();

        if (cursor->get_is_oid_included())
          {
            /* FIXME!!: For more optimized way, refactoring method_query_cursor is needed */
            // the first column is left out of the values and the row's OID is sent separately
            OID *oid = cursor->get_current_oid ();
            std::vector<DB_VALUE> sub_vector = {tuple_values.begin() + 1, tuple_values.end ()};
            info.tuples.emplace_back (tuple_index, sub_vector, *oid);
          }
        else
          {
            info.tuples.emplace_back (tuple_index, tuple_values);
          }

        // stop once fetch_count rows have been collected in this call
        if (tuple_index - start_index >= fetch_count - 1)
          {
            break;
          }
      }

    cubmem::block blk;
    if (s_code != S_ERROR)
      {
        blk = std::move (pack_data_block (METHOD_RESPONSE_SUCCESS, info));
      }
    else
      {
        blk = std::move (pack_data_block (METHOD_RESPONSE_ERROR, ER_SP_INVALID_CURSOR,
                                          std::string ("cursor closed"), ARG_FILE_LINE));
      }

    error = m_stack->send_data_to_java (blk);
    blk.freemem ();
    return error;
  }

  // Serves METHOD_CALLBACK_OID_GET: unpacks the request, forwards it to the
  // client and relays the client's answer to the Java side.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload.
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_oid_get (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    oid_get_request request;
    request.unpack (unpacker);

    int code = METHOD_CALLBACK_OID_GET;
    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, request);
  }

  // Serves METHOD_CALLBACK_OID_PUT: unpacks the request, forwards it to the
  // client and relays the client's answer to the Java side.
  //
  // The request's is_compatible_java flag is true only while unpacking and is
  // reset to false before the request is forwarded.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload.
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_oid_put (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    oid_put_request request;

    request.is_compatible_java = true;
    request.unpack (unpacker);
    request.is_compatible_java = false;

    int code = METHOD_CALLBACK_OID_PUT;
    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, request);
  }

  // Serves METHOD_CALLBACK_OID_CMD: unpacks a command and an OID, forwards both
  // to the client and relays the client's answer to the Java side.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (command, oid).
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_oid_cmd (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int command;
    OID oid;
    unpacker.unpack_all (command, oid);

    int code = METHOD_CALLBACK_OID_CMD;
    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, command, oid);
  }

  // Serves METHOD_CALLBACK_COLLECTION: unpacks the collection command request,
  // forwards it to the client and relays the client's answer to the Java side.
  //
  // The request's is_compatible_java flag is true only while unpacking and is
  // reset to false before the request is forwarded.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload.
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_collection_cmd (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    collection_cmd_request request;

    request.is_compatible_java = true;
    request.unpack (unpacker);
    request.is_compatible_java = false;

    int code = METHOD_CALLBACK_COLLECTION;
    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, request);
  }

  // Serves METHOD_CALLBACK_MAKE_OUT_RS: forwards the query id to the client and
  // handles its answer.
  //
  // On a METHOD_RESPONSE_SUCCESS answer, the cursor of the query named in the
  // answer is looked up on the stack, its owner is changed to thread_ref, and
  // the answer is relayed to the Java side. The answer handler yields ER_FAILED
  // when the answer is not a success or no such cursor exists (the latter also
  // trips an assertion); nothing is relayed in those cases.
  //
  // thread_ref: becomes the owner of the cursor.
  // unpacker:   positioned at the request payload (query id).
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_make_outresult (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int code = METHOD_CALLBACK_MAKE_OUT_RS;
    uint64_t query_id;
    unpacker.unpack_all (query_id);

    auto get_make_outresult_info = [&] (const cubmem::block & b)
    {
      packing_unpacker response_unpacker (b.ptr, (size_t) b.dim);

      int res_code;
      response_unpacker.unpack_int (res_code);
      if (res_code != METHOD_RESPONSE_SUCCESS)
        {
          return ER_FAILED;
        }

      make_outresult_info info;
      info.unpack (response_unpacker);

      const query_result_info &result = info.qresult_info;
      query_cursor *cursor = m_stack->get_cursor (result.query_id);
      if (!cursor)
        {
          assert (false);
          return ER_FAILED;
        }

      cursor->change_owner (&thread_ref);
      return m_stack->send_data_to_java (b);
    };

    return m_stack->send_data_to_client_recv (get_make_outresult_info, code, query_id);
  }

  // Serves METHOD_CALLBACK_GET_GENERATED_KEYS: forwards the handler id to the
  // client and relays the client's answer to the Java side.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (handler id).
  // Returns the result of send_data_to_client_recv ().
  int
  executor::callback_get_generated_keys (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int handler_id;
    unpacker.unpack_all (handler_id);

    int code = METHOD_CALLBACK_GET_GENERATED_KEYS;
    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, handler_id);
  }

  // Serves METHOD_CALLBACK_END_TRANSACTION: forwards the commit/abort command to
  // the client and relays the client's answer to the Java side.
  //
  // For an abort (command 2) all cursors of the session are destroyed first; if
  // there is no session in that case, ER_SES_SESSION_EXPIRED is returned and
  // nothing is forwarded.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (command).
  // Returns the result of send_data_to_client_recv () otherwise.
  int
  executor::callback_end_transaction (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    int code = METHOD_CALLBACK_END_TRANSACTION;
    int command; // commit=1 or abort=2

    unpacker.unpack_all (command);

    if (command == 2)
      {
        cubpl::session *pl_session = cubpl::get_session ();
        if (!pl_session)
          {
            return ER_SES_SESSION_EXPIRED;
          }

        pl_session->destroy_all_cursors();
      }

    auto relay_to_java = [&] (const cubmem::block & response)
    {
      return m_stack->send_data_to_java (response);
    };

    return m_stack->send_data_to_client_recv (relay_to_java, code, command);
  }

  // Serves METHOD_CALLBACK_GET_CODE_ATTR: looks up the named attribute of the
  // routine's code object and replies to the Java side.
  //
  // The lookup is skipped with ER_FAILED when the signature's code OID is null.
  // The reply carries the lookup status, followed by the attribute value only
  // when the lookup succeeded.
  //
  // thread_ref: thread entry passed to sp_get_code_attr ().
  // unpacker:   positioned at the request payload (attribute name).
  // Returns the result of sending the reply (not the lookup status).
  int
  executor::callback_get_code_attr (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    std::string attr_name;

    DB_VALUE res;
    db_make_null (&res);
    unpacker.unpack_all (attr_name);

    OID *code_oid = &m_sig.ext.sp.code_oid;
    int error = OID_ISNULL (code_oid) ? ER_FAILED : sp_get_code_attr (&thread_ref, attr_name, code_oid, &res);

    cubmem::block blk;
    if (error != NO_ERROR)
      {
        blk = std::move (pack_data_block (error));
      }
    else
      {
        dbvalue_java java_packer;
        java_packer.value = &res;

        blk = std::move (pack_data_block (error, java_packer));
      }

    // the value has been packed (or was never filled), so it can be released now
    db_value_clear (&res);

    error = m_stack->send_data_to_java (blk);
    blk.freemem ();

    return error;
  }

  // Serves METHOD_CALLBACK_SET_PL_SESSION_PARAM: applies the received parameters
  // to the PL session and acknowledges with METHOD_RESPONSE_SUCCESS.
  //
  // Parameters whose id is below sys_param_id::PRM_ID_BEGIN are skipped; every
  // other one is marked as changed and stored on the session.
  //
  // thread_ref: not read by this handler.
  // unpacker:   positioned at the request payload (vector of sys_param).
  //
  // Returns ER_SES_SESSION_EXPIRED when there is no session, otherwise the result
  // of sending the acknowledgement. The send result used to be discarded, which
  // reported success even when the acknowledgement could not be delivered.
  int
  executor::callback_set_pl_session_param (cubthread::entry &thread_ref, packing_unpacker &unpacker)
  {
    cubpl::session *pl_session = cubpl::get_session ();
    if (!pl_session)
      {
        return ER_SES_SESSION_EXPIRED;
      }

    int error = NO_ERROR;

    std::vector<sys_param> params;
    unpacker.unpack_all (params);

    for (const auto &prm : params)
      {
        if (prm.prm_id >= static_cast<int> (sys_param_id::PRM_ID_BEGIN))
          {
            pl_session->mark_session_param_changed (prm.prm_id);
            pl_session->set_session_param (prm);
          }
      }

    cubmem::block blk = std::move (pack_data_block (METHOD_RESPONSE_SUCCESS));
    if (blk.is_valid ())
      {
        error = m_stack->send_data_to_java (blk);
        blk.freemem ();
      }

    return error;
  }
}