diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b47a162a1a..cdc978ae08 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -191,7 +191,7 @@ typedef struct TsdReader { typedef struct SStoreCacheReader { int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, - SArray *pFuncTypeList); + SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks); void *(*closeReader)(void *pReader); int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUidList); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8ae7e9706d..87a0db1e71 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -181,7 +181,7 @@ void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn not int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, - SArray* pFuncTypeList); + SArray* pFuncTypeList, SColumnInfo* pkCol, int32_t numOfPks); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4c88242fee..c293b63f5d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2288,6 +2288,8 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb .loadTombFn = loadSttTomb, .pReader = pr, .idstr = pr->idstr, + .comparFn = pr->pkComparFn, + .pCurRowKey = &pr->rowKey, }; code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b7fe92c2c5..195ca59e9a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -209,7 +209,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr, - SArray* pFuncTypeList) { + SArray* pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -226,6 +226,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->pSlotIds = pSlotIds; p->pFuncTypeList = pFuncTypeList; + p->rowKey.numOfPKs = numOfPks; + if (numOfPks > 0) { + p->pkComparFn = getComparFunc(pPkCol->type, 0); + p->rowKey.pks[0].type = pPkCol->type; + if (IS_VAR_DATA_TYPE(pPkCol->type)) { + p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); + } + } + if (numOfTables == 0) { *pReader = p; return TSDB_CODE_SUCCESS; @@ -359,10 +368,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t bytes; - if (slotIds[j] == -1) + if (slotIds[j] == -1) { bytes = 1; - else + } else { bytes = pr->pSchema->columns[slotIds[j]].bytes; + } pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 9cdb36e648..c48b7479dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -369,6 +369,8 @@ typedef struct SCacheRowsReader { char* idstr; int64_t lastTs; SArray* pFuncTypeList; + __compar_fn_t pkComparFn; + SRowKey rowKey; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index e4fa9f7580..56052434a4 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -46,6 +46,8 @@ typedef struct SCacheRowsScanInfo { int32_t indexOfBufferedRes; STableListInfo* pTableList; SArray* pFuncTypeList; + int32_t numOfPks; + SColumnInfo pkCol; } SCacheRowsScanInfo; static SSDataBlock* doScanCache(SOperatorInfo* pOperator); @@ -106,6 +108,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } + for(int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i); + if (pItem->isPk) { + pInfo->numOfPks += 1; + pInfo->pkCol.type = pItem->dataType.type; // only record one primary key + pInfo->pkCol.bytes = pItem->dataType.bytes; // only record one primary key + pInfo->pkCol.pk = 1; + } + } + SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t)); pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t)); taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes); @@ -140,7 +152,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, - suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes); + suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes, + &pInfo->pkCol, pInfo->numOfPks); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -282,7 +295,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (NULL == pInfo->pLastrowReader) { code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, - pTaskInfo->id.str, pInfo->pFuncTypeList); + pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; taosArrayClear(pInfo->pUidList);