fix(tsdb): add attributes for cache scan optr.

This commit is contained in:
Haojun Liao 2024-03-26 17:34:30 +08:00
parent 3ccab8a5b8
commit f61596706d
6 changed files with 34 additions and 7 deletions

View File

@ -191,7 +191,7 @@ typedef struct TsdReader {
typedef struct SStoreCacheReader { typedef struct SStoreCacheReader {
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray *pFuncTypeList); SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks);
void *(*closeReader)(void *pReader); void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList); SArray *pTableUidList);

View File

@ -181,7 +181,7 @@ void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn not
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray* pFuncTypeList); SArray* pFuncTypeList, SColumnInfo* pkCol, int32_t numOfPks);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids); SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);

View File

@ -2288,6 +2288,8 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
.loadTombFn = loadSttTomb, .loadTombFn = loadSttTomb,
.pReader = pr, .pReader = pr,
.idstr = pr->idstr, .idstr = pr->idstr,
.comparFn = pr->pkComparFn,
.pCurRowKey = &pr->rowKey,
}; };
code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL); code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL);

View File

@ -209,7 +209,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr, SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr,
SArray* pFuncTypeList) { SArray* pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
@ -226,6 +226,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pSlotIds = pSlotIds; p->pSlotIds = pSlotIds;
p->pFuncTypeList = pFuncTypeList; p->pFuncTypeList = pFuncTypeList;
p->rowKey.numOfPKs = numOfPks;
if (numOfPks > 0) {
p->pkComparFn = getComparFunc(pPkCol->type, 0);
p->rowKey.pks[0].type = pPkCol->type;
if (IS_VAR_DATA_TYPE(pPkCol->type)) {
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
}
}
if (numOfTables == 0) { if (numOfTables == 0) {
*pReader = p; *pReader = p;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -359,10 +368,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t j = 0; j < pr->numOfCols; ++j) { for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes; int32_t bytes;
if (slotIds[j] == -1) if (slotIds[j] == -1) {
bytes = 1; bytes = 1;
else } else {
bytes = pr->pSchema->columns[slotIds[j]].bytes; bytes = pr->pSchema->columns[slotIds[j]].bytes;
}
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);

View File

@ -369,6 +369,8 @@ typedef struct SCacheRowsReader {
char* idstr; char* idstr;
int64_t lastTs; int64_t lastTs;
SArray* pFuncTypeList; SArray* pFuncTypeList;
__compar_fn_t pkComparFn;
SRowKey rowKey;
} SCacheRowsReader; } SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -46,6 +46,8 @@ typedef struct SCacheRowsScanInfo {
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
STableListInfo* pTableList; STableListInfo* pTableList;
SArray* pFuncTypeList; SArray* pFuncTypeList;
int32_t numOfPks;
SColumnInfo pkCol;
} SCacheRowsScanInfo; } SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
@ -106,6 +108,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error; goto _error;
} }
for(int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
if (pItem->isPk) {
pInfo->numOfPks += 1;
pInfo->pkCol.type = pItem->dataType.type; // only record one primary key
pInfo->pkCol.bytes = pItem->dataType.bytes; // only record one primary key
pInfo->pkCol.pk = 1;
}
}
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t)); SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t)); pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes); taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
@ -140,7 +152,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo); uint64_t suid = tableListGetSuid(pTableListInfo);
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes); suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes,
&pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -282,7 +295,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (NULL == pInfo->pLastrowReader) { if (NULL == pInfo->pLastrowReader) {
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader,
pTaskInfo->id.str, pInfo->pFuncTypeList); pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);