diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 453b49e4c6..bf260d4899 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -17,6 +17,7 @@ #define _TD_INDEX_H_ #include "os.h" +#include "taoserror.h" #include "tarray.h" #ifdef __cplusplus @@ -41,11 +42,12 @@ typedef enum { UPDATE_VALUE, // update index column value ADD_INDEX, // add index on specify column DROP_INDEX, // drop existed index - DROP_SATBLE // drop stable + DROP_SATBLE, // drop stable + DEFAULT // query } SIndexOperOnColumn; -typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; -typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType; +typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType; +typedef enum { QUERY_TERM = 0, QUERY_PREFIX, QUERY_SUFFIX, QUERY_REGEX, QUERY_RANGE } EIndexQueryType; /* * create multi query @@ -166,8 +168,8 @@ void indexOptsDestroy(SIndexOpts* opts); * @param: */ -SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName, - int32_t nColName, const char* colVal, int32_t nColVal); +SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, int8_t qType, uint8_t colType, + const char* colName, int32_t nColName, const char* colVal, int32_t nColVal); void indexTermDestroy(SIndexTerm* p); /* diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6305022b27..0404b4d447 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -119,6 +119,17 @@ typedef struct SLimit { int64_t offset; } SLimit; +typedef struct SFileBlockLoadRecorder { + uint64_t totalRows; + uint64_t totalCheckedRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockStatis; + uint32_t skipBlocks; + uint32_t filterOutBlocks; + uint64_t elapsedTime; +} SFileBlockLoadRecorder; + typedef struct STaskCostInfo { int64_t created; int64_t start; @@ -132,14 +143,10 @@ typedef struct STaskCostInfo { uint64_t loadDataInCacheSize; uint64_t loadDataTime; - uint64_t totalRows; - uint64_t totalCheckedRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockStatis; - uint32_t skipBlocks; - uint32_t filterOutBlocks; + + SFileBlockLoadRecorder* pRecoder; uint64_t elapsedTime; + uint64_t firstStageMergeTime; uint64_t winInfoSize; uint64_t tableInfoSize; @@ -268,7 +275,7 @@ typedef struct SOperatorFpSet { typedef struct SOperatorInfo { uint8_t operatorType; - bool blockingOptr; // block operator or not + bool blocking; // block operator or not uint8_t status; // denote if current operator is completed int32_t numOfOutput; // number of columns of the current operator results char* name; // name, used to show the query execution plan @@ -333,17 +340,14 @@ typedef struct SScanInfo { typedef struct STableScanInfo { void* dataReader; - - int32_t numOfBlocks; // extract basic running information. - int32_t numOfSkipped; - int32_t numOfBlockStatis; + SFileBlockLoadRecorder readRecorder; int64_t numOfRows; int64_t elapsedTime; - int32_t prevGroupId; // previous table group id +// int32_t prevGroupId; // previous table group id SScanInfo scanInfo; - int32_t current; - SNode* pFilterNode; // filter operator info - SqlFunctionCtx* pCtx; // next operator query context + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; SExprInfo* pExpr; @@ -397,7 +401,6 @@ typedef struct SSysTableScanInfo { SArray* scanCols; // SArray scan column id list SName name; SSDataBlock* pRes; - int32_t capacity; int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 98804d4f1a..b5d972321b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -132,6 +132,7 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) { int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { OPTR_SET_OPENED(pOperator); + pOperator->cost.openCost = 0; return TSDB_CODE_SUCCESS; } @@ -1592,8 +1593,8 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc STaskCostInfo* pCost = &pTaskInfo->cost; - pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; +// pCost->totalBlocks += 1; +// pCost->totalRows += pBlock->info.rows; #if 0 // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. @@ -2411,12 +2412,13 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // // calculateOperatorProfResults(pQInfo); - qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 - " us, total blocks:%d, " - "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, - GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, - pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); - // + SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; + if (pSummary->pRecoder != NULL) { + qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 " us, total blocks:%d, " + "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, + GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks, + pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); + } // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); @@ -3282,7 +3284,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blockingOptr = false; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pBlock->info.numOfCols; @@ -3673,7 +3675,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = num; @@ -3756,7 +3758,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; @@ -4419,7 +4421,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; @@ -4532,7 +4534,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blockingOptr = false; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; @@ -4615,7 +4617,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp } pOperator->name = "FillOperator"; - pOperator->blockingOptr = false; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->pExpr = pExpr; @@ -4861,15 +4863,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { return NULL; } SInterval interval = extractIntervalInfo(pTableScanNode); - return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, + SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); + STableScanInfo* pScanInfo = pOperator->info; + pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); @@ -4877,8 +4883,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); @@ -5628,7 +5633,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->name = "JoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; - pOperator->blockingOptr = false; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index acef8e4bc4..f5dd67217e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -353,7 +353,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "GroupbyAggOperator"; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; @@ -612,7 +612,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pOperator->name = "PartitionOperator"; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index 911ba22fe6..0f075c5de2 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -55,13 +55,27 @@ typedef struct SIFParam { SArray *result; char * condValue; + uint8_t colValType; col_id_t colId; - - int64_t suid; // add later - char dbName[TSDB_DB_NAME_LEN]; - char colName[TSDB_COL_NAME_LEN]; + int64_t suid; // add later + char dbName[TSDB_DB_NAME_LEN]; + char colName[TSDB_COL_NAME_LEN]; } SIFParam; +static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { + if (src == OP_TYPE_GREATER_THAN || src == OP_TYPE_GREATER_EQUAL || src == OP_TYPE_LOWER_THAN || + src == OP_TYPE_LOWER_EQUAL) { + *dst = QUERY_RANGE; + } else if (src == OP_TYPE_EQUAL) { + *dst = QUERY_TERM; + } else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) { + *dst = QUERY_REGEX; + } else { + return TSDB_CODE_QRY_INVALID_INPUT; + } + return TSDB_CODE_SUCCESS; +} + typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output); // construct tag filter operator later static void destroyTagFilterOperatorInfo(void *param) { @@ -145,10 +159,11 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { SColumnNode *cn = (SColumnNode *)node; /*only support tag column*/ SIF_ERR_RET(sifValidateColumn(cn)); + param->colId = cn->colId; + param->colValType = cn->node.resType.type; memcpy(param->dbName, cn->dbName, sizeof(cn->dbName)); memcpy(param->colName, cn->colName, sizeof(cn->colName)); - break; } case QUERY_NODE_NODE_LIST: { @@ -231,61 +246,76 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu qError("index-filter not support buildin function"); return TSDB_CODE_QRY_INVALID_INPUT; } -static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { - SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST); - return TSDB_CODE_SUCCESS; +static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { + SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName, + strlen(left->colName), right->condValue, strlen(right->condValue)); + if (operType == OP_TYPE_LOWER_EQUAL || operType == OP_TYPE_GREATER_EQUAL || operType == OP_TYPE_GREATER_THAN || + operType == OP_TYPE_LOWER_THAN) { + } + if (tm == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); + + EIndexQueryType qtype = 0; + SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype)); + + indexMultiTermQueryAdd(mtm, tm, qtype); + int ret = indexSearch(NULL, mtm, output->result); + indexMultiTermQueryDestroy(mtm); + return ret; } static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LOWER_THAN; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LOWER_EQUAL; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_GREATER_THAN; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_GREATER_EQUAL; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_EQUAL; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_EQUAL; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_IN; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_IN; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LIKE; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_LIKE; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_MATCH; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NMATCH; - return sifIndex(left, right, id, output); + return sifDoIndex(left, right, id, output); } static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) { // add more except @@ -460,6 +490,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { qError("index-filter failed to taosHashInit"); return TSDB_CODE_QRY_OUT_OF_MEMORY; } + nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); SIF_ERR_RET(ctx.code); @@ -498,6 +529,7 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) { if (pFilterNode == NULL) { return SFLT_NOT_INDEX; } + // impl later return SFLT_ACCURATE_INDEX; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a66d2a263c..7e721455c3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "common/ttime.h" +#include "ttime.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -36,10 +36,10 @@ #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) -int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo); +static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); +static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); -int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); -void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { SWITCH_ORDER(pCtx[i].order); } @@ -158,11 +158,11 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; - STaskCostInfo* pCost = &pTaskInfo->cost; + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; @@ -188,11 +188,11 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - bool allHave = true; + bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allHave); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); - if (allHave == true) { + if (allColumnsHaveAgg == true) { int32_t numOfCols = pBlock->info.numOfCols; // todo create this buffer during creating operator @@ -236,6 +236,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + // todo record the filter time cost doFilter(pTableScanInfo->pFilterNode, pBlock); if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; @@ -266,7 +267,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - pTableScanInfo->numOfBlocks += 1; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); uint32_t status = 0; @@ -296,15 +296,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { + // do the ascending order traverse in the first place. + while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } - pTableScanInfo->current += 1; + pTableScanInfo->scanTimes += 1; - if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { + if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; @@ -319,7 +320,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc; - if (pTableScanInfo->current < total) { + if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -329,21 +330,20 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); - while (pTableScanInfo->current < total) { + while (pTableScanInfo->scanTimes < total) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } - pTableScanInfo->current += 1; + pTableScanInfo->scanTimes += 1; - if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { + if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, - GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -372,48 +372,50 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pInfo->cond = *pCond; pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->fpSet.getNextFn = doTableScan; - pOperator->pTaskInfo = pTaskInfo; + pOperator->name = "TableScanOperator"; // for dubug purpose + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); static int32_t cost = 0; - pOperator->cost.openCost = ++cost; + + // for non-blocking operator, the open cost is always 0 + pOperator->cost.openCost = 0; pOperator->cost.totalCost = ++cost; pOperator->resultInfo.totalRows = ++cost; return pOperator; } -SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { +SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - - pInfo->dataReader = pTsdbReadHandle; - pInfo->current = 0; - pInfo->prevGroupId = -1; - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableSeqScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->fpSet.getNextFn = doTableScanImpl; + pInfo->dataReader = pReadHandle; +// pInfo->prevGroupId = -1; + + pOperator->name = "TableSeqScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; } @@ -482,7 +484,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blockingOptr = false; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doBlockInfoScan; @@ -636,20 +638,22 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* } pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; - pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; - pOperator->fpSet.closeFn = operatorDummyCloseFn; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + return pOperator; } @@ -671,14 +675,12 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { switch (nType) { case QUERY_NODE_OPERATOR: { SOperatorNode* node = (SOperatorNode*)pNode; - if (OP_TYPE_EQUAL == node->opType) { *(int32_t*)pContext = 1; return DEAL_RES_CONTINUE; } *(int32_t*)pContext = 0; - return DEAL_RES_IGNORE_CHILD; } case QUERY_NODE_COLUMN: { @@ -709,19 +711,17 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { default: break; } - return DEAL_RES_CONTINUE; } -void getDBNameFromCondition(SNode* pCondition, char* dbName) { +static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { if (NULL == pCondition) { return; } - - nodesWalkExpr(pCondition, getDBNameFromConditionWalker, dbName); + nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName); } -static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { SOperatorInfo* operator=(SOperatorInfo*) param; SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info; if (TSDB_CODE_SUCCESS == code) { @@ -746,13 +746,14 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { } SFilterInfo* filter = NULL; - int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); + + int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; code = filterSetDataFromSlotId(filter, ¶m1); int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); @@ -795,61 +796,31 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->info.numOfCols = 10; - pBlock->info.hasVarCol = true; + + size_t size = 0; + const SSysTableMeta *pMeta = NULL; + getInfosDbMeta(&pMeta, &size); + + int32_t index = 0; + for(int32_t i = 0; i < size; ++i) { + if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { + index = i; + break; + } + } pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); - SColumnInfoData colInfoData = {0}; - colInfoData.info.colId = 1; - colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR; - colInfoData.info.bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE; - taosArrayPush(pBlock->pDataBlock, &colInfoData); + for(int32_t i = 0; i < pMeta[index].colNum; ++i) { + SColumnInfoData colInfoData = {0}; + colInfoData.info.colId = i + 1; + colInfoData.info.type = pMeta[index].schema[i].type; + colInfoData.info.bytes = pMeta[index].schema[i].bytes; + taosArrayPush(pBlock->pDataBlock, &colInfoData); + } - colInfoData.info.colId = 2; - colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR; - colInfoData.info.bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 3; - colInfoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; - colInfoData.info.bytes = 8; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 4; - colInfoData.info.type = TSDB_DATA_TYPE_INT; - colInfoData.info.bytes = 4; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 5; - colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR; - colInfoData.info.bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 6; - colInfoData.info.type = TSDB_DATA_TYPE_BIGINT; - colInfoData.info.bytes = 8; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 7; - colInfoData.info.type = TSDB_DATA_TYPE_INT; - colInfoData.info.bytes = 4; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 8; - colInfoData.info.type = TSDB_DATA_TYPE_INT; - colInfoData.info.bytes = 4; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 9; - colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR; - colInfoData.info.bytes = 512 + VARSTR_HEADER_SIZE; - taosArrayPush(pBlock->pDataBlock, &colInfoData); - - colInfoData.info.colId = 10; - colInfoData.info.type = TSDB_DATA_TYPE_VARCHAR; - colInfoData.info.bytes = 20 + VARSTR_HEADER_SIZE; - taosArrayPush(pBlock->pDataBlock, &colInfoData); + pBlock->info.numOfCols = pMeta[index].colNum; + pBlock->info.hasVarCol = true; return pBlock; } @@ -868,7 +839,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return NULL; } - buildSysDbTableInfo(pInfo); + buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); doFilterResult(pInfo); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; @@ -896,7 +867,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); SSDataBlock* p = buildSysTableMetaBlock(); - blockDataEnsureCapacity(p, pInfo->capacity); + blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; while (metaTbCursorNext(pInfo->pCur) == 0) { @@ -977,7 +948,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pColInfoData = taosArrayGet(p->pDataBlock, 9); colDataAppend(pColInfoData, numOfRows, str, false); - if (++numOfRows >= pInfo->capacity) { + if (++numOfRows >= pOperator->resultInfo.capacity) { break; } } @@ -1022,7 +993,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pMsgSendInfo->msgInfo.pData = buf1; pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadSysTableContentCb; + pMsgSendInfo->fp = loadSysTableCallback; int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); @@ -1060,9 +1031,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } } -int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo) { +int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { SSDataBlock* p = buildSysTableMetaBlock(); - blockDataEnsureCapacity(p, pInfo->capacity); + blockDataEnsureCapacity(p, capacity); size_t size = 0; const SSysTableMeta* pSysDbTableMeta = NULL; @@ -1133,18 +1104,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe return NULL; } - pInfo->accountId = accountId; + pInfo->accountId = accountId; pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->capacity = 4096; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; + + initResultSizeInfo(pOperator, 4096); tNameAssign(&pInfo->name, pName); const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = *(SReadHandle*) readHandle; - blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); pInfo->epSet = epset; @@ -1173,12 +1145,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe #endif } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; @@ -1188,7 +1160,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { #if 0 - SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -1322,29 +1293,29 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pReader = pReaderHandle; - pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; + pInfo->pReader = readHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL); - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; return pOperator; + _error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 33112b5982..580614dc02 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1078,7 +1078,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; @@ -1137,7 +1137,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->name = "StreamTimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; @@ -1343,7 +1343,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TimeSliceOperator"; // pOperator->operatorType = OP_AllTimeWindow; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; @@ -1385,7 +1385,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->tsSlotId = tsSlotId; pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfCols; @@ -1437,7 +1437,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pInfo->reptScan = false; pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; - pOperator->blockingOptr = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 37318767c7..62ddf85985 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -57,7 +57,7 @@ struct SIndex { char* path; - SIndexStat stat; + SIndexStat stat; TdThreadMutex mtx; }; @@ -86,6 +86,7 @@ typedef struct SIndexTerm { int32_t nColName; char* colVal; int32_t nColVal; + int8_t qType; // just use for range } SIndexTerm; typedef struct SIndexTermQuery { @@ -165,7 +166,7 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf); } while (0) #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) -#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) +#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) #define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \ do { \ uint8_t oldTy = ty; \ diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index aeb5e01175..ee77aa92e9 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -175,55 +175,19 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { -#ifdef USE_LUCENE - EIndexOperatorType opera = multiQuerys->opera; - - int nQuery = taosArrayGetSize(multiQuerys->query); - char** fields = taosMemoryMalloc(sizeof(char*) * nQuery); - char** keys = taosMemoryMalloc(sizeof(char*) * nQuery); - int* types = taosMemoryMalloc(sizeof(int) * nQuery); - - for (int i = 0; i < nQuery; i++) { - SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i); - SIndexTerm* term = p->field_value; - - fields[i] = taosMemoryCalloc(1, term->nKey + 1); - keys[i] = taosMemoryCalloc(1, term->nVal + 1); - - memcpy(fields[i], term->key, term->nKey); - memcpy(keys[i], term->val, term->nVal); - types[i] = (int)(p->type); - } - int* tResult = NULL; - int tsz = 0; - index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz); - - for (int i = 0; i < tsz; i++) { - taosArrayPush(result, &tResult[i]); - } - - for (int i = 0; i < nQuery; i++) { - taosMemoryFree(fields[i]); - taosMemoryFree(keys[i]); - } - taosMemoryFree(fields); - taosMemoryFree(keys); - taosMemoryFree(types); -#endif - #ifdef USE_INVERTED_INDEX EIndexOperatorType opera = multiQuerys->opera; // relation of querys - SArray* interResults = taosArrayInit(4, POINTER_BYTES); + SArray* iRslts = taosArrayInit(4, POINTER_BYTES); int nQuery = taosArrayGetSize(multiQuerys->query); for (size_t i = 0; i < nQuery; i++) { - SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i); - SArray* tResult = NULL; - indexTermSearch(index, qTerm, &tResult); - taosArrayPush(interResults, (void*)&tResult); + SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i); + SArray* trslt = NULL; + indexTermSearch(index, qterm, &trslt); + taosArrayPush(iRslts, (void*)&trslt); } - indexMergeFinalResults(interResults, opera, result); - indexInterResultsDestroy(interResults); + indexMergeFinalResults(iRslts, opera, result); + indexInterResultsDestroy(iRslts); #endif return 0; @@ -280,8 +244,8 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde return 0; } -SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, - int32_t nColName, const char* colVal, int32_t nColVal) { +SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType, + const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) { SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm))); if (tm == NULL) { return NULL; @@ -298,6 +262,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); memcpy(tm->colVal, colVal, nColVal); tm->nColVal = nColVal; + tm->qType = queryType; return tm; } diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index df3c0b6e7b..78368981b3 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -34,9 +34,64 @@ static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); +static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); + +static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = { + cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchRange}; + static void doMergeWork(SSchedMsg* msg); static bool indexCacheIteratorNext(Iterate* itera); +static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + if (cache == NULL) { + return 0; + } + + MemTable* mem = cache; + char* key = indexCacheTermGet(ct); + + SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + if (node == NULL) { + break; + } + CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + if (0 == strcmp(c->colVal, ct->colVal)) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } + } else { + break; + } + } + tSkipListDestroyIter(iter); + return 0; +} +static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + // impl later + return 0; +} +static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + // impl later + return 0; +} +static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + // impl later + return 0; +} +static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + // impl later + return 0; +} static IterateValue* indexCacheIteratorGetValue(Iterate* iter); IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) { @@ -263,33 +318,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI if (mem == NULL) { return 0; } - char* key = indexCacheTermGet(ct); - - SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); - while (tSkipListIterNext(iter)) { - SSkipListNode* node = tSkipListIterGet(iter); - if (node != NULL) { - CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - if (qtype == QUERY_TERM) { - if (0 == strcmp(c->colVal, ct->colVal)) { - if (c->operaType == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) - // taosArrayPush(result, &c->uid); - *s = kTypeValue; - } else if (c->operaType == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) - } - } else { - break; - } - } else if (qtype == QUERY_PREFIX) { - } else if (qtype == QUERY_SUFFIX) { - } else if (qtype == QUERY_RANGE) { - } - } - } - tSkipListDestroyIter(iter); - return 0; + return cacheSearch[qtype](mem, ct, tr, s); } int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { int64_t st = taosGetTimestampUs(); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 554b8092a2..7072baf574 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -57,6 +57,17 @@ static int tfileCompare(const void* a, const void* b); static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version); static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version); static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version); +/* + * search from tfile + */ +static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); + +static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { + tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchRange}; TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache)); @@ -183,59 +194,153 @@ void tfileReaderDestroy(TFileReader* reader) { writerCtxDestroy(reader->ctx, reader->remove); taosMemoryFree(reader); } +static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + int ret = 0; + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + uint64_t offset; + if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) { + int64_t et = taosGetTimestampUs(); + int64_t cost = et - st; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + tem->suid, tem->colName, tem->colVal, cost); + + ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + cost = taosGetTimestampUs() - et; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, + tem->colName, tem->colVal, cost); + } + if (hasJson) { + taosMemoryFree(p); + } + fstSliceDestroy(&key); + return 0; +} + +static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + + SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); + + AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); + FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + taosArrayPush(offsets, &(rt->out.out)); + swsResultDestroy(rt); + } + streamWithStateDestroy(st); + fstStreamBuilderDestroy(sb); + + int32_t ret = 0; + for (int i = 0; i < taosArrayGetSize(offsets); i++) { + uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i); + ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + if (ret != 0) { + indexError("failed to find target tablelist"); + return TSDB_CODE_TDB_FILE_CORRUPTED; + } + } + if (hasJson) { + taosMemoryFree(p); + } + return 0; +} +static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + + int ret = 0; + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + /*impl later*/ + if (hasJson) { + taosMemoryFree(p); + } + fstSliceDestroy(&key); + return 0; +} +static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + + int ret = 0; + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + /*impl later*/ + + if (hasJson) { + taosMemoryFree(p); + } + fstSliceDestroy(&key); + return 0; +} +static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + int ret = 0; + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + // uint64_t offset; + // if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) { + // int64_t et = taosGetTimestampUs(); + // int64_t cost = et - st; + // indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + // tem->suid, tem->colName, tem->colVal, cost); + + // ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + // cost = taosGetTimestampUs() - et; + // indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, + // tem->colName, tem->colVal, cost); + //} + if (hasJson) { + taosMemoryFree(p); + } + fstSliceDestroy(&key); + return 0; +} int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { SIndexTerm* term = query->term; - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; - - // SArray* result = taosArrayInit(16, sizeof(uint64_t)); - int ret = -1; - // refactor to callback later - if (qtype == QUERY_TERM) { - uint64_t offset; - char* p = term->colVal; - uint64_t sz = term->nColVal; - if (hasJson) { - p = indexPackJsonData(term); - sz = strlen(p); - } - int64_t st = taosGetTimestampUs(); - FstSlice key = fstSliceCreate(p, sz); - if (fstGet(reader->fst, &key, &offset)) { - int64_t et = taosGetTimestampUs(); - int64_t cost = et - st; - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", - term->suid, term->colName, term->colVal, cost); - - ret = tfileReaderLoadTableIds(reader, offset, tr->total); - cost = taosGetTimestampUs() - et; - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", term->suid, - term->colName, term->colVal, cost); - } else { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, - term->colVal); - } - fstSliceDestroy(&key); - if (hasJson) { - taosMemoryFree(p); - } - } else if (qtype == QUERY_PREFIX) { - // handle later - // - } else if (qtype == QUERY_SUFFIX) { - // handle later - } else if (qtype == QUERY_REGEX) { - // handle later - } else if (qtype == QUERY_RANGE) { - // handle later + if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) { + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, + term->colVal); + return -1; + } else { + return tfSearch[qtype](reader, term, tr); } tfileReaderUnRef(reader); - - // taosArrayAddAll(tr->total, result); - // taosArrayDestroy(result); - - return ret; + return 0; } TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 0162b754ee..d8ea6a8233 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -483,9 +483,9 @@ TEST_F(IndexTFileEnv, test_tfile_write) { std::string colName("voltage"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); - SIndexTermQuery query = { term, QUERY_TERM}; + SIndexTermQuery query = {term, QUERY_TERM}; SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); fObj->Get(&query, result); @@ -557,7 +557,7 @@ TEST_F(IndexCacheEnv, cache_test) { std::string colName("voltage"); { std::string colVal("v1"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -565,28 +565,28 @@ TEST_F(IndexCacheEnv, cache_test) { } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -595,14 +595,14 @@ TEST_F(IndexCacheEnv, cache_test) { std::cout << "--------first----------" << std::endl; { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v4"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); indexTermDestroy(term); @@ -613,7 +613,7 @@ TEST_F(IndexCacheEnv, cache_test) { std::string colVal("v4"); for (size_t i = 0; i < 10; i++) { colVal[colVal.size() - 1] = 'a' + i; - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -623,9 +623,9 @@ TEST_F(IndexCacheEnv, cache_test) { // begin query { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); - SIndexTermQuery query = { term, QUERY_TERM }; + SIndexTermQuery query = {term, QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); STermValueType valType; @@ -638,9 +638,9 @@ TEST_F(IndexCacheEnv, cache_test) { } { std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); - SIndexTermQuery query = { term, QUERY_TERM }; + SIndexTermQuery query = {term, QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); STermValueType valType; @@ -670,7 +670,7 @@ class IndexObj { return ret; } void Del(const std::string& colName, const std::string& colVal, uint64_t uid) { - SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, DEL_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -679,7 +679,7 @@ class IndexObj { } int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world", size_t numOfTable = 100 * 10000) { - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -701,7 +701,7 @@ class IndexObj { // opt tColVal[taosRand() % colValSize] = 'a' + k % 26; } - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), tColVal.c_str(), tColVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -737,7 +737,7 @@ class IndexObj { int SearchOne(const std::string& colName, const std::string& colVal) { SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); @@ -759,7 +759,7 @@ class IndexObj { } int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) { SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); @@ -784,7 +784,7 @@ class IndexObj { void PutOne(const std::string& colName, const std::string& colVal) { SIndexMultiTerm* terms = indexMultiTermCreate(); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermAdd(terms, term); Put(terms, 10); @@ -792,7 +792,7 @@ class IndexObj { } void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) { SIndexMultiTerm* terms = indexMultiTermCreate(); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermAdd(terms, term); Put(terms, val); @@ -832,7 +832,7 @@ TEST_F(IndexEnv2, testIndexOpen) { { std::string colName("tag1"), colVal("Hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -847,7 +847,7 @@ TEST_F(IndexEnv2, testIndexOpen) { size_t size = 200; std::string colName("tag1"), colVal("hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -862,7 +862,7 @@ TEST_F(IndexEnv2, testIndexOpen) { size_t size = 200; std::string colName("tag1"), colVal("Hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -877,7 +877,7 @@ TEST_F(IndexEnv2, testIndexOpen) { { std::string colName("tag1"), colVal("Hello"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index f789d23136..e1e5004701 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -40,7 +40,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("test"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -53,7 +53,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("voltage"); std::string colVal("ab1"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -66,7 +66,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("voltage"); std::string colVal("123"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -81,7 +81,7 @@ TEST_F(JsonEnv, testWrite) { std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SArray* result = taosArrayInit(1, sizeof(uint64_t)); @@ -95,7 +95,7 @@ TEST_F(JsonEnv, testWriteMillonData) { { std::string colName("test"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -110,7 +110,7 @@ TEST_F(JsonEnv, testWriteMillonData) { std::string colVal("abxxxxxxxxxxxx"); for (int i = 0; i < 1000; i++) { colVal[i % colVal.size()] = '0' + i % 128; - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -124,7 +124,7 @@ TEST_F(JsonEnv, testWriteMillonData) { { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -139,7 +139,7 @@ TEST_F(JsonEnv, testWriteMillonData) { std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SArray* result = taosArrayInit(1, sizeof(uint64_t));