Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

This commit is contained in:
Hongze Cheng 2022-05-28 12:20:14 +00:00
commit 8db7566fc6
24 changed files with 513 additions and 340 deletions

View File

@ -192,11 +192,16 @@ void indexTermDestroy(SIndexTerm* p);
void indexInit(); void indexInit();
/* index filter */ /* index filter */
typedef struct SIndexMetaArg {
void* metaHandle;
uint64_t suid;
} SIndexMetaArg;
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus; typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); SIdxFltStatus idxGetFltStatus(SNode* pFilterNode);
int32_t doFilterTag(const SNode* pFilterNode, SArray* result); int32_t doFilterTag(const SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result);
/* /*
* destory index env * destory index env
* *

View File

@ -1153,9 +1153,11 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0; pColumn->varmeta.length = 0;
} else { } else {
if (pColumn->nullbitmap != NULL) {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
} }
} }
}
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;

View File

@ -293,7 +293,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", 0, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 1) != 0) return -1;
return 0; return 0;
} }

View File

@ -940,7 +940,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} }
// do not show for cleared subscription // do not show for cleared subscription
#if 0 #if 1
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

View File

@ -106,7 +106,9 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader); bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
void * tsdbGetIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);

View File

@ -103,6 +103,7 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);

View File

@ -880,3 +880,11 @@ _err:
metaULock(pMeta); metaULock(pMeta);
return -1; return -1;
} }
// refactor later
void *metaGetIdx(SMeta *pMeta) {
#ifdef USE_INVERTED_INDEX
return pMeta->pTagIvtIdx;
#else
return pMeta->pTagIdx;
#endif
}

View File

@ -235,6 +235,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
} }
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0);
}
}
return 0; return 0;
} }

View File

@ -248,8 +248,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
} }
taosArrayPush(pTableCheckInfo, &info); taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
info.lastKey, pTsdbReadHandle->idStr); pTsdbReadHandle->idStr);
} }
// TODO group table according to the tag value. // TODO group table according to the tag value.
@ -352,13 +352,16 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
} }
if (level == TSDB_RETENTION_L0) { if (level == TSDB_RETENTION_L0) {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L0);
return VND_RSMA0(pVnode); return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) { } else if (level == TSDB_RETENTION_L1) {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L1);
return VND_RSMA1(pVnode); return VND_RSMA1(pVnode);
} else { } else {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L2);
return VND_RSMA2(pVnode); return VND_RSMA2(pVnode);
} }
} }
@ -1324,7 +1327,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey)); (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
if (cacheDataInFileBlockHole) { if (cacheDataInFileBlockHole) {
@ -1992,8 +1994,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STimeWindow* pWin = &blockInfo.window; STimeWindow* pWin = &blockInfo.window;
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, " rows:%d, start:%d, end:%d, %s",
cur->pos, endPos, pTsdbReadHandle->idStr); pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0; int32_t numOfRows = 0;
@ -2112,7 +2115,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
// still assign data into current row // still assign data into current row
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
@ -2178,8 +2182,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
* if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
* copy them all to result buffer, since it may be overlapped with file data block. * copy them all to result buffer, since it may be overlapped with file data block.
*/ */
if (node == NULL || if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) { ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
@ -2819,6 +2822,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return numOfRows; return numOfRows;
} }
void* tsdbGetIdx(SMeta* pMeta) {
if (pMeta == NULL) {
return NULL;
}
return metaGetIdx(pMeta);
}
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

View File

@ -678,6 +678,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
int32_t nRows; int32_t nRows;
int32_t tsize, ret; int32_t tsize, ret;
SEncoder encoder = {0}; SEncoder encoder = {0};
SArray *newTbUids = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0; pRsp->code = 0;
@ -698,6 +699,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(int64_t));
if (!submitRsp.pArray) { if (!submitRsp.pArray) {
pRsp->code = TSDB_CODE_OUT_OF_MEMORY; pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
@ -727,6 +729,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit; goto _exit;
} }
} }
taosArrayPush(newTbUids, &createTbReq.uid);
submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
@ -754,8 +757,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
submitRsp.affectedRows += submitBlkRsp.affectedRows; submitRsp.affectedRows += submitBlkRsp.affectedRows;
taosArrayPush(submitRsp.pArray, &submitBlkRsp); taosArrayPush(submitRsp.pArray, &submitBlkRsp);
} }
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
_exit: _exit:
taosArrayDestroy(newTbUids);
tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
pRsp->pCont = rpcMallocCont(tsize); pRsp->pCont = rpcMallocCont(tsize);
pRsp->contLen = tsize; pRsp->contLen = tsize;

View File

@ -28,13 +28,13 @@
#include "ttime.h" #include "ttime.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "index.h"
#include "query.h" #include "query.h"
#include "tcompare.h" #include "tcompare.h"
#include "tcompression.h" #include "tcompression.h"
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#include "vnode.h" #include "vnode.h"
#include "index.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
@ -4434,9 +4434,11 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagCond);
static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo,
SNode* pTagCond);
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo); static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
@ -4473,7 +4475,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); tsdbReaderT pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
return NULL; return NULL;
} }
@ -4593,8 +4596,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo); pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else { } else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pOptr =
pTaskInfo); createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -4913,13 +4916,17 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
if (tableType == TSDB_SUPER_TABLE) { if (tableType == TSDB_SUPER_TABLE) {
if (pTagCond) { if (pTagCond) {
SIndexMetaArg metaArg = {.metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, res); code = doFilterTag(pTagCond, &metaArg, res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("doFilterTag error:%d", code); qError("doFilterTag error:%d", code);
taosArrayDestroy(res); taosArrayDestroy(res);
terrno = code; terrno = code;
return code; return code;
} else {
qDebug("doFilterTag error:%d, suid: %" PRIu64 "", code, tableUid);
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
@ -4951,7 +4958,8 @@ SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); int32_t code =
getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -4986,8 +4994,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->pRoot = (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = terrno; code = terrno;
goto _complete; goto _complete;

View File

@ -74,7 +74,7 @@ void indexCacheIteratorDestroy(Iterate* iiter);
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst); // int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s); int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s);
void indexCacheRef(IndexCache* cache); void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache);

View File

@ -105,7 +105,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader); void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader); void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader);
@ -120,7 +120,7 @@ int tfileWriterFinish(TFileWriter* tw);
IndexTFile* indexTFileCreate(const char* path); IndexTFile* indexTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile); void indexTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr); int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr);
Iterate* tfileIteratorCreate(TFileReader* reader); Iterate* tfileIteratorCreate(TFileReader* reader);
void tfileIteratorDestroy(Iterate* iterator); void tfileIteratorDestroy(Iterate* iterator);

View File

@ -66,7 +66,7 @@ extern "C" {
* [1, 4, 5] * [1, 4, 5]
* output:[4, 5] * output:[4, 5]
*/ */
void iIntersection(SArray *interResults, SArray *finalResult); void iIntersection(SArray *in, SArray *out);
/* multi sorted result union /* multi sorted result union
* input: [1, 2, 4, 5] * input: [1, 2, 4, 5]
@ -74,7 +74,7 @@ void iIntersection(SArray *interResults, SArray *finalResult);
* [1, 4, 5] * [1, 4, 5]
* output:[1, 2, 3, 4, 5] * output:[1, 2, 3, 4, 5]
*/ */
void iUnion(SArray *interResults, SArray *finalResult); void iUnion(SArray *in, SArray *out);
/* see example /* see example
* total: [1, 2, 4, 5, 7, 8] * total: [1, 2, 4, 5, 7, 8]
@ -92,19 +92,24 @@ typedef struct {
uint64_t data; uint64_t data;
} SIdxVerdata; } SIdxVerdata;
/*
* index temp result
*
*/
typedef struct { typedef struct {
SArray *total; SArray *total;
SArray *added; SArray *add;
SArray *deled; SArray *del;
} SIdxTempResult; } SIdxTRslt;
SIdxTempResult *sIdxTempResultCreate(); SIdxTRslt *idxTRsltCreate();
void sIdxTempResultClear(SIdxTempResult *tr); void idxTRsltClear(SIdxTRslt *tr);
void sIdxTempResultDestroy(SIdxTempResult *tr); void idxTRsltDestroy(SIdxTRslt *tr);
void idxTRsltMergeTo(SIdxTRslt *tr, SArray *out);
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -29,7 +29,7 @@
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
#define INDEX_NUM_OF_THREADS 1 #define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE 200 #define INDEX_QUEUE_SIZE 200
#define INDEX_DATA_BOOL_NULL 0x02 #define INDEX_DATA_BOOL_NULL 0x02
@ -85,7 +85,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type // merge cache and tfile by opera type
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
@ -201,6 +201,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL); assert(*cache != NULL);
@ -328,6 +329,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = { ICacheKey key = {
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
taosThreadMutexLock(&sIdx->mtx); taosThreadMutexLock(&sIdx->mtx);
@ -341,7 +343,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SIdxTempResult* tr = sIdxTempResultCreate(); SIdxTRslt* tr = idxTRsltCreate();
if (0 == indexCacheSearch(cache, query, tr, &s)) { if (0 == indexCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName); indexInfo("col: %s already drop by", term->colName);
@ -363,12 +365,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
int64_t cost = taosGetTimestampUs() - st; int64_t cost = taosGetTimestampUs() - st;
indexInfo("search cost: %" PRIu64 "us", cost); indexInfo("search cost: %" PRIu64 "us", cost);
sIdxTempResultMergeTo(*result, tr); idxTRsltMergeTo(tr, *result);
sIdxTempResultDestroy(tr); idxTRsltDestroy(tr);
return 0; return 0;
END: END:
sIdxTempResultDestroy(tr); idxTRsltDestroy(tr);
return -1; return -1;
} }
static void indexInterResultsDestroy(SArray* results) { static void indexInterResultsDestroy(SArray* results) {
@ -404,18 +406,18 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0; return 0;
} }
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
int32_t sz = taosArrayGetSize(result); int32_t sz = taosArrayGetSize(result);
if (sz > 0) { if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1); TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
sIdxTempResultMergeTo(lv->tableId, tr); idxTRsltMergeTo(tr, lv->tableId);
sIdxTempResultClear(tr); idxTRsltClear(tr);
taosArrayPush(result, &tfv); taosArrayPush(result, &tfv);
} else if (tfv == NULL) { } else if (tfv == NULL) {
// handle last iterator // handle last iterator
sIdxTempResultMergeTo(lv->tableId, tr); idxTRsltMergeTo(tr, lv->tableId);
} else { } else {
// temp result saved in help // temp result saved in help
tfileValueDestroy(tfv); tfileValueDestroy(tfv);
@ -424,7 +426,7 @@ static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdx
taosArrayPush(result, &tfv); taosArrayPush(result, &tfv);
} }
} }
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
TFileValue* tfv = tfileValueCreate(colVal); TFileValue* tfv = tfileValueCreate(colVal);
@ -434,9 +436,9 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
uint32_t ver = cv->ver; uint32_t ver = cv->ver;
if (cv->type == ADD_VALUE) { if (cv->type == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
} else if (cv->type == DEL_VALUE) { } else if (cv->type == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
} }
} }
if (tv != NULL) { if (tv != NULL) {
@ -489,7 +491,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxTempResult* tr = sIdxTempResultCreate(); SIdxTRslt* tr = idxTRsltCreate();
while (cn == true || tn == true) { while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
@ -515,7 +517,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
} }
} }
indexMayMergeTempToFinalResult(result, NULL, tr); indexMayMergeTempToFinalResult(result, NULL, tr);
sIdxTempResultDestroy(tr); idxTRsltDestroy(tr);
int ret = indexGenTFile(sIdx, pCache, result); int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyFinalResult(result); indexDestroyFinalResult(result);

View File

@ -36,32 +36,31 @@ static char* indexCacheTermGet(const void* pData);
static MemTable* indexInternalCacheCreate(int8_t type); static MemTable* indexInternalCacheCreate(int8_t type);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
/*comm func of compare, used in (LE/LT/GE/GT compare)*/ /*comm func of compare, used in (LE/LT/GE/GT compare)*/
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s, static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s, RangeType type);
RangeType type); static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s,
RangeType type); RangeType type);
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = { static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s) = {
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
{cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON, {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
@ -71,7 +70,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTemp
static void doMergeWork(SSchedMsg* msg); static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera); static bool indexCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
@ -93,11 +92,11 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, pCt->colVal)) { if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid); // taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
} else { } else {
break; break;
@ -108,20 +107,19 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
return 0; return 0;
} }
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
// impl later // impl later
return 0; return 0;
} }
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
// impl later // impl later
return 0; return 0;
} }
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
// impl later // impl later
return 0; return 0;
} }
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s, RangeType type) {
RangeType type) {
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
@ -133,6 +131,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->colType = term->colType;
pCt->version = atomic_load_64(&pCache->version); pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt); char* key = indexCacheTermGet(pCt);
@ -147,11 +146,11 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
if (cond == MATCH) { if (cond == MATCH) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid); // taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
} else if (cond == CONTINUE) { } else if (cond == CONTINUE) {
continue; continue;
@ -163,20 +162,20 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, LT); return cacheSearchCompareFunc(cache, term, tr, s, LT);
} }
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, LE); return cacheSearchCompareFunc(cache, term, tr, s, LE);
} }
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, GT); return cacheSearchCompareFunc(cache, term, tr, s, GT);
} }
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, GE); return cacheSearchCompareFunc(cache, term, tr, s, GE);
} }
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
@ -204,11 +203,11 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
if (0 == strcmp(c->colVal, pCt->colVal)) { if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid); // taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
} else { } else {
break; break;
@ -222,32 +221,32 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT); return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
} }
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE); return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE);
} }
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT); return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT);
} }
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE); return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
} }
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s,
RangeType type) { RangeType type) {
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
@ -289,11 +288,11 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
TExeCond cond = cmpFn(p + skip, term->colVal, dType); TExeCond cond = cmpFn(p + skip, term->colVal, dType);
if (cond == MATCH) { if (cond == MATCH) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid); // taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
} else if (cond == CONTINUE) { } else if (cond == CONTINUE) {
continue; continue;
@ -309,7 +308,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
// impl later // impl later
return 0; return 0;
} }
@ -568,7 +567,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
return 0; return 0;
} }
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) { static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s) {
if (mem == NULL) { if (mem == NULL) {
return 0; return 0;
} }
@ -582,7 +581,7 @@ static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResu
return cacheSearch[0][qtype](mem, term, tr, s); return cacheSearch[0][qtype](mem, term, tr, s);
} }
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
@ -597,10 +596,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
indexMemRef(imm); indexMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
int ret = indexQueryMem(mem, query, result, s); int ret = (mem && mem->mem) ? indexQueryMem(mem, query, result, s) : 0;
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
ret = indexQueryMem(imm, query, result, s); ret = (imm && imm->mem) ? indexQueryMem(imm, query, result, s) : 0;
} }
indexMemUnRef(mem); indexMemUnRef(mem);
@ -709,7 +708,7 @@ static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
return cmp; return cmp;
} }
static MemTable* indexInternalCacheCreate(int8_t type) { static MemTable* indexInternalCacheCreate(int8_t type) {
int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY;
int32_t (*cmpFn)(const void* l, const void* r) = int32_t (*cmpFn)(const void* l, const void* r) =
INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare; INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare;

View File

@ -37,12 +37,15 @@ typedef struct SIFParam {
int64_t suid; // add later int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
SIndexMetaArg arg;
} SIFParam; } SIFParam;
typedef struct SIFCtx { typedef struct SIFCtx {
int32_t code; int32_t code;
SHashObj * pRes; /* element is SIFParam */ SHashObj * pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg;
// SIdxFltStatus st; // SIdxFltStatus st;
} SIFCtx; } SIFCtx;
@ -257,7 +260,9 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName), #ifdef USE_INVERTED_INDEX
SIndexMetaArg *arg = &output->arg;
SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
right->condValue, strlen(right->condValue)); right->condValue, strlen(right->condValue));
if (tm == NULL) { if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -268,9 +273,13 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype); indexMultiTermQueryAdd(mtm, tm, qtype);
int ret = indexSearch(NULL, mtm, output->result); int ret = indexSearch(arg->metaHandle, mtm, output->result);
indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result));
indexMultiTermQueryDestroy(mtm); indexMultiTermQueryDestroy(mtm);
return ret; return ret;
#else
return 0;
#endif
} }
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
@ -372,6 +381,8 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIFParam *params = NULL; SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx)); SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
// ugly code, refactor later
output->arg = ctx->arg;
sif_func_t operFn = sifGetOperFn(node->opType); sif_func_t operFn = sifGetOperFn(node->opType);
if (ctx->noExec && operFn == NULL) { if (ctx->noExec && operFn == NULL) {
output->status = SFLT_NOT_INDEX; output->status = SFLT_NOT_INDEX;
@ -423,7 +434,7 @@ _return:
static EDealRes sifWalkFunction(SNode *pNode, void *context) { static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SFunctionNode *node = (SFunctionNode *)pNode; SFunctionNode *node = (SFunctionNode *)pNode;
SIFParam output = {0}; SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output); ctx->code = sifExecFunction(node, ctx, &output);
@ -439,7 +450,8 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
} }
static EDealRes sifWalkLogic(SNode *pNode, void *context) { static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode; SLogicConditionNode *node = (SLogicConditionNode *)pNode;
SIFParam output = {0};
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output); ctx->code = sifExecLogic(node, ctx, &output);
@ -455,7 +467,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
} }
static EDealRes sifWalkOper(SNode *pNode, void *context) { static EDealRes sifWalkOper(SNode *pNode, void *context) {
SOperatorNode *node = (SOperatorNode *)pNode; SOperatorNode *node = (SOperatorNode *)pNode;
SIFParam output = {0}; SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output); ctx->code = sifExecOper(node, ctx, &output);
@ -507,8 +519,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0; int32_t code = 0;
SIFCtx ctx = {.code = 0, .noExec = false}; SIFCtx ctx = {.code = 0, .noExec = false, .arg = pDst->arg};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) { if (NULL == ctx.pRes) {
indexError("index-filter failed to taosHashInit"); indexError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -523,7 +536,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
indexError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode)); indexError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode));
SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
if (res->result != NULL) {
taosArrayAddAll(pDst->result, res->result); taosArrayAddAll(pDst->result, res->result);
}
sifFreeParam(res); sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
@ -561,7 +576,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIF_RET(code); SIF_RET(code);
} }
int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { int32_t doFilterTag(const SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result) {
if (pFilterNode == NULL) { if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -570,10 +585,12 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
// todo move to the initialization function // todo move to the initialization function
// SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
SIFParam param = {0}; SArray * output = taosArrayInit(8, sizeof(uint64_t));
SIFParam param = {.arg = *metaArg, .result = output};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param)); SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param));
taosArrayAddAll(result, param.result); taosArrayAddAll(result, param.result);
// taosArrayAddAll(result, param.result);
sifFreeParam(&param); sifFreeParam(&param);
SIF_RET(TSDB_CODE_SUCCESS); SIF_RET(TSDB_CODE_SUCCESS);
} }

View File

@ -60,31 +60,31 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s
/* /*
* search from tfile * search from tfile
*/ */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = {
{tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual, {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
{tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON, {tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
@ -211,16 +211,16 @@ void tfileReaderDestroy(TFileReader* reader) {
} }
// T_REF_INC(reader); // T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx, reader->remove);
if (reader->remove) { if (reader->remove) {
indexInfo("%s is removed", reader->ctx->file.buf); indexInfo("%s is removed", reader->ctx->file.buf);
} else { } else {
indexInfo("%s is not removed", reader->ctx->file.buf); indexInfo("%s is not removed", reader->ctx->file.buf);
} }
writerCtxDestroy(reader->ctx, reader->remove);
taosMemoryFree(reader); taosMemoryFree(reader);
} }
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0; int ret = 0;
char* p = tem->colVal; char* p = tem->colVal;
uint64_t sz = tem->nColVal; uint64_t sz = tem->nColVal;
@ -243,7 +243,7 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return 0; return 0;
} }
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
char* p = tem->colVal; char* p = tem->colVal;
uint64_t sz = tem->nColVal; uint64_t sz = tem->nColVal;
@ -279,7 +279,7 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
} }
return 0; return 0;
} }
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0; int ret = 0;
@ -298,7 +298,7 @@ static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy(&key); fstSliceDestroy(&key);
return 0; return 0;
} }
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0; int ret = 0;
@ -319,7 +319,7 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
return 0; return 0;
} }
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType type) {
int ret = 0; int ret = 0;
char* p = tem->colVal; char* p = tem->colVal;
int skip = 0; int skip = 0;
@ -358,19 +358,19 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
fstStreamBuilderDestroy(sb); fstStreamBuilderDestroy(sb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc(reader, tem, tr, LT); return tfSearchCompareFunc(reader, tem, tr, LT);
} }
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc(reader, tem, tr, LE); return tfSearchCompareFunc(reader, tem, tr, LE);
} }
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc(reader, tem, tr, GT); return tfSearchCompareFunc(reader, tem, tr, GT);
} }
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc(reader, tem, tr, GE); return tfSearchCompareFunc(reader, tem, tr, GE);
} }
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0; int ret = 0;
char* p = tem->colVal; char* p = tem->colVal;
@ -399,7 +399,7 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy(&key); fstSliceDestroy(&key);
return 0; return 0;
} }
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0; int ret = 0;
char* p = indexPackJsonData(tem); char* p = indexPackJsonData(tem);
int sz = strlen(p); int sz = strlen(p);
@ -424,36 +424,36 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult*
// deprecate api // deprecate api
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later // impl later
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later // impl later
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later // impl later
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LT); return tfSearchCompareFunc_JSON(reader, tem, tr, LT);
} }
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LE); return tfSearchCompareFunc_JSON(reader, tem, tr, LE);
} }
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GT); return tfSearchCompareFunc_JSON(reader, tem, tr, GT);
} }
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GE); return tfSearchCompareFunc_JSON(reader, tem, tr, GE);
} }
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later // impl later
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) { static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype) {
int ret = 0; int ret = 0;
int skip = 0; int skip = 0;
@ -501,7 +501,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
fstStreamBuilderDestroy(sb); fstStreamBuilderDestroy(sb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
int ret = 0; int ret = 0;
@ -673,7 +673,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
taosMemoryFree(tfile); taosMemoryFree(tfile);
} }
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { if (tfile == NULL) {
return ret; return ret;

View File

@ -36,24 +36,24 @@ static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
return s; return s;
} }
void iIntersection(SArray *inters, SArray *final) { void iIntersection(SArray *in, SArray *out) {
int32_t sz = (int32_t)taosArrayGetSize(inters); int32_t sz = (int32_t)taosArrayGetSize(in);
if (sz <= 0) { if (sz <= 0) {
return; return;
} }
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SArray *t = taosArrayGetP(inters, i); SArray *t = taosArrayGetP(in, i);
mi[i].len = (int32_t)taosArrayGetSize(t); mi[i].len = (int32_t)taosArrayGetSize(t);
mi[i].idx = 0; mi[i].idx = 0;
} }
SArray *base = taosArrayGetP(inters, 0); SArray *base = taosArrayGetP(in, 0);
for (int i = 0; i < taosArrayGetSize(base); i++) { for (int i = 0; i < taosArrayGetSize(base); i++) {
uint64_t tgt = *(uint64_t *)taosArrayGet(base, i); uint64_t tgt = *(uint64_t *)taosArrayGet(base, i);
bool has = true; bool has = true;
for (int j = 1; j < taosArrayGetSize(inters); j++) { for (int j = 1; j < taosArrayGetSize(in); j++) {
SArray *oth = taosArrayGetP(inters, j); SArray *oth = taosArrayGetP(in, j);
int mid = iBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt); int mid = iBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt);
if (mid >= 0 && mid < mi[j].len) { if (mid >= 0 && mid < mi[j].len) {
uint64_t val = *(uint64_t *)taosArrayGet(oth, mid); uint64_t val = *(uint64_t *)taosArrayGet(oth, mid);
@ -64,33 +64,33 @@ void iIntersection(SArray *inters, SArray *final) {
} }
} }
if (has == true) { if (has == true) {
taosArrayPush(final, &tgt); taosArrayPush(out, &tgt);
} }
} }
taosMemoryFreeClear(mi); taosMemoryFreeClear(mi);
} }
void iUnion(SArray *inters, SArray *final) { void iUnion(SArray *in, SArray *out) {
int32_t sz = (int32_t)taosArrayGetSize(inters); int32_t sz = (int32_t)taosArrayGetSize(in);
if (sz <= 0) { if (sz <= 0) {
return; return;
} }
if (sz == 1) { if (sz == 1) {
taosArrayAddAll(final, taosArrayGetP(inters, 0)); taosArrayAddAll(out, taosArrayGetP(in, 0));
return; return;
} }
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SArray *t = taosArrayGetP(inters, i); SArray *t = taosArrayGetP(in, i);
mi[i].len = (int32_t)taosArrayGetSize(t); mi[i].len = (int32_t)taosArrayGetSize(t);
mi[i].idx = 0; mi[i].idx = 0;
} }
while (1) { while (1) {
uint64_t mVal = UINT_MAX; uint64_t mVal = UINT64_MAX;
int mIdx = -1; int mIdx = -1;
for (int j = 0; j < sz; j++) { for (int j = 0; j < sz; j++) {
SArray *t = taosArrayGetP(inters, j); SArray *t = taosArrayGetP(in, j);
if (mi[j].idx >= mi[j].len) { if (mi[j].idx >= mi[j].len) {
continue; continue;
} }
@ -102,13 +102,13 @@ void iUnion(SArray *inters, SArray *final) {
} }
if (mIdx != -1) { if (mIdx != -1) {
mi[mIdx].idx++; mi[mIdx].idx++;
if (taosArrayGetSize(final) > 0) { if (taosArrayGetSize(out) > 0) {
uint64_t lVal = *(uint64_t *)taosArrayGetLast(final); uint64_t lVal = *(uint64_t *)taosArrayGetLast(out);
if (lVal == mVal) { if (lVal == mVal) {
continue; continue;
} }
} }
taosArrayPush(final, &mVal); taosArrayPush(out, &mVal);
} else { } else {
break; break;
} }
@ -158,41 +158,44 @@ int verdataCompare(const void *a, const void *b) {
return cmp; return cmp;
} }
SIdxTempResult *sIdxTempResultCreate() { SIdxTRslt *idxTRsltCreate() {
SIdxTempResult *tr = taosMemoryCalloc(1, sizeof(SIdxTempResult)); SIdxTRslt *tr = taosMemoryCalloc(1, sizeof(SIdxTRslt));
tr->total = taosArrayInit(4, sizeof(uint64_t)); tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t)); tr->add = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t)); tr->del = taosArrayInit(4, sizeof(uint64_t));
return tr; return tr;
} }
void sIdxTempResultClear(SIdxTempResult *tr) { void idxTRsltClear(SIdxTRslt *tr) {
if (tr == NULL) { if (tr == NULL) {
return; return;
} }
taosArrayClear(tr->total); taosArrayClear(tr->total);
taosArrayClear(tr->added); taosArrayClear(tr->add);
taosArrayClear(tr->deled); taosArrayClear(tr->del);
} }
void sIdxTempResultDestroy(SIdxTempResult *tr) { void idxTRsltDestroy(SIdxTRslt *tr) {
if (tr == NULL) { if (tr == NULL) {
return; return;
} }
taosArrayDestroy(tr->total); taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added); taosArrayDestroy(tr->add);
taosArrayDestroy(tr->deled); taosArrayDestroy(tr->del);
} }
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) { void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) {
taosArraySort(tr->total, uidCompare); taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare); taosArraySort(tr->add, uidCompare);
taosArraySort(tr->deled, uidCompare); taosArraySort(tr->del, uidCompare);
if (taosArrayGetSize(tr->total) == 0 || taosArrayGetSize(tr->add) == 0) {
SArray *t = taosArrayGetSize(tr->total) == 0 ? tr->add : tr->total;
taosArrayAddAll(result, t);
} else {
SArray *arrs = taosArrayInit(2, sizeof(void *)); SArray *arrs = taosArrayInit(2, sizeof(void *));
taosArrayPush(arrs, &tr->total); taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added); taosArrayPush(arrs, &tr->add);
iUnion(arrs, result); iUnion(arrs, result);
taosArrayDestroy(arrs); taosArrayDestroy(arrs);
}
iExcept(result, tr->deled); iExcept(result, tr->del);
} }

View File

@ -1,74 +1,74 @@
add_executable(indexTest "") add_executable(idxTest "")
add_executable(fstTest "") add_executable(idxFstTest "")
add_executable(fstUT "") add_executable(idxFstUT "")
add_executable(UtilUT "") add_executable(idxUtilUT "")
add_executable(jsonUT "") add_executable(idxJsonUT "")
target_sources(indexTest target_sources(idxTest
PRIVATE PRIVATE
"indexTests.cc" "indexTests.cc"
) )
target_sources(fstTest target_sources(idxFstTest
PRIVATE PRIVATE
"fstTest.cc" "fstTest.cc"
) )
target_sources(fstUT target_sources(idxFstUT
PRIVATE PRIVATE
"fstUT.cc" "fstUT.cc"
) )
target_sources(UtilUT target_sources(idxUtilUT
PRIVATE PRIVATE
"utilUT.cc" "utilUT.cc"
) )
target_sources(jsonUT target_sources(idxJsonUT
PRIVATE PRIVATE
"jsonUT.cc" "jsonUT.cc"
) )
target_include_directories ( indexTest target_include_directories (idxTest
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories ( fstTest target_include_directories (idxFstTest
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories ( fstUT target_include_directories (idxFstUT
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories ( UtilUT target_include_directories (idxUtilUT
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories (jsonUT target_include_directories (idxJsonUT
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_link_libraries (indexTest target_link_libraries (idxTest
os os
util util
common common
gtest_main gtest_main
index index
) )
target_link_libraries (fstTest target_link_libraries (idxFstTest
os os
util util
common common
gtest_main gtest_main
index index
) )
target_link_libraries (fstUT target_link_libraries (idxFstUT
os os
util util
common common
@ -76,7 +76,7 @@ target_link_libraries (fstUT
index index
) )
target_link_libraries (UtilUT target_link_libraries (idxUtilUT
os os
util util
common common
@ -84,7 +84,7 @@ target_link_libraries (UtilUT
index index
) )
target_link_libraries (jsonUT target_link_libraries (idxJsonUT
os os
util util
common common
@ -94,17 +94,17 @@ target_link_libraries (jsonUT
add_test( add_test(
NAME idxtest NAME idxtest
COMMAND indexTest COMMAND idxTest
) )
add_test( add_test(
NAME idxJsonUT NAME idxJsonUT
COMMAND jsonUT COMMAND idxJsonUT
) )
add_test( add_test(
NAME idxUtilUT NAME idxUtilUT
COMMAND UtilUT COMMAND idxUtilUT
) )
add_test( add_test(
NAME idxFstUT NAME idxFstUT
COMMAND fstUT COMMAND idxFstUT
) )

View File

@ -411,12 +411,12 @@ class TFileObj {
// //
// //
} }
SIdxTempResult* tr = sIdxTempResultCreate(); SIdxTRslt* tr = idxTRsltCreate();
int ret = tfileReaderSearch(reader_, query, tr); int ret = tfileReaderSearch(reader_, query, tr);
sIdxTempResultMergeTo(result, tr); idxTRsltMergeTo(tr, result);
sIdxTempResultDestroy(tr); idxTRsltDestroy(tr);
return ret; return ret;
} }
~TFileObj() { ~TFileObj() {
@ -531,11 +531,11 @@ class CacheObj {
indexCacheDebug(cache); indexCacheDebug(cache);
} }
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
SIdxTempResult* tr = sIdxTempResultCreate(); SIdxTRslt* tr = idxTRsltCreate();
int ret = indexCacheSearch(cache, query, tr, s); int ret = indexCacheSearch(cache, query, tr, s);
sIdxTempResultMergeTo(result, tr); idxTRsltMergeTo(tr, result);
sIdxTempResultDestroy(tr); idxTRsltDestroy(tr);
if (ret != 0) { if (ret != 0) {
std::cout << "failed to get from cache:" << ret << std::endl; std::cout << "failed to get from cache:" << ret << std::endl;

View File

@ -226,6 +226,22 @@ TEST_F(UtilEnv, 04union) {
iUnion(src, rslt); iUnion(src, rslt);
assert(taosArrayGetSize(rslt) == 12); assert(taosArrayGetSize(rslt) == 12);
} }
TEST_F(UtilEnv, 05unionExcept) {
clearSourceArray(src);
clearFinalArray(rslt);
uint64_t arr2[] = {7};
SArray * f = (SArray *)taosArrayGetP(src, 1);
for (int i = 0; i < sizeof(arr2) / sizeof(arr2[0]); i++) {
taosArrayPush(f, &arr2[i]);
}
iUnion(src, rslt);
SArray *ept = taosArrayInit(0, sizeof(uint64_t));
iExcept(rslt, ept);
EXPECT_EQ(taosArrayGetSize(rslt), 1);
}
TEST_F(UtilEnv, 01Except) { TEST_F(UtilEnv, 01Except) {
SArray *total = taosArrayInit(4, sizeof(uint64_t)); SArray *total = taosArrayInit(4, sizeof(uint64_t));
{ {
@ -308,16 +324,36 @@ TEST_F(UtilEnv, 01Except) {
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100);
} }
TEST_F(UtilEnv, testFill) { TEST_F(UtilEnv, testFill) {
for (int i = 0; i < 10000000; i++) { for (int i = 0; i < 1000000; i++) {
int64_t val = i; int64_t val = i;
char buf[65] = {0}; char buf[65] = {0};
indexInt2str(val, buf, 1); indexInt2str(val, buf, 1);
EXPECT_EQ(val, taosStr2int64(buf)); EXPECT_EQ(val, taosStr2int64(buf));
} }
for (int i = 0; i < 10000000; i++) { for (int i = 0; i < 1000000; i++) {
int64_t val = 0 - i; int64_t val = 0 - i;
char buf[65] = {0}; char buf[65] = {0};
indexInt2str(val, buf, -1); indexInt2str(val, buf, -1);
EXPECT_EQ(val, taosStr2int64(buf)); EXPECT_EQ(val, taosStr2int64(buf));
} }
} }
TEST_F(UtilEnv, TempResult) {
SIdxTRslt *relt = idxTRsltCreate();
SArray *f = taosArrayInit(0, sizeof(uint64_t));
uint64_t val = UINT64_MAX - 1;
taosArrayPush(relt->add, &val);
idxTRsltMergeTo(relt, f);
EXPECT_EQ(taosArrayGetSize(f), 1);
}
TEST_F(UtilEnv, TempResultExcept) {
SIdxTRslt *relt = idxTRsltCreate();
SArray *f = taosArrayInit(0, sizeof(uint64_t));
uint64_t val = UINT64_MAX;
taosArrayPush(relt->add, &val);
idxTRsltMergeTo(relt, f);
EXPECT_EQ(taosArrayGetSize(f), 1);
}

View File

@ -35,6 +35,14 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
return (void*)buf; return (void*)buf;
} }
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.data = data,
};
return 0;
}
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = { SStreamTaskExecReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
@ -407,6 +415,26 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
return 0; return 0;
} }
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->streamId); tlen += taosEncodeFixedI64(buf, pReq->streamId);

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
#for i in range(100):
tdSql.prepare()
dbname = tdCom.getLongName(10, "letters")
tdSql.execute('create database if not exists djnhawvlgq vgroups 1')
tdSql.execute('use djnhawvlgq')
tdSql.execute('create table if not exists downsampling_stb (ts timestamp, c1 int, c2 double, c3 varchar(100), c4 bool) tags (t1 int, t2 double, t3 varchar(100), t4 bool);')
tdSql.execute('create table downsampling_ct1 using downsampling_stb tags(10, 10.1, "Beijing", True);')
tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 nchar(20), c5 nchar(20)) tags (t1 int);')
tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);')
tdSql.execute('create table if not exists data_filter_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 binary(100), t8 nchar(200), t9 bool, t10 tinyint unsigned, t11 smallint unsigned, t12 int unsigned, t13 bigint unsigned)')
tdSql.execute('create table if not exists data_filter_ct1 using data_filter_stb tags (1, 2, 3, 4, 5.5, 6.6, "binary7", "nchar8", true, 11, 12, 13, 14)')
tdSql.execute('create stream data_filter_stream into output_data_filter_stb as select * from data_filter_stb where ts >= 1653648072973+1s and c1 = 1 or c2 > 1 and c3 != 4 or c4 <= 3 and c5 <> 0 or c6 is not Null or c7 is Null or c8 between "na" and "nchar4" and c8 not between "bi" and "binary" and c8 match "nchar[19]" and c8 nmatch "nchar[25]" or c9 in (1, 2, 3) or c10 not in (6, 7) and c8 like "nch%" and c7 not like "bina_" and c11 <= 10 or c12 is Null or c13 >= 4;')
tdSql.execute('insert into data_filter_ct1 values (1653648072973, 1, 1, 1, 3, 1.1, 1.1, "binary1", "nchar1", true, 1, 2, 3, 4);')
tdSql.execute('insert into data_filter_ct1 values (1653648072973+1s, 2, 2, 1, 3, 1.1, 1.1, "binary2", "nchar2", true, 2, 3, 4, 5);')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())