Skip to content

File query_hash_join.h

File List > cubrid > src > query > query_hash_join.h

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * query_hash_join.h
 */

#ifndef _QUERY_HASH_JOIN_H_
#define _QUERY_HASH_JOIN_H_

#include "regu_var.hpp"     /* REGU_VARIABLE_LIST */

#if defined (SERVER_MODE) || defined (SA_MODE)
#include "query_hash_scan.h"    /* HASH_METHOD */
#include "system.h"     /* UINT32, UINT64 */
#include "thread_entry.hpp" /* THREAD_ENTRY */
#include "tsc_timer.h"      /* TSC_TICKS, TSCTIMEVAL, TSC_ADD_TIMEVAL */
#include "xasl_predicate.hpp"   /* PRED_EXPR */
#endif /* defined (SERVER_MODE) || defined (SA_MODE) */

/*
 * Debug Macros
 */

/* Every switch defaults to 0 (disabled).  They are tested with #if elsewhere
 * in this header, so they must stay preprocessor macros.  The #ifndef guards
 * let a build enable one without editing this file, e.g.
 * -DHASHJOIN_PROFILE_TIME=1; an unconditional #define would silently reset it. */
#ifndef HASHJOIN_PROFILE_TIME
#define HASHJOIN_PROFILE_TIME 0
#endif
#ifndef HASHJOIN_COLLISION_RATE
#define HASHJOIN_COLLISION_RATE 0
#endif
#ifndef HASHJOIN_DUMP_PARTITION
#define HASHJOIN_DUMP_PARTITION 0
#endif
#ifndef HASHJOIN_DUMP_HASH_TABLE
#define HASHJOIN_DUMP_HASH_TABLE 0
#endif
#ifndef HASHJOIN_DUMP_BUILD
#define HASHJOIN_DUMP_BUILD 0
#endif
#ifndef HASHJOIN_DUMP_PROBE
#define HASHJOIN_DUMP_PROBE 0
#endif

/*
 * Forward Declarations
 */

/* worker_manager is only referenced through a pointer in this header (the
 * px_worker_manager member of HASHJOIN_MANAGER), so a forward declaration is
 * enough; the class itself is defined elsewhere.  This is C++-only syntax. */
namespace parallel_query
{
  class worker_manager;
}

/* Opaque handles: this header only uses them through pointers, so their full
 * definitions are not required here.  A typedef naming an elaborated struct
 * type also forward-declares the struct tag itself. */
typedef struct tp_domain TP_DOMAIN;
typedef struct xasl_node XASL_NODE;

/*
 * Enum & Typedef Definitions
 */

/* Execution state of a hash join context (type of HASHJOIN_CONTEXT.status).
 * Values are spelled out explicitly; they are consecutive from 0. */
typedef enum hashjoin_status
{
  HASHJOIN_STATUS_NONE = 0,
  HASHJOIN_STATUS_FILL_NULL_VALUES = 1,
  HASHJOIN_STATUS_TRY = 2,
  HASHJOIN_STATUS_SINGLE = 3,
  HASHJOIN_STATUS_PARTITION = 4,
  HASHJOIN_STATUS_PARALLEL = 5,
  HASHJOIN_STATUS_END = 6,
  HASHJOIN_STATUS_ERROR = 7
} HASHJOIN_STATUS;

/* How result list files are merged (type of HASHJOIN_MANAGER.qlist_merge_method).
 * Values are spelled out explicitly; they are consecutive from 0. */
typedef enum hashjoin_merge_method
{
  HASHJOIN_MERGE_COMBINE = 0,
  HASHJOIN_MERGE_APPEND = 1,
  HASHJOIN_MERGE_CONNECT = 2
} HASHJOIN_MERGE_METHOD;

/* Profiling step identifiers (HASHJOIN_START_STATS.step and the HJOIN_PROFILE_*
 * macros).  The trailing comment names the function each step times. */
typedef enum hashjoin_profile_step
{
  HASHJOIN_PROFILE_NONE = 0,

  /* Build side. */
  HASHJOIN_PROFILE_BUILD_FETCH = 1,	/* hjoin_fetch_key */
  HASHJOIN_PROFILE_BUILD_HASH = 2,	/* qdata_hash_scan_key */
  HASHJOIN_PROFILE_BUILD_INSERT = 3,	/* hjoin_build_key */

  /* Probe side. */
  HASHJOIN_PROFILE_PROBE_FETCH = 4,	/* hjoin_fetch_key */
  HASHJOIN_PROFILE_PROBE_HASH = 5,	/* qdata_hash_scan_key */
  HASHJOIN_PROFILE_PROBE_SEARCH = 6,	/* hjoin_probe_key */
  HASHJOIN_PROFILE_PROBE_MATCH = 7,	/* hjoin_fetch_key */
  HASHJOIN_PROFILE_PROBE_ADD = 8,	/* hjoin_merge_tuple_to_list_id */

  /* Result merge. */
  HASHJOIN_PROFILE_MERGE = 9	/* hjoin_merge_qlist */
} HASHJOIN_PROFILE_STEP;

/*
 * Struct & Typedef Definitions
 */

/* One side of the join (HASHJOIN_MANAGER holds an outer and an inner one). */
struct hashjoin_input
{
  XASL_NODE *xasl;
  REGU_VARIABLE_LIST regu_list_pred;	/* regu variables used to evaluate during-join predicates */
};
typedef struct hashjoin_input HASHJOIN_INPUT;

/* Domain information for one join input.  NOTE(review): domains and
 * value_indexes presumably have one entry per join key -- confirm. */
struct hashjoin_input_domain_info
{
  TP_DOMAIN **domains;
  int *value_indexes;
};
typedef struct hashjoin_input_domain_info HASHJOIN_INPUT_DOMAIN_INFO;

/* Domain information for both inputs plus the domains they share. */
struct hashjoin_domain_info
{
  HASHJOIN_INPUT_DOMAIN_INFO outer;
  HASHJOIN_INPUT_DOMAIN_INFO inner;

  TP_DOMAIN **coerce_domains;	/* domains common to the build and probe inputs */
  bool need_coerce_domains;	/* whether coerce_domains is to be used */
};
typedef struct hashjoin_domain_info HASHJOIN_DOMAIN_INFO;

#if defined (SERVER_MODE) || defined (SA_MODE)

/* Smallest and largest time observed for some measured interval. */
struct hashjoin_range_time_stats
{
  TSCTIMEVAL min;
  TSCTIMEVAL max;
};
typedef struct hashjoin_range_time_stats HASHJOIN_RANGE_TIME_STATS;

/* min starts at { LONG_MAX, 999999 } and max at { 0, 0 }; presumably so the
 * first sample recorded overwrites both (the update code is not in this header). */
#define HASHJOIN_RANGE_TIME_STATS_INITIALIZER { { LONG_MAX, 999999 }, { 0, 0 } }

/* Timing and I/O counters for one phase; HASHJOIN_STATS keeps one for each
 * of split, parallel, build and probe. */
struct hashjoin_input_stats
{
  TSCTIMEVAL elapsed_time;
  HASHJOIN_RANGE_TIME_STATS range_time;
  UINT64 fetches;
  UINT64 ioreads;
  UINT64 read_rows;
  UINT64 read_keys;
  UINT64 qualified_rows;
};
typedef struct hashjoin_input_stats HASHJOIN_INPUT_STATS;

#if HASHJOIN_PROFILE_TIME
/* Fine-grained timings, compiled only when HASHJOIN_PROFILE_TIME is enabled.
 * Each field's comment names the function it times, matching the
 * HASHJOIN_PROFILE_STEP enumerators. */
typedef struct hashjoin_profile_stats
{
  struct
  {
    TSCTIMEVAL fetch;		/* hjoin_fetch_key */
    TSCTIMEVAL hash;		/* qdata_hash_scan_key */
    TSCTIMEVAL insert;		/* hjoin_build_key */
  } build;

  struct
  {
    TSCTIMEVAL fetch;		/* hjoin_fetch_key */
    TSCTIMEVAL hash;		/* qdata_hash_scan_key */
    TSCTIMEVAL search;		/* hjoin_probe_key */
    TSCTIMEVAL match;		/* hjoin_fetch_key */
    TSCTIMEVAL add;		/* hjoin_merge_tuple_to_list_id */
  } probe;

  struct
  {
    TSCTIMEVAL elapsed_time;	/* hjoin_merge_qlist (HASHJOIN_PROFILE_MERGE) */
    UINT64 fetches;
    UINT64 ioreads;
    UINT64 qualified_rows;	/* assigned by HJOIN_PROFILE_MERGE_END */
  } merge;
} HASHJOIN_PROFILE_STATS;
#endif /* HASHJOIN_PROFILE_TIME */

/* Starting point of a traced interval; passed to hjoin_trace_start /
 * hjoin_trace_end and to the profiling functions. */
struct hashjoin_start_stats
{
  TSC_TICKS tick;
  UINT64 fetches;
  UINT64 ioreads;
  HASHJOIN_PROFILE_STEP step;
};
typedef struct hashjoin_start_stats HASHJOIN_START_STATS;

/* Zero counters and no active profiling step. */
#define HASHJOIN_START_STATS_INITIALIZER { { 0 }, 0, 0, HASHJOIN_PROFILE_NONE }

/* Trace statistics for a hash join; used both for the aggregate and for each
 * context (see HASHJOIN_STATS_GROUP and HASHJOIN_CONTEXT.stats). */
struct hashjoin_stats
{
  UINT32 num_parallel_threads;

  /* Hash method chosen and the related usage flags. */
  HASH_METHOD hash_method;
  bool use_hash_memory;
  bool use_hash_hybrid;
  bool use_hash_file;
  bool use_hash_skip;

  bool swap_join_inputs;	/* NOTE(review): presumably set when build/probe roles were swapped -- confirm */

  double collision_rate;

  /* Per-phase counters. */
  HASHJOIN_INPUT_STATS split;
  HASHJOIN_INPUT_STATS parallel;
  HASHJOIN_INPUT_STATS build;
  HASHJOIN_INPUT_STATS probe;

#if HASHJOIN_PROFILE_TIME
  HASHJOIN_INPUT_STATS merge;
  HASHJOIN_PROFILE_STATS profile;
#endif /* HASHJOIN_PROFILE_TIME */
};
typedef struct hashjoin_stats HASHJOIN_STATS;

/* Aggregate statistics together with per-context statistics.
 * NOTE(review): context_stats presumably points to context_cnt entries -- confirm. */
struct hashjoin_stats_group
{
  HASHJOIN_STATS stats;
  HASHJOIN_STATS *context_stats;
  UINT32 context_cnt;
};
typedef struct hashjoin_stats_group HASHJOIN_STATS_GROUP;

/* HASHJOIN_FETCH_INFO: scan state for one join input (a list file) together
 * with what is needed to extract join keys from its tuples. */
struct hashjoin_fetch_info
{
  QFILE_LIST_ID *list_id;
  QFILE_LIST_SCAN_ID list_scan_id;
  QFILE_TUPLE_RECORD tuple_record;
  QFILE_TUPLE_RECORD *fill_record;

  /* input and coerce_domains point into the HASHJOIN_DOMAIN_INFO held by
   * HASHJOIN_PROC_NODE; need_coerce_domains mirrors the flag stored there. */
  HASHJOIN_INPUT_DOMAIN_INFO *input;
  TP_DOMAIN **coerce_domains;
  bool need_coerce_domains;

  /* Taken from the corresponding HASHJOIN_INPUT. */
  REGU_VARIABLE_LIST regu_list_pred;
};
typedef struct hashjoin_fetch_info HASHJOIN_FETCH_INFO;

/* HASHJOIN_INPUT_SPLIT_INFO: partitioning state for one input -- where its
 * rows are fetched from and the partition list files.
 * NOTE(review): part_list_id presumably has one entry per partition -- confirm. */
struct hashjoin_input_split_info
{
  HASHJOIN_FETCH_INFO *fetch_info;
  QFILE_LIST_ID **part_list_id;
};
typedef struct hashjoin_input_split_info HASHJOIN_INPUT_SPLIT_INFO;

/* HASHJOIN_SPLIT_INFO: partitioning state for both join inputs. */
struct hashjoin_split_info
{
  HASHJOIN_INPUT_SPLIT_INFO outer;
  HASHJOIN_INPUT_SPLIT_INFO inner;
};
typedef struct hashjoin_split_info HASHJOIN_SPLIT_INFO;

/* HASHJOIN_SHARED_SPLIT_INFO: state shared between workers while splitting
 * inputs into partitions (hence the atomic members and mutexes).  Set up and
 * torn down through hjoin_init_shared_split_info / hjoin_clear_shared_split_info. */
typedef struct hashjoin_shared_split_info
{
  // *INDENT-OFF*
  QFILE_LIST_SECTOR_INFO sector_info;   /* sector-based page distribution (from qfile_collect_list_sector_info) */
  std::atomic<bool> membuf_claimed; /* atomic flag: one worker claims all membuf pages */
  std::atomic<int> next_sector_index;   /* atomic index for sector distribution */
  std::mutex *part_mutexes;	/* nullptr until allocated; NOTE(review): presumably one mutex per partition -- confirm */

  hashjoin_shared_split_info ()
    : sector_info (QFILE_LIST_SECTOR_INFO_INITIALIZER)
    , membuf_claimed (false)
    , next_sector_index (0)
    , part_mutexes (nullptr)
  {
    // All members are initialized by the mem-initializer list above.
  }
  // *INDENT-ON*
} HASHJOIN_SHARED_SPLIT_INFO;

/* HASHJOIN_SHARED_JOIN_INFO: state shared between workers during the join
 * phase.  NOTE(review): scan_mutex presumably guards scan_position/next_index
 * and stats_mutex the two range-time members -- confirm in the .c file. */
typedef struct hashjoin_shared_join_info
{
  // *INDENT-OFF*
  std::mutex scan_mutex;
  SCAN_POSITION scan_position;
  UINT32 next_index;

  std::mutex stats_mutex;
  HASHJOIN_RANGE_TIME_STATS build_range_time;
  HASHJOIN_RANGE_TIME_STATS probe_range_time;

  hashjoin_shared_join_info ()
    : scan_mutex ()
    , scan_position (S_BEFORE)
    , next_index (0)
    , stats_mutex ()
    , build_range_time (HASHJOIN_RANGE_TIME_STATS_INITIALIZER)
    , probe_range_time (HASHJOIN_RANGE_TIME_STATS_INITIALIZER)
  {
    // All members are initialized by the mem-initializer list above.
  }
  // *INDENT-ON*
} HASHJOIN_SHARED_JOIN_INFO;

/* HASHJOIN_CONTEXT: state for one unit of hash join work; passed to
 * hjoin_execute and hjoin_merge_qlist together with the HASHJOIN_MANAGER. */
struct hashjoin_context
{
  QFILE_LIST_ID *list_id;	/* NOTE(review): presumably this context's output list -- confirm */

  HASHJOIN_FETCH_INFO outer;
  HASHJOIN_FETCH_INFO inner;

  /* Assigned in hjoin_init_context or hjoin_outer_fill_null_values;
   * NOTE(review): presumably each refers to outer or inner above. */
  HASHJOIN_FETCH_INFO *build;
  HASHJOIN_FETCH_INFO *probe;

  HASH_LIST_SCAN hash_scan;
  PRED_EXPR *during_join_pred;
  VAL_DESCR *val_descr;

  HASHJOIN_STATUS status;

  HASHJOIN_STATS *stats;	/* refers to stats held on the HASHJOIN_MANAGER side */
};
typedef struct hashjoin_context HASHJOIN_CONTEXT;

/* HASHJOIN_MANAGER: per-join state passed alongside a HASHJOIN_CONTEXT to
 * hjoin_execute and hjoin_merge_qlist. */
struct hashjoin_manager
{
  /* Point into HASHJOIN_PROC_NODE. */
  HASHJOIN_INPUT *outer;
  HASHJOIN_INPUT *inner;
  QFILE_LIST_MERGE_INFO *merge_info;

  /* Copied out of QFILE_LIST_MERGE_INFO. */
  JOIN_TYPE join_type;
  int key_cnt;

  /* Taken from XASL_NODE: during_join_pred is a pointer, num_parallel_threads a value. */
  PRED_EXPR *during_join_pred;
  int num_parallel_threads;

  /* Taken from XASL_STATE: query_id is declared as a QUERY_ID value, val_descr as a pointer. */
  QUERY_ID query_id;
  VAL_DESCR *val_descr;

  /* single_context is embedded; NOTE(review): contexts presumably has context_cnt entries -- confirm. */
  HASHJOIN_CONTEXT single_context;
  HASHJOIN_CONTEXT *contexts;
  UINT32 context_cnt;

  QFILE_TUPLE_VALUE_TYPE_LIST type_list;
  HASHJOIN_MERGE_METHOD qlist_merge_method;
  int qlist_flag;

  // *INDENT-OFF*
  parallel_query::worker_manager *px_worker_manager;
  // *INDENT-ON*
  UINT64 *px_worker_stats;

  HASHJOIN_STATS_GROUP *stats_group;	/* taken from HASHJOIN_PROC_NODE */

#if HASHJOIN_DUMP_HASH_TABLE
  pthread_mutex_t dump_hash_table_mutex;
#endif /* HASHJOIN_DUMP_HASH_TABLE */
};
typedef struct hashjoin_manager HASHJOIN_MANAGER;

/*
 * Macro Function Declarations
 */

/*
 * Profiling hooks.  With HASHJOIN_PROFILE_TIME enabled they forward to
 * hjoin_profile_start / hjoin_profile_end, but only while tracing is on for
 * the thread; otherwise they expand to no-ops that do not evaluate their
 * arguments.
 *
 * The enabled forms are wrapped in do { ... } while (0) so that every variant
 * is a single statement requiring a trailing semicolon.  A bare
 * "if (...) { ... }" expansion would orphan the else in
 * "if (c) HJOIN_PROFILE_START (...); else ..." and could capture an else that
 * follows the macro.
 */
#if HASHJOIN_PROFILE_TIME
#define HJOIN_PROFILE_START(thread_p, start_stats_p, step) \
  do \
    { \
      if (thread_is_on_trace ((thread_p))) \
        { \
          hjoin_profile_start ((thread_p), (start_stats_p), (step)); \
        } \
    } \
  while (0)
#define HJOIN_PROFILE_END(thread_p, stats_p, start_stats_p, step) \
  do \
    { \
      if (thread_is_on_trace ((thread_p))) \
        { \
          hjoin_profile_end ((thread_p), (stats_p), (start_stats_p), (step)); \
        } \
    } \
  while (0)
/* Same as HJOIN_PROFILE_END for the merge step, and also records the row count
 * in (stats_p)->merge.qualified_rows. */
#define HJOIN_PROFILE_MERGE_END(thread_p, stats_p, start_stats_p, step, rows) \
  do \
    { \
      if (thread_is_on_trace ((thread_p))) \
        { \
          assert ((step) == HASHJOIN_PROFILE_MERGE); \
          hjoin_profile_end ((thread_p), (stats_p), (start_stats_p), (step)); \
          (stats_p)->merge.qualified_rows = (rows); \
        } \
    } \
  while (0)
#else
#define HJOIN_PROFILE_START(thread_p, start_stats_p, step) ((void) 0)
#define HJOIN_PROFILE_END(thread_p, stats_p, start_stats_p, step) ((void) 0)
#define HJOIN_PROFILE_MERGE_END(thread_p, stats_p, start_stats_p, step, rows) ((void) 0)
#endif /* HASHJOIN_PROFILE_TIME */

/*
 * Function Declarations
 */

/* Top-level hash join entry point.  NOTE(review): presumably invoked by the
 * query executor for a hash-join XASL node; returns an int status -- confirm
 * the error-code convention in the implementation. */
int qexec_hash_join (THREAD_ENTRY * thread_p, XASL_NODE * xasl, QUERY_ID query_id, VAL_DESCR * val_descr);

/* Per-context work.  hjoin_merge_qlist is the function timed by the
 * HASHJOIN_PROFILE_MERGE profiling step. */
int hjoin_execute (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);
int hjoin_merge_qlist (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);

/* Set up / tear down HASHJOIN_SHARED_SPLIT_INFO for the split phase.
 * NOTE(review): presumably these allocate and release part_mutexes -- confirm. */
int hjoin_init_shared_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
                  HASHJOIN_SHARED_SPLIT_INFO * shared_info);
void hjoin_clear_shared_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
                    HASHJOIN_SHARED_SPLIT_INFO * shared_info);

/* hjoin_fetch_key is timed under the BUILD_FETCH, PROBE_FETCH and PROBE_MATCH
 * profiling steps.  need_skip_next is an out-parameter whose meaning is not
 * visible from this header -- NOTE(review): document at the definition. */
int hjoin_fetch_key (THREAD_ENTRY * thread_p, HASHJOIN_FETCH_INFO * fetch_info, QFILE_TUPLE_RECORD * tuple_record,
             HASH_SCAN_KEY * key, HASH_SCAN_KEY * compare_key, bool * need_skip_next);
void hjoin_update_tuple_hash_key (THREAD_ENTRY * thread_p, QFILE_TUPLE_RECORD * tuple_record, UINT32 hash_key);

/* Trace helpers: start/end bracket an interval using HASHJOIN_START_STATS and
 * record into HASHJOIN_INPUT_STATS; merge_stats combines context_stats into
 * stats (NOTE(review): presumably additive -- confirm). */
void hjoin_trace_start (THREAD_ENTRY * thread_p, HASHJOIN_START_STATS * start_stats);
void hjoin_trace_end (THREAD_ENTRY * thread_p, HASHJOIN_INPUT_STATS * stats, HASHJOIN_START_STATS * start_stats);
void hjoin_trace_merge_stats (HASHJOIN_STATS * stats, HASHJOIN_STATS * context_stats);

/* Parallel-worker counters.  NOTE(review): presumably backed by
 * manager->px_worker_stats -- confirm. */
UINT64 *hjoin_trace_get_worker_stats (HASHJOIN_MANAGER * manager, int index);
void hjoin_trace_drain_worker_stats (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager);

/* Only declared when profiling is compiled in; called through the
 * HJOIN_PROFILE_* macros. */
#if HASHJOIN_PROFILE_TIME
void hjoin_profile_start (THREAD_ENTRY * thread_p, HASHJOIN_START_STATS * start_stats, HASHJOIN_PROFILE_STEP step);
void hjoin_profile_end (THREAD_ENTRY * thread_p, HASHJOIN_PROFILE_STATS * stats, HASHJOIN_START_STATS * start_stats,
            HASHJOIN_PROFILE_STEP step);
#endif /* HASHJOIN_PROFILE_TIME */

#endif /* defined (SERVER_MODE) || defined (SA_MODE) */

#endif /* _QUERY_HASH_JOIN_H_ */