File dblink_scan.c¶
File List > cubrid > src > query > dblink_scan.c
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ident "$Id$"
#include <string.h>
#include "query_executor.h"
// dblink connection handling for distributed transaction
#include "connection_defs.h"
#include "thread_manager.hpp"
#include "query_manager.h"
#include "dblink_scan.h"
#include "xasl.h"
#include "dbtype.h"
#include "object_primitive.h"
#include "object_representation.h"
#include "query_list.h"
#include "regu_var.hpp"
#ifndef DBDEF_HEADER_
#define DBDEF_HEADER_
#endif
#include "db_date.h"
#include "tz_support.h"
#include <cas_cci.h>
#include <db_json.hpp>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#define MAX_LEN_CONNECTION_URL 512
// *INDENT-OFF*
/*
 * DATETIME_DECODE () - decode a DB_DATETIME into the fields of a CCI date struct
 *   date(out) : struct lvalue receiving hh, mm, ss, ms, mon, day and yr
 *   dt(in)    : DB_DATETIME lvalue to decode
 *   m, d, y, hour, min, sec, msec(out) : int lvalues used as decode targets
 *
 * The out-pointers are handed to db_datetime_decode () in month, day, year
 * order; that is the same order db_date_decode () receives them in
 * TIMESTAMP_DECODE and db_datetime_encode () receives them in
 * dblink_make_date_time ().  Passing them in any other order silently swaps
 * the calendar fields that are copied into `date'.
 */
#define DATETIME_DECODE(date, dt, m, d, y, hour, min, sec, msec) \
  do \
    { \
      db_datetime_decode (&(dt), &(m), &(d), &(y), &(hour), &(min), &(sec), &(msec)); \
      (date).hh = (hour); \
      (date).mm = (min); \
      (date).ss = (sec); \
      (date).ms = (msec); \
      (date).mon = (m); \
      (date).day = (d); \
      (date).yr = (y); \
    } \
  while (0)
/*
 * TIMESTAMP_DECODE () - decode a DB_DATE / DB_TIME pair into a CCI date struct
 *   date(out) : struct lvalue receiving hh, mm, ss, ms, mon, day and yr
 *   dt(in)    : DB_DATE lvalue to decode
 *   tm(in)    : DB_TIME lvalue to decode
 *   m, d, y, hour, min, sec(out) : int lvalues used as decode targets
 *
 * The millisecond field is always set to 0 because no millisecond value is
 * decoded here.  Every macro argument is parenthesized so that expressions
 * other than plain identifiers expand safely.
 */
#define TIMESTAMP_DECODE(date, dt, tm, m, d, y, hour, min, sec) \
  do \
    { \
      db_time_decode (&(tm), &(hour), &(min), &(sec)); \
      db_date_decode (&(dt), &(m), &(d), &(y)); \
      (date).hh = (hour); \
      (date).mm = (min); \
      (date).ss = (sec); \
      (date).ms = 0; \
      (date).mon = (m); \
      (date).day = (d); \
      (date).yr = (y); \
    } \
  while (0)
// *INDENT-ON*
/*
* dblink_scan.c - Routines to implement scanning the values
* received by the cci interface
*/
/*
 * type_map - CCI application type (CCI_A_TYPE_*) used to fetch each CCI
 * database type.
 *
 * The table is indexed by a T_CCI_U_TYPE value and the entries must stay in
 * the order of that enum (see the per-entry comments).  A 0 entry marks a type
 * that this file does not fetch.  The table is read-only, hence const.
 */
static const int type_map[] = {
  0,				/* CCI_U_TYPE_NULL (index 0) */
  CCI_A_TYPE_STR,		/* CCI_U_TYPE_CHAR */
  CCI_A_TYPE_STR,		/* CCI_U_TYPE_STRING */
  /* TODO:
   * CCI_U_TYPE_NCHAR and CCI_U_TYPE_VARNCHAR will no longer be used(NCHAR was deprecated).
   * However, to maintain compatibility with previous versions, the enum list will be preserved.
   */
  CCI_A_TYPE_STR,		/* CCI_U_TYPE_NCHAR */
  CCI_A_TYPE_STR,		/* CCI_U_TYPE_VARNCHAR */
  CCI_A_TYPE_BIT,		/* CCI_U_TYPE_BIT */
  CCI_A_TYPE_BIT,		/* CCI_U_TYPE_VARBIT */
  CCI_A_TYPE_STR,		/* CCI_U_TYPE_NUMERIC */
  CCI_A_TYPE_INT,		/* CCI_U_TYPE_INT */
  CCI_A_TYPE_INT,		/* CCI_U_TYPE_SHORT */
  CCI_A_TYPE_DOUBLE,		/* CCI_U_TYPE_MONETARY */
  CCI_A_TYPE_FLOAT,		/* CCI_U_TYPE_FLOAT */
  CCI_A_TYPE_DOUBLE,		/* CCI_U_TYPE_DOUBLE */
  CCI_A_TYPE_DATE,		/* CCI_U_TYPE_DATE */
  CCI_A_TYPE_DATE,		/* CCI_U_TYPE_TIME */
  CCI_A_TYPE_DATE,		/* CCI_U_TYPE_TIMESTAMP */
  /* not support for collection type, processing as null */
  0,				/* CCI_U_TYPE_SET */
  0,				/* CCI_U_TYPE_MULTISET */
  0,				/* CCI_U_TYPE_SEQUENCE */
  0,				/* CCI_U_TYPE_OBJECT */
  0,				/* CCI_U_TYPE_RESULTSET */
  CCI_A_TYPE_BIGINT,		/* CCI_U_TYPE_BIGINT */
  CCI_A_TYPE_DATE,		/* CCI_U_TYPE_DATETIME */
  /* not support for BLOB, CLOB, and ENUM */
  0,				/* CCI_U_TYPE_BLOB */
  0,				/* CCI_U_TYPE_CLOB */
  0,				/* CCI_U_TYPE_ENUM */
  CCI_A_TYPE_UINT,		/* CCI_U_TYPE_USHORT */
  CCI_A_TYPE_UINT,		/* CCI_U_TYPE_UINT */
  CCI_A_TYPE_UBIGINT,		/* CCI_U_TYPE_UBIGINT */
  CCI_A_TYPE_DATE_TZ,		/* CCI_U_TYPE_TIMESTAMPTZ */
  CCI_A_TYPE_DATE_TZ,		/* CCI_U_TYPE_TIMESTAMPLTZ */
  CCI_A_TYPE_DATE_TZ,		/* CCI_U_TYPE_DATETIMETZ */
  CCI_A_TYPE_DATE_TZ,		/* CCI_U_TYPE_DATETIMELTZ */
  /* Disabled type */
  CCI_A_TYPE_DATE_TZ,		/* CCI_U_TYPE_TIMETZ - internal only, RESERVED */
  /* end of disabled types */
  CCI_A_TYPE_STR		/* CCI_U_TYPE_JSON */
};
/*
 * NULL_CHECK () - stop processing the current column when its indicator is -1
 *   ind(in) : indicator filled in by cci_get_data ()
 *
 * Expands to a bare `if (...) break', so it must be used as a statement
 * directly inside the switch (or loop) it is meant to leave; it is
 * deliberately not wrapped in do { } while (0), which would capture the break.
 * dblink_scan_next () treats an indicator of -1 as a NULL column value.
 */
#define NULL_CHECK(ind) \
if ((ind) == -1) break
/*
 * dblink_get_basic_utype () - reduce an extended CCI type to a plain T_CCI_U_TYPE
 *   return: CCI_U_TYPE_SET, CCI_U_TYPE_MULTISET or CCI_U_TYPE_SEQUENCE when the
 *           matching CCI_IS_*_TYPE () test succeeds, otherwise the value
 *           produced by CCI_GET_COLLECTION_DOMAIN ()
 *   u_ext_type(in) : extended type as reported in the CCI column info
 */
static T_CCI_U_TYPE
dblink_get_basic_utype (T_CCI_U_EXT_TYPE u_ext_type)
{
  /* the collection checks are tried in the same order as before: set, multiset, sequence */
  if (CCI_IS_SET_TYPE (u_ext_type))
    {
      return CCI_U_TYPE_SET;
    }

  if (CCI_IS_MULTISET_TYPE (u_ext_type))
    {
      return CCI_U_TYPE_MULTISET;
    }

  if (CCI_IS_SEQUENCE_TYPE (u_ext_type))
    {
      return CCI_U_TYPE_SEQUENCE;
    }

  return (T_CCI_U_TYPE) CCI_GET_COLLECTION_DOMAIN (u_ext_type);
}
/*
 * print_utype_to_string () - printable name of a CCI type for error messages
 *   return: pointer to a string literal; "" for any type without a name here
 *   type(in) : CCI_U_TYPE_* value
 *
 * Note: the result points at a string literal and must not be modified or freed.
 */
static char *
print_utype_to_string (int type)
{
  const char *label = "";

  switch (type)
    {
    case CCI_U_TYPE_NULL:
      label = "null";
      break;
    case CCI_U_TYPE_BIT:
      label = "bit";
      break;
    case CCI_U_TYPE_VARBIT:
      label = "varbit";
      break;
    case CCI_U_TYPE_SET:
      label = "set";
      break;
    case CCI_U_TYPE_MULTISET:
      label = "multiset";
      break;
    case CCI_U_TYPE_SEQUENCE:
      label = "sequence";
      break;
    case CCI_U_TYPE_OBJECT:
      label = "object";
      break;
    case CCI_U_TYPE_BLOB:
      label = "blob";
      break;
    case CCI_U_TYPE_CLOB:
      label = "clob";
      break;
    case CCI_U_TYPE_ENUM:
      label = "enum";
      break;
    case CCI_U_TYPE_JSON:
      label = "json";
      break;
    default:
      break;
    }

  return (char *) label;
}
/*
 * dblink_make_cci_value () - build a DB_VALUE from a raw value fetched through CCI
 *   return: result of the db_make_* call, or ER_FAILED for an unhandled type
 *   cci_value(out) : value to fill
 *   utype(in)      : CCI type that selects how `val' is interpreted
 *   val(in)        : pointer to the fetched data (scalar, or character buffer)
 *   prec(in)       : precision, used for the string types only
 *   len(in)        : length handed to db_make_varchar / db_make_char, string types only
 *   codeset(in)    : codeset, used for the string types only
 */
static int
dblink_make_cci_value (DB_VALUE * cci_value, T_CCI_U_TYPE utype, void *val, int prec, int len, int codeset)
{
  switch (utype)
    {
    case CCI_U_TYPE_SHORT:
      return db_make_short (cci_value, *(short *) val);

    case CCI_U_TYPE_INT:
      return db_make_int (cci_value, *(int *) val);

    case CCI_U_TYPE_BIGINT:
      return db_make_bigint (cci_value, *(DB_BIGINT *) val);

    case CCI_U_TYPE_FLOAT:
      return db_make_float (cci_value, *(float *) val);

    case CCI_U_TYPE_DOUBLE:
      return db_make_double (cci_value, *(double *) val);

    case CCI_U_TYPE_MONETARY:
      return db_make_monetary (cci_value, db_get_currency_default (), *(double *) val);

    case CCI_U_TYPE_STRING:
      return db_make_varchar (cci_value, prec, (DB_CONST_C_CHAR) val, len, codeset,
			      LANG_GET_BINARY_COLLATION (codeset));

    case CCI_U_TYPE_CHAR:
      return db_make_char (cci_value, prec, (DB_CONST_C_CHAR) val, len, codeset,
			   LANG_GET_BINARY_COLLATION (codeset));

    default:
      /* callers are expected to pass only the types handled above */
      assert (false);
      return ER_FAILED;
    }
}
/*
 * dblink_make_date_time () - build a date/time DB_VALUE from a T_CCI_DATE
 *   return: NO_ERROR, or the first error returned by an encode / db_make_* call
 *   utype(in)     : CCI_U_TYPE_TIME, _DATE, _DATETIME or _TIMESTAMP
 *   value_p(out)  : value to fill; it is made NULL first and stays NULL when
 *                   an encode step fails
 *   date_time(in) : broken-down date/time fetched through CCI
 */
static int
dblink_make_date_time (T_CCI_U_TYPE utype, DB_VALUE * value_p, T_CCI_DATE * date_time)
{
  DB_DATETIME datetime_buf;
  DB_TIMESTAMP timestamp_buf;
  DB_DATE date_buf;
  DB_TIME time_buf;
  int rc = NO_ERROR;

  db_make_null (value_p);

  switch (utype)
    {
    case CCI_U_TYPE_DATE:
      rc = db_make_date (value_p, date_time->mon, date_time->day, date_time->yr);
      break;

    case CCI_U_TYPE_TIME:
      rc = db_make_time (value_p, date_time->hh, date_time->mm, date_time->ss);
      break;

    case CCI_U_TYPE_DATETIME:
      rc = db_datetime_encode (&datetime_buf, date_time->mon, date_time->day, date_time->yr,
			       date_time->hh, date_time->mm, date_time->ss, date_time->ms);
      if (rc == NO_ERROR)
	{
	  rc = db_make_datetime (value_p, &datetime_buf);
	}
      break;

    case CCI_U_TYPE_TIMESTAMP:
      /* each step runs only if every previous one succeeded */
      rc = db_time_encode (&time_buf, date_time->hh, date_time->mm, date_time->ss);
      if (rc == NO_ERROR)
	{
	  rc = db_date_encode (&date_buf, date_time->mon, date_time->day, date_time->yr);
	}
      if (rc == NO_ERROR)
	{
	  rc = db_timestamp_encode (&timestamp_buf, &date_buf, &time_buf);
	}
      if (rc == NO_ERROR)
	{
	  rc = db_make_timestamp (value_p, timestamp_buf);
	}
      break;

    default:
      assert (false);
      break;
    }

  return rc;
}
/*
 * dblink_make_date_time_tz () - build a timezone-aware DB_VALUE from a T_CCI_DATE_TZ
 *   return: NO_ERROR, or the first error returned by an encode / tz / db_make_* call
 *   utype(in)        : CCI_U_TYPE_TIMESTAMPTZ, _TIMESTAMPLTZ, _DATETIMETZ or _DATETIMELTZ
 *   value_p(out)     : value to fill; it is made NULL first and stays NULL on failure
 *   date_time_tz(in) : broken-down date/time plus timezone string fetched through CCI
 *
 * Note: the TZ and LTZ variants share the same conversion; they differ only in
 *       the final db_make_* call (the LTZ variants store just the
 *       timestamp / datetime part of the converted value).
 */
static int
dblink_make_date_time_tz (T_CCI_U_TYPE utype, DB_VALUE * value_p, T_CCI_DATE_TZ * date_time_tz)
{
  DB_TIME t_time;
  DB_DATE t_date;
  DB_DATETIME t_datetime;
  DB_DATETIMETZ tz_datetime;
  DB_TIMESTAMPTZ tz_timestamp;
  TZ_REGION region;
  int error = NO_ERROR;

  db_make_null (value_p);

  /* session timezone region, passed to the tz_create_* calls below */
  tz_get_session_tz_region (&region);

  switch (utype)
    {
    case CCI_U_TYPE_TIMESTAMPTZ:
    case CCI_U_TYPE_TIMESTAMPLTZ:
      error = db_time_encode (&t_time, date_time_tz->hh, date_time_tz->mm, date_time_tz->ss);
      if (error != NO_ERROR)
	{
	  break;
	}

      error = db_date_encode (&t_date, date_time_tz->mon, date_time_tz->day, date_time_tz->yr);
      if (error != NO_ERROR)
	{
	  break;
	}

      error = tz_create_timestamptz (&t_date, &t_time, date_time_tz->tz, strlen (date_time_tz->tz), &region,
				     &tz_timestamp, NULL);
      if (error != NO_ERROR)
	{
	  break;
	}

      if (utype == CCI_U_TYPE_TIMESTAMPTZ)
	{
	  error = db_make_timestamptz (value_p, &tz_timestamp);
	}
      else
	{
	  error = db_make_timestampltz (value_p, tz_timestamp.timestamp);
	}
      break;

    case CCI_U_TYPE_DATETIMETZ:
    case CCI_U_TYPE_DATETIMELTZ:
      error = db_datetime_encode (&t_datetime, date_time_tz->mon, date_time_tz->day, date_time_tz->yr,
				  date_time_tz->hh, date_time_tz->mm, date_time_tz->ss, date_time_tz->ms);
      if (error != NO_ERROR)
	{
	  break;
	}

      error = tz_create_datetimetz (&t_datetime, date_time_tz->tz, strlen (date_time_tz->tz), &region,
				    &tz_datetime, NULL);
      if (error != NO_ERROR)
	{
	  break;
	}

      if (utype == CCI_U_TYPE_DATETIMETZ)
	{
	  error = db_make_datetimetz (value_p, &tz_datetime);
	}
      else
	{
	  error = db_make_datetimeltz (value_p, &tz_datetime.datetime);
	}
      break;

    default:
      assert (false);
      break;
    }

  return error;
}
static int
dblink_bind_param (int stmt_handle, VAL_DESCR * vd, DBLINK_HOST_VARS * host_vars)
{
int i, n, ret, num_size = 0;
T_CCI_A_TYPE a_type;
T_CCI_U_TYPE u_type;
void *value;
int month, day, year;
int hh, mm, ss, ms;
DB_TIMESTAMP *timestamp;
DB_DATETIME *datetime;
DB_DATETIME dt_local;
TZ_ID *zone_id;
DB_DATE date;
DB_TIME time;
T_CCI_DATE cci_date;
T_CCI_BIT cci_bit;
char num_str[NUMERIC_MAX_STRING_SIZE];
unsigned char type;
for (n = 0; n < host_vars->count; n++)
{
i = host_vars->index[n];
value = &vd->dbval_ptr[i].data;
type = vd->dbval_ptr[i].domain.general_info.type;
switch (type)
{
case DB_TYPE_BIT:
case DB_TYPE_VARBIT:
a_type = CCI_A_TYPE_BIT;
u_type = (type == DB_TYPE_BIT) ? CCI_U_TYPE_BIT : CCI_U_TYPE_VARBIT;
value = (void *) &cci_bit;
cci_bit.buf = (char *) db_get_bit (&vd->dbval_ptr[i], &num_size);
cci_bit.size = QSTR_NUM_BYTES (num_size);
break;
case DB_TYPE_JSON:
a_type = CCI_A_TYPE_STR;
u_type = CCI_U_TYPE_JSON;
value = (void *) db_get_json_raw_body (&vd->dbval_ptr[i]);
break;
case DB_TYPE_SHORT:
a_type = CCI_A_TYPE_INT;
u_type = CCI_U_TYPE_SHORT;
break;
case DB_TYPE_INTEGER:
a_type = CCI_A_TYPE_INT;
u_type = CCI_U_TYPE_INT;
break;
case DB_TYPE_BIGINT:
a_type = CCI_A_TYPE_BIGINT;
u_type = CCI_U_TYPE_BIGINT;
break;
case DB_TYPE_NUMERIC:
a_type = CCI_A_TYPE_STR;
u_type = CCI_U_TYPE_NUMERIC;
value = (void *) numeric_db_value_print (&vd->dbval_ptr[i], num_str);
break;
case DB_TYPE_DOUBLE:
case DB_TYPE_FLOAT:
a_type = CCI_A_TYPE_DOUBLE;
u_type = CCI_U_TYPE_DOUBLE;
break;
case DB_TYPE_STRING:
case DB_TYPE_CHAR:
a_type = CCI_A_TYPE_STR;
u_type = CCI_U_TYPE_STRING;
value = (void *) db_get_string (&vd->dbval_ptr[i]);
break;
case DB_TYPE_DATE:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_DATE;
db_date_decode ((DB_DATE *) value, &month, &day, &year);
cci_date.mon = month;
cci_date.day = day;
cci_date.yr = year;
value = &cci_date;
break;
case DB_TYPE_TIME:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_TIME;
db_time_decode (&vd->dbval_ptr[i].data.time, &hh, &mm, &ss);
cci_date.hh = hh;
cci_date.mm = mm;
cci_date.ss = ss;
cci_date.ms = 0;
value = &cci_date;
break;
case DB_TYPE_DATETIME:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_DATETIME;
DATETIME_DECODE (cci_date, vd->dbval_ptr[i].data.datetime, month, day, year, hh, mm, ss, ms);
value = &cci_date;
break;
case DB_TYPE_TIMESTAMP:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_TIMESTAMP;
timestamp = &vd->dbval_ptr[i].data.utime;
db_timestamp_decode_ses (timestamp, &date, &time);
TIMESTAMP_DECODE (cci_date, date, time, month, day, year, hh, mm, ss);
value = &cci_date;
break;
case DB_TYPE_TIMESTAMPTZ:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_TIMESTAMPTZ;
timestamp = &vd->dbval_ptr[i].data.timestamptz.timestamp;
zone_id = &vd->dbval_ptr[i].data.timestamptz.tz_id;
db_timestamp_decode_w_tz_id (timestamp, zone_id, &date, &time);
TIMESTAMP_DECODE (cci_date, date, time, month, day, year, hh, mm, ss);
value = &cci_date;
break;
case DB_TYPE_TIMESTAMPLTZ:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_TIMESTAMPLTZ;
timestamp = &vd->dbval_ptr[i].data.timestamptz.timestamp;
db_timestamp_decode_utc (timestamp, &date, &time);
TIMESTAMP_DECODE (cci_date, date, time, month, day, year, hh, mm, ss);
value = &cci_date;
break;
case DB_TYPE_DATETIMETZ:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_DATETIMETZ;
datetime = &vd->dbval_ptr[i].data.datetimetz.datetime;
zone_id = &vd->dbval_ptr[i].data.datetimetz.tz_id;
tz_utc_datetimetz_to_local (datetime, zone_id, &dt_local);
DATETIME_DECODE (cci_date, dt_local, month, day, year, hh, mm, ss, ms);
value = &cci_date;
break;
case DB_TYPE_DATETIMELTZ:
a_type = CCI_A_TYPE_DATE;
u_type = CCI_U_TYPE_DATETIMELTZ;
datetime = &vd->dbval_ptr[i].data.datetimetz.datetime;
tz_datetimeltz_to_local (datetime, &dt_local);
DATETIME_DECODE (cci_date, dt_local, month, day, year, hh, mm, ss, ms);
value = &cci_date;
break;
case DB_TYPE_NULL:
a_type = CCI_A_TYPE_LAST; // for clear -Wmaybe-uninitialized
value = NULL;
u_type = CCI_U_TYPE_NULL;
break;
default:
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK_UNSUPPORTED_TYPE, 1, "unknown");
return ER_DBLINK_UNSUPPORTED_TYPE;
}
ret = cci_bind_param (stmt_handle, n + 1, a_type, value, u_type, 0);
if (ret < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK_INVALID_BIND_PARAM, 0);
return ER_DBLINK_INVALID_BIND_PARAM;
}
}
return S_SUCCESS;
}
/*
 * dblink_end_tran () - end the remote transaction on every connection of a list
 *   return: ER_DBLINK_TRAN if any cci_end_tran () failed, otherwise the result
 *           of the last cci_disconnect () (NO_ERROR for an empty list)
 *   dblink(in)   : head of the connection entry list; every entry is disconnected and freed
 *   is_abort(in) : true to roll back, false to commit
 *
 * Note: after the first cci_end_tran () failure the remaining connections are
 *       rolled back instead of committed, and only that first failure is
 *       reported through er_set ().
 */
int
dblink_end_tran (DBLINK_CONN_ENTRY * dblink, bool is_abort)
{
  T_CCI_ERROR err_buf;
  DBLINK_CONN_ENTRY *entry, *next_entry;
  int tran_error = NO_ERROR;
  int rc = NO_ERROR;

  for (entry = dblink; entry != NULL; entry = next_entry)
    {
      /* remember the successor first: the entry is released at the end of this pass */
      next_entry = entry->next;

      rc = cci_end_tran (entry->conn_info.conn_handle, is_abort ? CCI_TRAN_ROLLBACK : CCI_TRAN_COMMIT, &err_buf);
      if (tran_error == NO_ERROR && rc < 0)
	{
	  is_abort = true;
	  tran_error = ER_DBLINK_TRAN;
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK_TRAN, 1, err_buf.err_msg);
	}

      rc = cci_disconnect (entry->conn_info.conn_handle, &err_buf);
      if (tran_error == NO_ERROR && rc < 0)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
	}

      /* delete dblink_entry */
      free (entry);
    }

  if (tran_error != NO_ERROR)
    {
      return tran_error;
    }

  return rc;
}
/*
 * dblink_execute_query () - prepare, bind and execute one statement on a remote server
 *   return: result of cci_execute () on success, ER_DBLINK on any failure
 *   thread_p(in)  : thread entry, used for the per-transaction connection list
 *   spec(in)      : access spec; spec->s.dblink_node supplies URL, user, password and SQL text
 *   vd(in)        : value descriptor holding the host variable values
 *   host_vars(in) : host variables to bind; nothing is bound when count is 0
 *
 * Note: with PRM_ID_DBLINK_AUTO_COMMIT on, a private connection is opened and
 *       disconnected inside this call.  With it off, the connection is looked
 *       up in / registered with the query manager (qmgr_dblink_*), which then
 *       owns it; such a connection is never disconnected here.
 *       NOTE(review): the statement handle is not closed explicitly; it lives
 *       until its connection is disconnected.
 */
int
dblink_execute_query (THREAD_ENTRY * thread_p, struct access_spec_node *spec, VAL_DESCR * vd,
		      DBLINK_HOST_VARS * host_vars)
{
  static bool auto_commit = prm_get_bool_value (PRM_ID_DBLINK_AUTO_COMMIT);
  int ret = NO_ERROR, result, conn_handle = -1, stmt_handle;
  int url_len;
  /* true while this call is the only owner of conn_handle and so must disconnect it on failure */
  bool conn_owned = false;
  T_CCI_ERROR err_buf;
  char conn_url[MAX_LEN_CONNECTION_URL] = { 0, };
  char *user_name = spec->s.dblink_node.conn_user;
  char *password = spec->s.dblink_node.conn_password;
  char *sql_text = spec->s.dblink_node.conn_sql;
  /* append the gateway property to an existing property list (":?") or start a new one */
  const char *gateway_opt = (strstr (spec->s.dblink_node.conn_url, ":?") != NULL)
    ? "&__gateway=true" : "?__gateway=true";

  url_len = snprintf (conn_url, MAX_LEN_CONNECTION_URL, "%s%s", spec->s.dblink_node.conn_url, gateway_opt);
  if (url_len < 0 || url_len >= MAX_LEN_CONNECTION_URL)
    {
      /* a truncated URL would silently lose the gateway property */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, "connection URL is too long");
      return ER_DBLINK;
    }

  if (!auto_commit)
    {
      conn_handle = qmgr_dblink_find_conn_handle (thread_p, spec->s.dblink_node.conn_url, user_name, password, true);
    }

  if (conn_handle < 0)
    {
      conn_handle = cci_connect_with_url_ex (conn_url, user_name, password, &err_buf);
      if (conn_handle < 0)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
	  goto error_exit;
	}
      conn_owned = true;

      ret = cci_set_autocommit (conn_handle, (CCI_AUTOCOMMIT_MODE) auto_commit);
      if (ret < 0)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, "set autocommit error");
	  goto error_exit;
	}

      if (!auto_commit)
	{
	  ret =
	    qmgr_dblink_add_conn_handle (thread_p, conn_handle, spec->s.dblink_node.conn_url, user_name, password,
					 true);
	  if (ret < 0)
	    {
	      /* malloc error */
	      goto error_exit;
	    }
	  /* registered: the connection list now owns the handle */
	  conn_owned = false;
	}
    }

  stmt_handle = cci_prepare (conn_handle, sql_text, 0, &err_buf);
  if (stmt_handle < 0)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
      goto error_exit;
    }

  if (host_vars->count > 0)
    {
      if ((ret = dblink_bind_param (stmt_handle, vd, host_vars)) < 0)
	{
	  goto error_exit;
	}
    }

  result = cci_execute (stmt_handle, 0, 0, &err_buf);
  if (result < 0)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
      goto error_exit;
    }

  if (auto_commit)
    {
      ret = cci_disconnect (conn_handle, &err_buf);
      if (ret < 0)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
	  return ER_DBLINK;
	}
    }

  return result;

error_exit:
  /* Covers every autocommit connection and a non-autocommit one that was opened
   * but never registered; without this the latter would leak. */
  if (conn_owned)
    {
      (void) cci_disconnect (conn_handle, &err_buf);
    }

  return ER_DBLINK;
}
/*
* dblink_open_scan () - open the scan for dblink
* return: int
* scan_info(out) : dblink information
* conn_url(in) : connection URL for dblink
* user_name(in) : user name for dblink
* password(in) : password for dblink
* sql_text(in) : SQL text for dblink
*/
int
dblink_open_scan (THREAD_ENTRY * thread_p, DBLINK_SCAN_INFO * scan_info, struct access_spec_node *spec,
VAL_DESCR * vd, DBLINK_HOST_VARS * host_vars)
{
static bool auto_commit = prm_get_bool_value (PRM_ID_DBLINK_AUTO_COMMIT);
int ret;
T_CCI_ERROR err_buf;
char conn_url[MAX_LEN_CONNECTION_URL] = { 0, };
char *user_name = spec->s.dblink_node.conn_user;
char *password = spec->s.dblink_node.conn_password;
char *sql_text = spec->s.dblink_node.conn_sql;
/* Invariant: a non-reuse open must arrive without an active stmt_handle.
* CCI handles are positive integers; 0 = zero-initialized (never opened), -1 = sentinel
* set by dblink_close_scan after successful close. Both mean "no active statement".
* conn_handle is intentionally excluded: when auto_commit=false the connection is
* pool-managed (qmgr_dblink_*) and conn_handle legitimately remains >= 0 after close. */
assert (scan_info->cursor_rewind || scan_info->stmt_handle <= 0);
/* correlated reuse mode: CCI connection/stmt already open; reposition remote cursor to first row.
* conn/stmt handles are valid only from the second outer row onwards; first call falls through to open. */
if (scan_info->cursor_rewind && scan_info->conn_handle > 0 && scan_info->stmt_handle > 0)
{
ret = cci_cursor (scan_info->stmt_handle, 1, CCI_CURSOR_FIRST, &err_buf);
/* Success (CCI_ER_NO_ERROR) or 0-row remote result (CCI_ER_NO_MORE_DATA, expected). */
if (ret == CCI_ER_NO_ERROR || ret == CCI_ER_NO_MORE_DATA)
{
scan_info->cursor = CCI_CURSOR_FIRST;
return NO_ERROR;
}
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
return ER_DBLINK;
}
char *find = strstr (spec->s.dblink_node.conn_url, ":?");
if (find)
{
snprintf (conn_url, MAX_LEN_CONNECTION_URL, "%s%s", spec->s.dblink_node.conn_url, "&__gateway=true");
}
else
{
snprintf (conn_url, MAX_LEN_CONNECTION_URL, "%s%s", spec->s.dblink_node.conn_url, "?__gateway=true");
}
scan_info->conn_handle = -1;
if (!auto_commit)
{
scan_info->conn_handle =
qmgr_dblink_find_conn_handle (thread_p, spec->s.dblink_node.conn_url, user_name, password, false);
}
if (scan_info->conn_handle < 0)
{
scan_info->conn_handle = cci_connect_with_url_ex (conn_url, user_name, password, &err_buf);
if (scan_info->conn_handle < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
return ER_DBLINK;
}
ret = cci_set_autocommit (scan_info->conn_handle, (CCI_AUTOCOMMIT_MODE) auto_commit);
if (ret < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, "set autocommit mode");
(void) cci_disconnect (scan_info->conn_handle, &err_buf);
scan_info->conn_handle = -1;
return ER_DBLINK;
}
if (!auto_commit)
{
ret =
qmgr_dblink_add_conn_handle (thread_p, scan_info->conn_handle, spec->s.dblink_node.conn_url, user_name,
password, false);
if (ret < 0)
{
(void) cci_disconnect (scan_info->conn_handle, &err_buf);
scan_info->conn_handle = -1;
return ER_DBLINK;
}
}
}
/* Force autocommit OFF for the duration of this scan.
* cursor_rewind requires the CCI cursor to stay alive across outer rows;
* this is only necessary when cursor_rewind is set and auto_commit is true
* (false connections are already OFF). */
if (scan_info->cursor_rewind && auto_commit)
{
ret = cci_set_autocommit (scan_info->conn_handle, CCI_AUTOCOMMIT_FALSE);
if (ret < 0)
{
cci_get_err_msg (ret, err_buf.err_msg, sizeof (err_buf.err_msg));
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
(void) cci_disconnect (scan_info->conn_handle, &err_buf);
scan_info->conn_handle = -1;
return ER_DBLINK;
}
}
scan_info->stmt_handle = cci_prepare (scan_info->conn_handle, sql_text, 0, &err_buf);
if (scan_info->stmt_handle < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
if (auto_commit && scan_info->conn_handle >= 0)
{
(void) cci_disconnect (scan_info->conn_handle, &err_buf);
scan_info->conn_handle = -1;
}
return ER_DBLINK;
}
if (host_vars->count > 0)
{
if ((ret = dblink_bind_param (scan_info->stmt_handle, vd, host_vars)) < 0)
{
return ER_DBLINK;
}
}
ret = cci_execute (scan_info->stmt_handle, 0, 0, &err_buf);
if (ret < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
return ER_DBLINK;
}
else
{
T_CCI_CUBRID_STMT stmt_type;
scan_info->col_info = (void *) cci_get_result_info (scan_info->stmt_handle, &stmt_type, &scan_info->col_cnt);
if (scan_info->col_info == NULL)
{
/* this can not be reached, something wrong */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, "unknown error");
(void) cci_close_req_handle (scan_info->stmt_handle);
scan_info->stmt_handle = -1;
if (auto_commit && scan_info->conn_handle >= 0)
{
(void) cci_disconnect (scan_info->conn_handle, &err_buf);
scan_info->conn_handle = -1;
}
return ER_DBLINK;
}
scan_info->cursor = CCI_CURSOR_FIRST;
}
return NO_ERROR;
}
/*
 * dblink_close_scan () - close the CCI statement (and private connection) of a dblink scan
 *   return: NO_ERROR, or S_ERROR when closing the statement or disconnecting fails
 *   scan_info(in/out) : information for dblink
 *   is_final(in)      : true = final teardown (XASL clear path); always close CCI handles.
 *                       false = per-iteration close; skipped while cursor_rewind is set so
 *                       that the next outer row can rewind the CCI cursor without re-executing.
 *
 * Note: stmt_handle is set to -1 once a close was attempted, so repeated calls
 *       (e.g. via scan_close_scan after an explicit is_final=true call) are safe no-ops.
 *       conn_handle is disconnected and set to -1 only when PRM_ID_DBLINK_AUTO_COMMIT is on.
 */
int
dblink_close_scan (DBLINK_SCAN_INFO * scan_info, bool is_final)
{
  static bool auto_commit = prm_get_bool_value (PRM_ID_DBLINK_AUTO_COMMIT);
  T_CCI_ERROR err_buf;
  int rc;

  /* note: return NO_ERROR even though the connection or stmt handle is not valid */

  /* correlated reuse mode: a per-iteration close keeps the CCI resources for the next
   * outer-row cursor rewind; the final close drops the flag and tears everything down. */
  if (scan_info->cursor_rewind)
    {
      if (is_final == false)
	{
	  return NO_ERROR;
	}
      scan_info->cursor_rewind = 0;
    }

  /* sentinel: nothing open (never opened, or already closed) */
  if (scan_info->stmt_handle <= 0)
    {
      return NO_ERROR;
    }

  rc = cci_close_req_handle (scan_info->stmt_handle);
  /* the handle is considered gone whether or not the close succeeded */
  scan_info->stmt_handle = -1;

  if (rc < 0)
    {
      cci_get_err_msg (rc, err_buf.err_msg, sizeof (err_buf.err_msg));
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
      if (auto_commit)
	{
	  (void) cci_disconnect (scan_info->conn_handle, &err_buf);
	  scan_info->conn_handle = -1;
	}
      return S_ERROR;
    }

  /* only a private (autocommit) connection is disconnected here */
  if (!auto_commit || scan_info->conn_handle < 0)
    {
      return NO_ERROR;
    }

  rc = cci_disconnect (scan_info->conn_handle, &err_buf);
  scan_info->conn_handle = -1;
  if (rc < 0)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
      return S_ERROR;
    }

  return NO_ERROR;
}
/*
 * dblink_scan_next () - get next tuple for dblink
 * return: SCAN_CODE
 *         S_SUCCESS when a row was fetched and stored into val_list,
 *         S_END when cci_cursor () reports CCI_ER_NO_MORE_DATA,
 *         S_ERROR on any other failure (the error is set before returning)
 * scan_info(in) : information for dblink
 * val_list(in) : value list for derived dblink table
 */
SCAN_CODE
dblink_scan_next (DBLINK_SCAN_INFO * scan_info, val_list_node * val_list)
{
T_CCI_ERROR err_buf;
int col_no, col_cnt, ind, error = NO_ERROR;
T_CCI_U_TYPE utype;
T_CCI_BIT bit_val; /* for bit or varbit type */
T_CCI_DATE date_time; /* for date or time type */
T_CCI_DATE_TZ date_time_tz; /* for date or time with zone */
void *value; /* for any other type */
DB_VALUE cci_value = { 0 }; /* from cci interface */
QPROC_DB_VALUE_LIST valptrp;
T_CCI_COL_INFO *col_info = (T_CCI_COL_INFO *) scan_info->col_info;
int codeset;
col_cnt = scan_info->col_cnt;
assert (scan_info->stmt_handle >= 0);
/* move the remote cursor: scan_info->cursor is CCI_CURSOR_FIRST right after open/reset,
 * CCI_CURSOR_CURRENT afterwards (set below) */
if ((error = cci_cursor (scan_info->stmt_handle, 1, (T_CCI_CURSOR_POS) scan_info->cursor, &err_buf)) < 0)
{
if (error == CCI_ER_NO_MORE_DATA)
{
return S_END;
}
else
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
goto close_exit;
}
}
/* for next scan, set cursor positioning */
scan_info->cursor = CCI_CURSOR_CURRENT;
if ((error = cci_fetch (scan_info->stmt_handle, &err_buf)) < 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
goto close_exit;
}
assert (col_info);
assert (val_list->valp);
/* the remote result must have exactly as many columns as the value list */
if (val_list->val_cnt != col_cnt)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK_INVALID_COLUMNS_SPECIFIED, 0);
goto close_exit;
}
/* col_no is 1-based for the CCI calls; col_info is indexed with col_no - 1 */
for (valptrp = val_list->valp, col_no = 1; col_no <= col_cnt; col_no++, valptrp = valptrp->next)
{
DB_DATA cci_data;
int prec = col_info[col_no - 1].precision;
pr_clear_value (valptrp->val);
valptrp->val->domain.general_info.is_null = 0;
utype = dblink_get_basic_utype (CCI_GET_RESULT_INFO_TYPE (col_info, col_no));
/* scalar types are fetched into cci_data; the string-like cases pass &value instead,
 * so that `value' itself receives the pointer to the fetched data */
value = &cci_data;
codeset = col_info[col_no - 1].charset;
/* Each case fetches the column and converts it into cci_value.
 * NULL_CHECK () expands to a `break' out of this switch when ind == -1 (NULL column),
 * which skips the conversion; ind is tested again after the switch. */
switch (utype)
{
case CCI_U_TYPE_NULL:
ind = -1;
break;
case CCI_U_TYPE_BIGINT:
case CCI_U_TYPE_INT:
case CCI_U_TYPE_SHORT:
case CCI_U_TYPE_FLOAT:
case CCI_U_TYPE_DOUBLE:
case CCI_U_TYPE_MONETARY:
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], value, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
(void) dblink_make_cci_value (&cci_value, utype, value, prec, ind, codeset);
break;
case CCI_U_TYPE_NUMERIC:
/* numeric is fetched as a string (see type_map) and parsed locally */
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], &value, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
error = numeric_coerce_string_to_num ((char *) value, ind, (INTL_CODESET) codeset, &cci_value);
break;
case CCI_U_TYPE_STRING:
case CCI_U_TYPE_CHAR:
case CCI_U_TYPE_JSON:
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], &value, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
if (utype == CCI_U_TYPE_JSON)
{
/* json is fetched as a string (see type_map) and parsed into a document here */
JSON_DOC *json_doc = NULL;
error = db_json_get_json_from_str ((char *) value, json_doc, ind);
if (error != NO_ERROR)
{
goto close_exit;
}
(void) db_make_json (&cci_value, json_doc, true);
}
else
{
(void) dblink_make_cci_value (&cci_value, utype, value, prec, ind, codeset);
}
break;
case CCI_U_TYPE_BIT:
case CCI_U_TYPE_VARBIT:
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], &bit_val, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
if (utype == CCI_U_TYPE_BIT)
{
/* bit_val.size * 8 : bit length for the value */
(void) db_make_bit (&cci_value, prec, bit_val.buf, bit_val.size * 8);
}
else
{
/* bit_val.size * 8 : bit length for the value */
(void) db_make_varbit (&cci_value, prec, bit_val.buf, bit_val.size * 8);
}
break;
case CCI_U_TYPE_DATE:
case CCI_U_TYPE_TIME:
case CCI_U_TYPE_TIMESTAMP:
case CCI_U_TYPE_DATETIME:
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], &date_time, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
error = dblink_make_date_time (utype, &cci_value, &date_time);
break;
case CCI_U_TYPE_DATETIMETZ:
case CCI_U_TYPE_DATETIMELTZ:
case CCI_U_TYPE_TIMESTAMPTZ:
case CCI_U_TYPE_TIMESTAMPLTZ:
if ((error = cci_get_data (scan_info->stmt_handle, col_no, type_map[utype], &date_time_tz, &ind)) < 0)
{
goto error_exit;
}
NULL_CHECK (ind);
error = dblink_make_date_time_tz (utype, &cci_value, &date_time_tz);
break;
default:
/* any other type (collections, object, blob/clob, enum, ...) is rejected */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK_UNSUPPORTED_TYPE, 1, print_utype_to_string (utype));
goto close_exit;
}
/* a conversion failure leaves the column loop; it is turned into S_ERROR after the loop */
if (error < 0)
{
break;
}
if (ind == -1)
{
/* NULL column: just flag the target value as NULL */
valptrp->val->domain.general_info.is_null = 1;
}
else
{
/* cast the fetched value into the target value, using a domain built from the
 * target's own type, precision, collation, codeset and scale */
TP_DOMAIN dom;
TP_DOMAIN_STATUS status;
tp_domain_init (&dom, (DB_TYPE) db_value_domain_type (valptrp->val));
dom.precision = db_value_precision (valptrp->val);
dom.collation_id = db_get_string_collation (valptrp->val);
dom.codeset = db_get_string_codeset (valptrp->val);
dom.scale = db_value_scale (valptrp->val);
if ((status =
tp_value_cast_preserve_domain (&cci_value, valptrp->val, &dom, false, true)) != DOMAIN_COMPATIBLE)
{
(void) tp_domain_status_er_set (status, ARG_FILE_LINE, &cci_value, &dom);
goto close_exit;
}
}
/* cci_value is reused for the next column; release anything it owns */
if (cci_value.need_clear)
{
pr_clear_value (&cci_value);
}
}
if (error != NO_ERROR)
{
goto close_exit;
}
return S_SUCCESS;
/* error_exit: `error' holds a CCI error code from cci_get_data () that still has to be reported */
error_exit:
cci_get_err_msg (error, err_buf.err_msg, sizeof (err_buf.err_msg));
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DBLINK, 1, err_buf.err_msg);
/* close_exit: the error has already been set; only release cci_value */
close_exit:
if (cci_value.need_clear)
{
pr_clear_value (&cci_value);
}
return S_ERROR;
}
/*
 * dblink_scan_reset () - make the next dblink_scan_next () start over from the first row
 *   return: SCAN_CODE (always S_SUCCESS)
 *   scan_info(in/out) : information for dblink; only the stored cursor position changes
 *
 * Note: no CCI call is made here.  The stored position is handed to
 *       cci_cursor () by the next dblink_scan_next () call.
 */
SCAN_CODE
dblink_scan_reset (DBLINK_SCAN_INFO * scan_info)
{
  /* resetting only makes sense for a scan that is currently open */
  assert (scan_info->conn_handle >= 0 && scan_info->stmt_handle >= 0);

  scan_info->cursor = CCI_CURSOR_FIRST;

  return S_SUCCESS;
}