Merge remote-tracking branch 'origin/3.0' into feature/dnode
This commit is contained in:
commit
74daf8d1cf
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_INDEX_H_
|
#define _TD_INDEX_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -41,11 +42,12 @@ typedef enum {
|
||||||
UPDATE_VALUE, // update index column value
|
UPDATE_VALUE, // update index column value
|
||||||
ADD_INDEX, // add index on specify column
|
ADD_INDEX, // add index on specify column
|
||||||
DROP_INDEX, // drop existed index
|
DROP_INDEX, // drop existed index
|
||||||
DROP_SATBLE // drop stable
|
DROP_SATBLE, // drop stable
|
||||||
|
DEFAULT // query
|
||||||
} SIndexOperOnColumn;
|
} SIndexOperOnColumn;
|
||||||
|
|
||||||
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
|
typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType;
|
||||||
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType;
|
typedef enum { QUERY_TERM = 0, QUERY_PREFIX, QUERY_SUFFIX, QUERY_REGEX, QUERY_RANGE } EIndexQueryType;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* create multi query
|
* create multi query
|
||||||
|
@ -166,8 +168,8 @@ void indexOptsDestroy(SIndexOpts* opts);
|
||||||
* @param:
|
* @param:
|
||||||
*/
|
*/
|
||||||
|
|
||||||
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName,
|
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, int8_t qType, uint8_t colType,
|
||||||
int32_t nColName, const char* colVal, int32_t nColVal);
|
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal);
|
||||||
void indexTermDestroy(SIndexTerm* p);
|
void indexTermDestroy(SIndexTerm* p);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -119,6 +119,17 @@ typedef struct SLimit {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SLimit;
|
} SLimit;
|
||||||
|
|
||||||
|
typedef struct SFileBlockLoadRecorder {
|
||||||
|
uint64_t totalRows;
|
||||||
|
uint64_t totalCheckedRows;
|
||||||
|
uint32_t totalBlocks;
|
||||||
|
uint32_t loadBlocks;
|
||||||
|
uint32_t loadBlockStatis;
|
||||||
|
uint32_t skipBlocks;
|
||||||
|
uint32_t filterOutBlocks;
|
||||||
|
uint64_t elapsedTime;
|
||||||
|
} SFileBlockLoadRecorder;
|
||||||
|
|
||||||
typedef struct STaskCostInfo {
|
typedef struct STaskCostInfo {
|
||||||
int64_t created;
|
int64_t created;
|
||||||
int64_t start;
|
int64_t start;
|
||||||
|
@ -132,14 +143,10 @@ typedef struct STaskCostInfo {
|
||||||
uint64_t loadDataInCacheSize;
|
uint64_t loadDataInCacheSize;
|
||||||
|
|
||||||
uint64_t loadDataTime;
|
uint64_t loadDataTime;
|
||||||
uint64_t totalRows;
|
|
||||||
uint64_t totalCheckedRows;
|
SFileBlockLoadRecorder* pRecoder;
|
||||||
uint32_t totalBlocks;
|
|
||||||
uint32_t loadBlocks;
|
|
||||||
uint32_t loadBlockStatis;
|
|
||||||
uint32_t skipBlocks;
|
|
||||||
uint32_t filterOutBlocks;
|
|
||||||
uint64_t elapsedTime;
|
uint64_t elapsedTime;
|
||||||
|
|
||||||
uint64_t firstStageMergeTime;
|
uint64_t firstStageMergeTime;
|
||||||
uint64_t winInfoSize;
|
uint64_t winInfoSize;
|
||||||
uint64_t tableInfoSize;
|
uint64_t tableInfoSize;
|
||||||
|
@ -268,7 +275,7 @@ typedef struct SOperatorFpSet {
|
||||||
|
|
||||||
typedef struct SOperatorInfo {
|
typedef struct SOperatorInfo {
|
||||||
uint8_t operatorType;
|
uint8_t operatorType;
|
||||||
bool blockingOptr; // block operator or not
|
bool blocking; // block operator or not
|
||||||
uint8_t status; // denote if current operator is completed
|
uint8_t status; // denote if current operator is completed
|
||||||
int32_t numOfOutput; // number of columns of the current operator results
|
int32_t numOfOutput; // number of columns of the current operator results
|
||||||
char* name; // name, used to show the query execution plan
|
char* name; // name, used to show the query execution plan
|
||||||
|
@ -333,17 +340,14 @@ typedef struct SScanInfo {
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
|
SFileBlockLoadRecorder readRecorder;
|
||||||
int32_t numOfBlocks; // extract basic running information.
|
|
||||||
int32_t numOfSkipped;
|
|
||||||
int32_t numOfBlockStatis;
|
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
int32_t prevGroupId; // previous table group id
|
// int32_t prevGroupId; // previous table group id
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
int32_t current;
|
int32_t scanTimes;
|
||||||
SNode* pFilterNode; // filter operator info
|
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||||
SqlFunctionCtx* pCtx; // next operator query context
|
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
|
||||||
SResultRowInfo* pResultRowInfo;
|
SResultRowInfo* pResultRowInfo;
|
||||||
int32_t* rowCellInfoOffset;
|
int32_t* rowCellInfoOffset;
|
||||||
SExprInfo* pExpr;
|
SExprInfo* pExpr;
|
||||||
|
@ -397,7 +401,6 @@ typedef struct SSysTableScanInfo {
|
||||||
SArray* scanCols; // SArray<int16_t> scan column id list
|
SArray* scanCols; // SArray<int16_t> scan column id list
|
||||||
SName name;
|
SName name;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
int32_t capacity;
|
|
||||||
int64_t numOfBlocks; // extract basic running information.
|
int64_t numOfBlocks; // extract basic running information.
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
} SSysTableScanInfo;
|
} SSysTableScanInfo;
|
||||||
|
|
|
@ -132,6 +132,7 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
pOperator->cost.openCost = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1592,8 +1593,8 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
||||||
|
|
||||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||||
|
|
||||||
pCost->totalBlocks += 1;
|
// pCost->totalBlocks += 1;
|
||||||
pCost->totalRows += pBlock->info.rows;
|
// pCost->totalRows += pBlock->info.rows;
|
||||||
#if 0
|
#if 0
|
||||||
// Calculate all time windows that are overlapping or contain current data block.
|
// Calculate all time windows that are overlapping or contain current data block.
|
||||||
// If current data block is contained by all possible time window, do not load current data block.
|
// If current data block is contained by all possible time window, do not load current data block.
|
||||||
|
@ -2411,12 +2412,13 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
|
||||||
//
|
//
|
||||||
// calculateOperatorProfResults(pQInfo);
|
// calculateOperatorProfResults(pQInfo);
|
||||||
|
|
||||||
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
|
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||||
" us, total blocks:%d, "
|
if (pSummary->pRecoder != NULL) {
|
||||||
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 " us, total blocks:%d, "
|
||||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks,
|
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||||
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
|
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks,
|
||||||
//
|
pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
|
||||||
|
}
|
||||||
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
|
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
|
||||||
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
|
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
|
||||||
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
|
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
|
||||||
|
@ -3282,7 +3284,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
|
||||||
|
|
||||||
pOperator->name = "ExchangeOperator";
|
pOperator->name = "ExchangeOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = pBlock->info.numOfCols;
|
pOperator->numOfOutput = pBlock->info.numOfCols;
|
||||||
|
@ -3673,7 +3675,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
|
|
||||||
pOperator->name = "SortedMerge";
|
pOperator->name = "SortedMerge";
|
||||||
// pOperator->operatorType = OP_SortedMerge;
|
// pOperator->operatorType = OP_SortedMerge;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = num;
|
pOperator->numOfOutput = num;
|
||||||
|
@ -3756,7 +3758,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
|
|
||||||
pOperator->name = "SortOperator";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
|
@ -4419,7 +4421,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
pOperator->name = "TableAggregate";
|
pOperator->name = "TableAggregate";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
|
@ -4532,7 +4534,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
|
|
||||||
pOperator->name = "ProjectOperator";
|
pOperator->name = "ProjectOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
|
@ -4615,7 +4617,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "FillOperator";
|
pOperator->name = "FillOperator";
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExpr;
|
||||||
|
@ -4861,15 +4863,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
|
||||||
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
SInterval interval = extractIntervalInfo(pTableScanNode);
|
||||||
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
|
SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
|
||||||
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
|
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
|
||||||
&interval, pTableScanNode->ratio, pTaskInfo);
|
&interval, pTableScanNode->ratio, pTaskInfo);
|
||||||
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||||
|
@ -4877,8 +4883,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
|
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
queryId, taskId);
|
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
@ -5628,7 +5633,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
|
||||||
|
|
||||||
pOperator->name = "JoinOperator";
|
pOperator->name = "JoinOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
|
|
@ -353,7 +353,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
pOperator->name = "GroupbyAggOperator";
|
pOperator->name = "GroupbyAggOperator";
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
// pOperator->operatorType = OP_Groupby;
|
// pOperator->operatorType = OP_Groupby;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
|
@ -612,7 +612,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "PartitionOperator";
|
pOperator->name = "PartitionOperator";
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||||
pInfo->binfo.pRes = pResultBlock;
|
pInfo->binfo.pRes = pResultBlock;
|
||||||
|
|
|
@ -55,13 +55,27 @@ typedef struct SIFParam {
|
||||||
SArray *result;
|
SArray *result;
|
||||||
char * condValue;
|
char * condValue;
|
||||||
|
|
||||||
|
uint8_t colValType;
|
||||||
col_id_t colId;
|
col_id_t colId;
|
||||||
|
int64_t suid; // add later
|
||||||
int64_t suid; // add later
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
char dbName[TSDB_DB_NAME_LEN];
|
char colName[TSDB_COL_NAME_LEN];
|
||||||
char colName[TSDB_COL_NAME_LEN];
|
|
||||||
} SIFParam;
|
} SIFParam;
|
||||||
|
|
||||||
|
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
||||||
|
if (src == OP_TYPE_GREATER_THAN || src == OP_TYPE_GREATER_EQUAL || src == OP_TYPE_LOWER_THAN ||
|
||||||
|
src == OP_TYPE_LOWER_EQUAL) {
|
||||||
|
*dst = QUERY_RANGE;
|
||||||
|
} else if (src == OP_TYPE_EQUAL) {
|
||||||
|
*dst = QUERY_TERM;
|
||||||
|
} else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) {
|
||||||
|
*dst = QUERY_REGEX;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
||||||
// construct tag filter operator later
|
// construct tag filter operator later
|
||||||
static void destroyTagFilterOperatorInfo(void *param) {
|
static void destroyTagFilterOperatorInfo(void *param) {
|
||||||
|
@ -145,10 +159,11 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
SColumnNode *cn = (SColumnNode *)node;
|
SColumnNode *cn = (SColumnNode *)node;
|
||||||
/*only support tag column*/
|
/*only support tag column*/
|
||||||
SIF_ERR_RET(sifValidateColumn(cn));
|
SIF_ERR_RET(sifValidateColumn(cn));
|
||||||
|
|
||||||
param->colId = cn->colId;
|
param->colId = cn->colId;
|
||||||
|
param->colValType = cn->node.resType.type;
|
||||||
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
|
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
|
||||||
memcpy(param->colName, cn->colName, sizeof(cn->colName));
|
memcpy(param->colName, cn->colName, sizeof(cn->colName));
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_NODE_LIST: {
|
case QUERY_NODE_NODE_LIST: {
|
||||||
|
@ -231,61 +246,76 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
||||||
qError("index-filter not support buildin function");
|
qError("index-filter not support buildin function");
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||||
SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST);
|
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName,
|
||||||
return TSDB_CODE_SUCCESS;
|
strlen(left->colName), right->condValue, strlen(right->condValue));
|
||||||
|
if (operType == OP_TYPE_LOWER_EQUAL || operType == OP_TYPE_GREATER_EQUAL || operType == OP_TYPE_GREATER_THAN ||
|
||||||
|
operType == OP_TYPE_LOWER_THAN) {
|
||||||
|
}
|
||||||
|
if (tm == NULL) {
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
|
||||||
|
|
||||||
|
EIndexQueryType qtype = 0;
|
||||||
|
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
||||||
|
|
||||||
|
indexMultiTermQueryAdd(mtm, tm, qtype);
|
||||||
|
int ret = indexSearch(NULL, mtm, output->result);
|
||||||
|
indexMultiTermQueryDestroy(mtm);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_THAN;
|
int id = OP_TYPE_LOWER_THAN;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_EQUAL;
|
int id = OP_TYPE_LOWER_EQUAL;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_THAN;
|
int id = OP_TYPE_GREATER_THAN;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_EQUAL;
|
int id = OP_TYPE_GREATER_EQUAL;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_EQUAL;
|
int id = OP_TYPE_EQUAL;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_EQUAL;
|
int id = OP_TYPE_NOT_EQUAL;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_IN;
|
int id = OP_TYPE_IN;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_IN;
|
int id = OP_TYPE_NOT_IN;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LIKE;
|
int id = OP_TYPE_LIKE;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_LIKE;
|
int id = OP_TYPE_NOT_LIKE;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_MATCH;
|
int id = OP_TYPE_MATCH;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NMATCH;
|
int id = OP_TYPE_NMATCH;
|
||||||
return sifIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
// add more except
|
// add more except
|
||||||
|
@ -460,6 +490,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
|
||||||
qError("index-filter failed to taosHashInit");
|
qError("index-filter failed to taosHashInit");
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
|
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
|
||||||
SIF_ERR_RET(ctx.code);
|
SIF_ERR_RET(ctx.code);
|
||||||
|
|
||||||
|
@ -498,6 +529,7 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
|
||||||
if (pFilterNode == NULL) {
|
if (pFilterNode == NULL) {
|
||||||
return SFLT_NOT_INDEX;
|
return SFLT_NOT_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
// impl later
|
// impl later
|
||||||
return SFLT_ACCURATE_INDEX;
|
return SFLT_ACCURATE_INDEX;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "common/ttime.h"
|
#include "ttime.h"
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -36,10 +36,10 @@
|
||||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo);
|
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||||
|
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
|
||||||
|
|
||||||
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
|
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
SWITCH_ORDER(pCtx[i].order);
|
SWITCH_ORDER(pCtx[i].order);
|
||||||
}
|
}
|
||||||
|
@ -158,11 +158,11 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||||
|
|
||||||
pCost->totalBlocks += 1;
|
pCost->totalBlocks += 1;
|
||||||
pCost->totalRows += pBlock->info.rows;
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
@ -188,11 +188,11 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
|
|
||||||
bool allHave = true;
|
bool allColumnsHaveAgg = true;
|
||||||
SColumnDataAgg** pColAgg = NULL;
|
SColumnDataAgg** pColAgg = NULL;
|
||||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allHave);
|
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
|
||||||
|
|
||||||
if (allHave == true) {
|
if (allColumnsHaveAgg == true) {
|
||||||
int32_t numOfCols = pBlock->info.numOfCols;
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
|
||||||
// todo create this buffer during creating operator
|
// todo create this buffer during creating operator
|
||||||
|
@ -236,6 +236,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
|
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
|
||||||
|
// todo record the filter time cost
|
||||||
doFilter(pTableScanInfo->pFilterNode, pBlock);
|
doFilter(pTableScanInfo->pFilterNode, pBlock);
|
||||||
if (pBlock->info.rows == 0) {
|
if (pBlock->info.rows == 0) {
|
||||||
pCost->filterOutBlocks += 1;
|
pCost->filterOutBlocks += 1;
|
||||||
|
@ -266,7 +267,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->numOfBlocks += 1;
|
|
||||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
|
@ -296,15 +296,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
// do the ascending order traverse in the first place.
|
||||||
|
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->current += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
|
||||||
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
|
@ -319,7 +320,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
|
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
|
||||||
if (pTableScanInfo->current < total) {
|
if (pTableScanInfo->scanTimes < total) {
|
||||||
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
||||||
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -329,21 +330,20 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
|
||||||
while (pTableScanInfo->current < total) {
|
while (pTableScanInfo->scanTimes < total) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->current += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
|
||||||
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
|
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
|
||||||
"-%" PRId64,
|
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -372,48 +372,50 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
|
||||||
pInfo->cond = *pCond;
|
pInfo->cond = *pCond;
|
||||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
|
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
|
||||||
|
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
pInfo->sampleRatio = sampleRatio;
|
pInfo->sampleRatio = sampleRatio;
|
||||||
pInfo->dataBlockLoadFlag = dataLoadFlag;
|
pInfo->dataBlockLoadFlag = dataLoadFlag;
|
||||||
pInfo->pResBlock = pResBlock;
|
pInfo->pResBlock = pResBlock;
|
||||||
pInfo->pFilterNode = pCondition;
|
pInfo->pFilterNode = pCondition;
|
||||||
pInfo->dataReader = pDataReader;
|
pInfo->dataReader = pDataReader;
|
||||||
pInfo->current = 0;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->pColMatchInfo = pColMatchInfo;
|
||||||
pInfo->pColMatchInfo = pColMatchInfo;
|
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator";
|
pOperator->name = "TableScanOperator"; // for dubug purpose
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = numOfOutput;
|
pOperator->numOfOutput = numOfOutput;
|
||||||
pOperator->fpSet.getNextFn = doTableScan;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||||
|
|
||||||
static int32_t cost = 0;
|
static int32_t cost = 0;
|
||||||
pOperator->cost.openCost = ++cost;
|
|
||||||
|
// for non-blocking operator, the open cost is always 0
|
||||||
|
pOperator->cost.openCost = 0;
|
||||||
pOperator->cost.totalCost = ++cost;
|
pOperator->cost.totalCost = ++cost;
|
||||||
pOperator->resultInfo.totalRows = ++cost;
|
pOperator->resultInfo.totalRows = ++cost;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
|
|
||||||
pInfo->dataReader = pTsdbReadHandle;
|
|
||||||
pInfo->current = 0;
|
|
||||||
pInfo->prevGroupId = -1;
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
pOperator->name = "TableSeqScanOperator";
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
|
||||||
pOperator->blockingOptr = false;
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
pOperator->info = pInfo;
|
|
||||||
pOperator->fpSet.getNextFn = doTableScanImpl;
|
|
||||||
|
|
||||||
|
pInfo->dataReader = pReadHandle;
|
||||||
|
// pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
|
pOperator->name = "TableSeqScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -482,7 +484,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
||||||
|
|
||||||
pOperator->name = "DataBlockInfoScanOperator";
|
pOperator->name = "DataBlockInfoScanOperator";
|
||||||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
||||||
pOperator->fpSet.getNextFn = doBlockInfoScan;
|
pOperator->fpSet.getNextFn = doBlockInfoScan;
|
||||||
|
@ -636,20 +638,22 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readerHandle = streamReadHandle;
|
pInfo->readerHandle = streamReadHandle;
|
||||||
pInfo->pRes = pResBlock;
|
pInfo->pRes = pResBlock;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pCondition;
|
||||||
|
|
||||||
pOperator->name = "StreamBlockScanOperator";
|
pOperator->name = "StreamBlockScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
||||||
pOperator->fpSet.getNextFn = doStreamBlockScan;
|
pOperator->fpSet.getNextFn = doStreamBlockScan;
|
||||||
pOperator->fpSet.closeFn = operatorDummyCloseFn;
|
pOperator->fpSet.closeFn = operatorDummyCloseFn;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,14 +675,12 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
||||||
switch (nType) {
|
switch (nType) {
|
||||||
case QUERY_NODE_OPERATOR: {
|
case QUERY_NODE_OPERATOR: {
|
||||||
SOperatorNode* node = (SOperatorNode*)pNode;
|
SOperatorNode* node = (SOperatorNode*)pNode;
|
||||||
|
|
||||||
if (OP_TYPE_EQUAL == node->opType) {
|
if (OP_TYPE_EQUAL == node->opType) {
|
||||||
*(int32_t*)pContext = 1;
|
*(int32_t*)pContext = 1;
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
*(int32_t*)pContext = 0;
|
*(int32_t*)pContext = 0;
|
||||||
|
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_COLUMN: {
|
case QUERY_NODE_COLUMN: {
|
||||||
|
@ -709,19 +711,17 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
void getDBNameFromCondition(SNode* pCondition, char* dbName) {
|
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
|
||||||
if (NULL == pCondition) {
|
if (NULL == pCondition) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName);
|
||||||
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, dbName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SOperatorInfo* operator=(SOperatorInfo*) param;
|
SOperatorInfo* operator=(SOperatorInfo*) param;
|
||||||
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
|
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -746,13 +746,14 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFilterInfo* filter = NULL;
|
SFilterInfo* filter = NULL;
|
||||||
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
|
||||||
|
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
||||||
|
|
||||||
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
|
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
|
||||||
code = filterSetDataFromSlotId(filter, ¶m1);
|
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||||
|
|
||||||
int8_t* rowRes = NULL;
|
int8_t* rowRes = NULL;
|
||||||
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
|
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
|
||||||
filterFreeInfo(filter);
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
|
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
|
||||||
|
@ -795,61 +796,31 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
|
|
||||||
static SSDataBlock* buildSysTableMetaBlock() {
|
static SSDataBlock* buildSysTableMetaBlock() {
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
pBlock->info.numOfCols = 10;
|
|
||||||
pBlock->info.hasVarCol = true;
|
size_t size = 0;
|
||||||
|
const SSysTableMeta *pMeta = NULL;
|
||||||
|
getInfosDbMeta(&pMeta, &size);
|
||||||
|
|
||||||
|
int32_t index = 0;
|
||||||
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
|
if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
|
||||||
|
index = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
SColumnInfoData colInfoData = {0};
|
for(int32_t i = 0; i < pMeta[index].colNum; ++i) {
|
||||||
colInfoData.info.colId = 1;
|
SColumnInfoData colInfoData = {0};
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
colInfoData.info.colId = i + 1;
|
||||||
colInfoData.info.bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
colInfoData.info.type = pMeta[index].schema[i].type;
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
colInfoData.info.bytes = pMeta[index].schema[i].bytes;
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
||||||
|
}
|
||||||
|
|
||||||
colInfoData.info.colId = 2;
|
pBlock->info.numOfCols = pMeta[index].colNum;
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
pBlock->info.hasVarCol = true;
|
||||||
colInfoData.info.bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 3;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
colInfoData.info.bytes = 8;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 4;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_INT;
|
|
||||||
colInfoData.info.bytes = 4;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 5;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
|
||||||
colInfoData.info.bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 6;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
colInfoData.info.bytes = 8;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 7;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_INT;
|
|
||||||
colInfoData.info.bytes = 4;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 8;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_INT;
|
|
||||||
colInfoData.info.bytes = 4;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 9;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
|
||||||
colInfoData.info.bytes = 512 + VARSTR_HEADER_SIZE;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
colInfoData.info.colId = 10;
|
|
||||||
colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
|
||||||
colInfoData.info.bytes = 20 + VARSTR_HEADER_SIZE;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
@ -868,7 +839,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
buildSysDbTableInfo(pInfo);
|
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
doFilterResult(pInfo);
|
doFilterResult(pInfo);
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
|
@ -896,7 +867,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
SSDataBlock* p = buildSysTableMetaBlock();
|
SSDataBlock* p = buildSysTableMetaBlock();
|
||||||
blockDataEnsureCapacity(p, pInfo->capacity);
|
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
while (metaTbCursorNext(pInfo->pCur) == 0) {
|
while (metaTbCursorNext(pInfo->pCur) == 0) {
|
||||||
|
@ -977,7 +948,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
||||||
colDataAppend(pColInfoData, numOfRows, str, false);
|
colDataAppend(pColInfoData, numOfRows, str, false);
|
||||||
|
|
||||||
if (++numOfRows >= pInfo->capacity) {
|
if (++numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1022,7 +993,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
pMsgSendInfo->msgInfo.pData = buf1;
|
pMsgSendInfo->msgInfo.pData = buf1;
|
||||||
pMsgSendInfo->msgInfo.len = contLen;
|
pMsgSendInfo->msgInfo.len = contLen;
|
||||||
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
||||||
pMsgSendInfo->fp = loadSysTableContentCb;
|
pMsgSendInfo->fp = loadSysTableCallback;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
@ -1060,9 +1031,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo) {
|
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
||||||
SSDataBlock* p = buildSysTableMetaBlock();
|
SSDataBlock* p = buildSysTableMetaBlock();
|
||||||
blockDataEnsureCapacity(p, pInfo->capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
const SSysTableMeta* pSysDbTableMeta = NULL;
|
const SSysTableMeta* pSysDbTableMeta = NULL;
|
||||||
|
@ -1133,18 +1104,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->accountId = accountId;
|
pInfo->accountId = accountId;
|
||||||
pInfo->showRewrite = showRewrite;
|
pInfo->showRewrite = showRewrite;
|
||||||
pInfo->pRes = pResBlock;
|
pInfo->pRes = pResBlock;
|
||||||
pInfo->capacity = 4096;
|
pInfo->pCondition = pCondition;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->scanCols = colList;
|
||||||
pInfo->scanCols = colList;
|
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
tNameAssign(&pInfo->name, pName);
|
tNameAssign(&pInfo->name, pName);
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
pInfo->readHandle = *(SReadHandle*) readHandle;
|
pInfo->readHandle = *(SReadHandle*) readHandle;
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
} else {
|
} else {
|
||||||
tsem_init(&pInfo->ready, 0, 0);
|
tsem_init(&pInfo->ready, 0, 0);
|
||||||
pInfo->epSet = epset;
|
pInfo->epSet = epset;
|
||||||
|
@ -1173,12 +1145,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "SysTableScanOperator";
|
pOperator->name = "SysTableScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
@ -1188,7 +1160,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1322,29 +1293,29 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput,
|
SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pReader = pReaderHandle;
|
pInfo->pReader = readHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pExpr = pExpr;
|
||||||
|
pOperator->numOfOutput = numOfOutput;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
||||||
pOperator->pExpr = pExpr;
|
|
||||||
pOperator->numOfOutput = numOfOutput;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
|
|
|
@ -1078,7 +1078,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
pOperator->name = "TimeIntervalAggOperator";
|
pOperator->name = "TimeIntervalAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
@ -1137,7 +1137,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
||||||
|
|
||||||
pOperator->name = "StreamTimeIntervalAggOperator";
|
pOperator->name = "StreamTimeIntervalAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
@ -1343,7 +1343,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
pOperator->name = "TimeSliceOperator";
|
pOperator->name = "TimeSliceOperator";
|
||||||
// pOperator->operatorType = OP_AllTimeWindow;
|
// pOperator->operatorType = OP_AllTimeWindow;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
@ -1385,7 +1385,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
||||||
pInfo->tsSlotId = tsSlotId;
|
pInfo->tsSlotId = tsSlotId;
|
||||||
pOperator->name = "StateWindowOperator";
|
pOperator->name = "StateWindowOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
@ -1437,7 +1437,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
pInfo->reptScan = false;
|
pInfo->reptScan = false;
|
||||||
pOperator->name = "SessionWindowAggOperator";
|
pOperator->name = "SessionWindowAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
|
|
@ -57,7 +57,7 @@ struct SIndex {
|
||||||
|
|
||||||
char* path;
|
char* path;
|
||||||
|
|
||||||
SIndexStat stat;
|
SIndexStat stat;
|
||||||
TdThreadMutex mtx;
|
TdThreadMutex mtx;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -86,6 +86,7 @@ typedef struct SIndexTerm {
|
||||||
int32_t nColName;
|
int32_t nColName;
|
||||||
char* colVal;
|
char* colVal;
|
||||||
int32_t nColVal;
|
int32_t nColVal;
|
||||||
|
int8_t qType; // just use for range
|
||||||
} SIndexTerm;
|
} SIndexTerm;
|
||||||
|
|
||||||
typedef struct SIndexTermQuery {
|
typedef struct SIndexTermQuery {
|
||||||
|
@ -165,7 +166,7 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
|
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
|
||||||
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
|
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
|
||||||
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
||||||
do { \
|
do { \
|
||||||
uint8_t oldTy = ty; \
|
uint8_t oldTy = ty; \
|
||||||
|
|
|
@ -175,55 +175,19 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
||||||
#ifdef USE_LUCENE
|
|
||||||
EIndexOperatorType opera = multiQuerys->opera;
|
|
||||||
|
|
||||||
int nQuery = taosArrayGetSize(multiQuerys->query);
|
|
||||||
char** fields = taosMemoryMalloc(sizeof(char*) * nQuery);
|
|
||||||
char** keys = taosMemoryMalloc(sizeof(char*) * nQuery);
|
|
||||||
int* types = taosMemoryMalloc(sizeof(int) * nQuery);
|
|
||||||
|
|
||||||
for (int i = 0; i < nQuery; i++) {
|
|
||||||
SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
|
|
||||||
SIndexTerm* term = p->field_value;
|
|
||||||
|
|
||||||
fields[i] = taosMemoryCalloc(1, term->nKey + 1);
|
|
||||||
keys[i] = taosMemoryCalloc(1, term->nVal + 1);
|
|
||||||
|
|
||||||
memcpy(fields[i], term->key, term->nKey);
|
|
||||||
memcpy(keys[i], term->val, term->nVal);
|
|
||||||
types[i] = (int)(p->type);
|
|
||||||
}
|
|
||||||
int* tResult = NULL;
|
|
||||||
int tsz = 0;
|
|
||||||
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
|
|
||||||
|
|
||||||
for (int i = 0; i < tsz; i++) {
|
|
||||||
taosArrayPush(result, &tResult[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < nQuery; i++) {
|
|
||||||
taosMemoryFree(fields[i]);
|
|
||||||
taosMemoryFree(keys[i]);
|
|
||||||
}
|
|
||||||
taosMemoryFree(fields);
|
|
||||||
taosMemoryFree(keys);
|
|
||||||
taosMemoryFree(types);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
||||||
|
|
||||||
SArray* interResults = taosArrayInit(4, POINTER_BYTES);
|
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
|
||||||
int nQuery = taosArrayGetSize(multiQuerys->query);
|
int nQuery = taosArrayGetSize(multiQuerys->query);
|
||||||
for (size_t i = 0; i < nQuery; i++) {
|
for (size_t i = 0; i < nQuery; i++) {
|
||||||
SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
|
SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
|
||||||
SArray* tResult = NULL;
|
SArray* trslt = NULL;
|
||||||
indexTermSearch(index, qTerm, &tResult);
|
indexTermSearch(index, qterm, &trslt);
|
||||||
taosArrayPush(interResults, (void*)&tResult);
|
taosArrayPush(iRslts, (void*)&trslt);
|
||||||
}
|
}
|
||||||
indexMergeFinalResults(interResults, opera, result);
|
indexMergeFinalResults(iRslts, opera, result);
|
||||||
indexInterResultsDestroy(interResults);
|
indexInterResultsDestroy(iRslts);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -280,8 +244,8 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
|
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType,
|
||||||
int32_t nColName, const char* colVal, int32_t nColVal) {
|
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) {
|
||||||
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
|
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
|
||||||
if (tm == NULL) {
|
if (tm == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -298,6 +262,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
||||||
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
|
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
|
||||||
memcpy(tm->colVal, colVal, nColVal);
|
memcpy(tm->colVal, colVal, nColVal);
|
||||||
tm->nColVal = nColVal;
|
tm->nColVal = nColVal;
|
||||||
|
tm->qType = queryType;
|
||||||
|
|
||||||
return tm;
|
return tm;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,9 +34,64 @@ static char* indexCacheTermGet(const void* pData);
|
||||||
|
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type);
|
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||||
|
|
||||||
|
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
|
||||||
|
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
|
||||||
|
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchRange};
|
||||||
|
|
||||||
static void doMergeWork(SSchedMsg* msg);
|
static void doMergeWork(SSchedMsg* msg);
|
||||||
static bool indexCacheIteratorNext(Iterate* itera);
|
static bool indexCacheIteratorNext(Iterate* itera);
|
||||||
|
|
||||||
|
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
|
if (cache == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
MemTable* mem = cache;
|
||||||
|
char* key = indexCacheTermGet(ct);
|
||||||
|
|
||||||
|
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||||
|
while (tSkipListIterNext(iter)) {
|
||||||
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
|
if (node == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
if (0 == strcmp(c->colVal, ct->colVal)) {
|
||||||
|
if (c->operaType == ADD_VALUE) {
|
||||||
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||||
|
// taosArrayPush(result, &c->uid);
|
||||||
|
*s = kTypeValue;
|
||||||
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
|
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tSkipListDestroyIter(iter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
|
// impl later
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
|
// impl later
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
|
// impl later
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
|
// impl later
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||||
|
|
||||||
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
||||||
|
@ -263,33 +318,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI
|
||||||
if (mem == NULL) {
|
if (mem == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
char* key = indexCacheTermGet(ct);
|
return cacheSearch[qtype](mem, ct, tr, s);
|
||||||
|
|
||||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
|
||||||
while (tSkipListIterNext(iter)) {
|
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
|
||||||
if (node != NULL) {
|
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
|
||||||
if (qtype == QUERY_TERM) {
|
|
||||||
if (0 == strcmp(c->colVal, ct->colVal)) {
|
|
||||||
if (c->operaType == ADD_VALUE) {
|
|
||||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
|
||||||
// taosArrayPush(result, &c->uid);
|
|
||||||
*s = kTypeValue;
|
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
|
||||||
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (qtype == QUERY_PREFIX) {
|
|
||||||
} else if (qtype == QUERY_SUFFIX) {
|
|
||||||
} else if (qtype == QUERY_RANGE) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tSkipListDestroyIter(iter);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
|
@ -57,6 +57,17 @@ static int tfileCompare(const void* a, const void* b);
|
||||||
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
|
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
|
||||||
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
|
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
|
||||||
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
|
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
|
||||||
|
/*
|
||||||
|
* search from tfile
|
||||||
|
*/
|
||||||
|
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
|
||||||
|
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
|
||||||
|
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchRange};
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
|
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
|
||||||
|
@ -183,59 +194,153 @@ void tfileReaderDestroy(TFileReader* reader) {
|
||||||
writerCtxDestroy(reader->ctx, reader->remove);
|
writerCtxDestroy(reader->ctx, reader->remove);
|
||||||
taosMemoryFree(reader);
|
taosMemoryFree(reader);
|
||||||
}
|
}
|
||||||
|
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
int ret = 0;
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
FstSlice key = fstSliceCreate(p, sz);
|
||||||
|
uint64_t offset;
|
||||||
|
if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
|
||||||
|
int64_t et = taosGetTimestampUs();
|
||||||
|
int64_t cost = et - st;
|
||||||
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
|
||||||
|
tem->suid, tem->colName, tem->colVal, cost);
|
||||||
|
|
||||||
|
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||||
|
cost = taosGetTimestampUs() - et;
|
||||||
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
||||||
|
tem->colName, tem->colVal, cost);
|
||||||
|
}
|
||||||
|
if (hasJson) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||||
|
|
||||||
|
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
|
||||||
|
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
||||||
|
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||||
|
StreamWithStateResult* rt = NULL;
|
||||||
|
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||||
|
taosArrayPush(offsets, &(rt->out.out));
|
||||||
|
swsResultDestroy(rt);
|
||||||
|
}
|
||||||
|
streamWithStateDestroy(st);
|
||||||
|
fstStreamBuilderDestroy(sb);
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
for (int i = 0; i < taosArrayGetSize(offsets); i++) {
|
||||||
|
uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
|
||||||
|
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||||
|
if (ret != 0) {
|
||||||
|
indexError("failed to find target tablelist");
|
||||||
|
return TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (hasJson) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
|
||||||
|
int ret = 0;
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
FstSlice key = fstSliceCreate(p, sz);
|
||||||
|
/*impl later*/
|
||||||
|
if (hasJson) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
|
||||||
|
int ret = 0;
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
FstSlice key = fstSliceCreate(p, sz);
|
||||||
|
/*impl later*/
|
||||||
|
|
||||||
|
if (hasJson) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
int ret = 0;
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
FstSlice key = fstSliceCreate(p, sz);
|
||||||
|
// uint64_t offset;
|
||||||
|
// if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
|
||||||
|
// int64_t et = taosGetTimestampUs();
|
||||||
|
// int64_t cost = et - st;
|
||||||
|
// indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
|
||||||
|
// tem->suid, tem->colName, tem->colVal, cost);
|
||||||
|
|
||||||
|
// ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||||
|
// cost = taosGetTimestampUs() - et;
|
||||||
|
// indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
||||||
|
// tem->colName, tem->colVal, cost);
|
||||||
|
//}
|
||||||
|
if (hasJson) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
|
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
|
if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) {
|
||||||
// SArray* result = taosArrayInit(16, sizeof(uint64_t));
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
||||||
int ret = -1;
|
term->colVal);
|
||||||
// refactor to callback later
|
return -1;
|
||||||
if (qtype == QUERY_TERM) {
|
} else {
|
||||||
uint64_t offset;
|
return tfSearch[qtype](reader, term, tr);
|
||||||
char* p = term->colVal;
|
|
||||||
uint64_t sz = term->nColVal;
|
|
||||||
if (hasJson) {
|
|
||||||
p = indexPackJsonData(term);
|
|
||||||
sz = strlen(p);
|
|
||||||
}
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
FstSlice key = fstSliceCreate(p, sz);
|
|
||||||
if (fstGet(reader->fst, &key, &offset)) {
|
|
||||||
int64_t et = taosGetTimestampUs();
|
|
||||||
int64_t cost = et - st;
|
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
|
|
||||||
term->suid, term->colName, term->colVal, cost);
|
|
||||||
|
|
||||||
ret = tfileReaderLoadTableIds(reader, offset, tr->total);
|
|
||||||
cost = taosGetTimestampUs() - et;
|
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", term->suid,
|
|
||||||
term->colName, term->colVal, cost);
|
|
||||||
} else {
|
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
|
||||||
term->colVal);
|
|
||||||
}
|
|
||||||
fstSliceDestroy(&key);
|
|
||||||
if (hasJson) {
|
|
||||||
taosMemoryFree(p);
|
|
||||||
}
|
|
||||||
} else if (qtype == QUERY_PREFIX) {
|
|
||||||
// handle later
|
|
||||||
//
|
|
||||||
} else if (qtype == QUERY_SUFFIX) {
|
|
||||||
// handle later
|
|
||||||
} else if (qtype == QUERY_REGEX) {
|
|
||||||
// handle later
|
|
||||||
} else if (qtype == QUERY_RANGE) {
|
|
||||||
// handle later
|
|
||||||
}
|
}
|
||||||
tfileReaderUnRef(reader);
|
tfileReaderUnRef(reader);
|
||||||
|
return 0;
|
||||||
// taosArrayAddAll(tr->total, result);
|
|
||||||
// taosArrayDestroy(result);
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
|
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
|
||||||
|
|
|
@ -483,9 +483,9 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
||||||
|
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = { term, QUERY_TERM};
|
SIndexTermQuery query = {term, QUERY_TERM};
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
fObj->Get(&query, result);
|
fObj->Get(&query, result);
|
||||||
|
@ -557,7 +557,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
{
|
{
|
||||||
std::string colVal("v1");
|
std::string colVal("v1");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
|
@ -565,28 +565,28 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
|
@ -595,14 +595,14 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
std::cout << "--------first----------" << std::endl;
|
std::cout << "--------first----------" << std::endl;
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v4");
|
std::string colVal("v4");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
|
@ -613,7 +613,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
std::string colVal("v4");
|
std::string colVal("v4");
|
||||||
for (size_t i = 0; i < 10; i++) {
|
for (size_t i = 0; i < 10; i++) {
|
||||||
colVal[colVal.size() - 1] = 'a' + i;
|
colVal[colVal.size() - 1] = 'a' + i;
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
|
@ -623,9 +623,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
// begin query
|
// begin query
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = { term, QUERY_TERM };
|
SIndexTermQuery query = {term, QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
|
||||||
|
@ -638,9 +638,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = { term, QUERY_TERM };
|
SIndexTermQuery query = {term, QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
|
||||||
|
@ -670,7 +670,7 @@ class IndexObj {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
|
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
|
||||||
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -679,7 +679,7 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
||||||
size_t numOfTable = 100 * 10000) {
|
size_t numOfTable = 100 * 10000) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -701,7 +701,7 @@ class IndexObj {
|
||||||
// opt
|
// opt
|
||||||
tColVal[taosRand() % colValSize] = 'a' + k % 26;
|
tColVal[taosRand() % colValSize] = 'a' + k % 26;
|
||||||
}
|
}
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
tColVal.c_str(), tColVal.size());
|
tColVal.c_str(), tColVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -737,7 +737,7 @@ class IndexObj {
|
||||||
|
|
||||||
int SearchOne(const std::string& colName, const std::string& colVal) {
|
int SearchOne(const std::string& colName, const std::string& colVal) {
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
|
@ -759,7 +759,7 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
|
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
|
@ -784,7 +784,7 @@ class IndexObj {
|
||||||
|
|
||||||
void PutOne(const std::string& colName, const std::string& colVal) {
|
void PutOne(const std::string& colName, const std::string& colVal) {
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, 10);
|
Put(terms, 10);
|
||||||
|
@ -792,7 +792,7 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
|
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, val);
|
Put(terms, val);
|
||||||
|
@ -832,7 +832,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
{
|
{
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -847,7 +847,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
size_t size = 200;
|
size_t size = 200;
|
||||||
std::string colName("tag1"), colVal("hello");
|
std::string colName("tag1"), colVal("hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -862,7 +862,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
size_t size = 200;
|
size_t size = 200;
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
|
@ -877,7 +877,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
{
|
{
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
{
|
{
|
||||||
std::string colName("test");
|
std::string colName("test");
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -53,7 +53,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
{
|
{
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
std::string colVal("ab1");
|
std::string colVal("ab1");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -66,7 +66,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
{
|
{
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
std::string colVal("123");
|
std::string colVal("123");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -81,7 +81,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -95,7 +95,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
{
|
{
|
||||||
std::string colName("test");
|
std::string colName("test");
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -110,7 +110,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
std::string colVal("abxxxxxxxxxxxx");
|
std::string colVal("abxxxxxxxxxxxx");
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
colVal[i % colVal.size()] = '0' + i % 128;
|
colVal[i % colVal.size()] = '0' + i % 128;
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -124,7 +124,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
{
|
{
|
||||||
std::string colName("voltagefdadfa");
|
std::string colName("voltagefdadfa");
|
||||||
std::string colVal("abxxxxxxxxxxxx");
|
std::string colVal("abxxxxxxxxxxxx");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
@ -139,7 +139,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
|
Loading…
Reference in New Issue