From 90788972e5f9d4f65061ce712e689379b69e827f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 16:53:30 +0800 Subject: [PATCH 1/3] tsdbCache: new release interface --- source/dnode/vnode/src/inc/tsdb.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 304 ++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- 3 files changed, 295 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9d7cfc0552..eac4f06132 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -251,7 +251,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); // structs ======================= typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2f6901cb24..12f323e92e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -65,7 +65,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -97,7 +97,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -581,6 +581,10 @@ typedef struct TsdbNextRowState { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); STbData *pMem = NULL; @@ -597,6 +601,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + SDelIdx delIdx; + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -604,7 +610,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; - SDelIdx delIdx; code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); if (code) goto _err; @@ -612,6 +617,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { if (code) goto _err; tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; } int iSkyline = taosArrayGetSize(pSkyline) - 1; @@ -644,7 +652,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { input[1].next = true; } - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; do { for (int i = 0; i < 3; ++i) { @@ -667,15 +677,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { TSDBROW *max[3] = {0}; int iMax[3] = {-1, -1, -1}; int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + for (int i = 0; i < 3; ++i) { - if (input[i].pRow != NULL) { + if (!input[i].stop && input[i].pRow != NULL) { TSDBKEY key = TSDBROW_KEY(input[i].pRow); - TSDBKEY maxKey = TSDBROW_KEY(max[nMax]); // merging & deduplicating on client side - if (maxKey.ts <= key.ts) { - if (maxKey.ts < key.ts) { + if (maxKey <= key.ts) { + if (maxKey < key.ts) { nMax = 0; + maxKey = key.ts; } iMax[nMax] = i; @@ -686,6 +698,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // delete detection TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; int nMerge = 0; for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey = TSDBROW_KEY(max[i]); @@ -693,6 +706,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // bool deleted = false; bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); if (!deleted) { + iMerge[nMerge] = i; merge[nMerge++] = max[i]; } @@ -716,6 +730,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { tRowMergerClear(&merger); } } + } while (*ppRow == NULL); taosMemoryFreeClear(pTSchema); @@ -727,6 +742,245 @@ _err: return code; } +static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { + int32_t code = 0; + + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + + STbData *pMem = NULL; + if (pTsdb->mem) { + tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + } + + STbData *pIMem = NULL; + if (pTsdb->imem) { + tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + } + + *ppRow = NULL; + + SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + + SDelIdx delIdx; + + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + if (pDelFile) { + SDelFReader *pDelFReader; + + code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); + if (code) goto _err; + + code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); + if (code) goto _err; + + code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); + if (code) goto _err; + + tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; + } + + int iSkyline = taosArrayGetSize(pSkyline) - 1; + + SBlockIdx idx = {.suid = suid, .uid = uid}; + + SFSNextRowIter fsState = {0}; + fsState.state = SFSNEXTROW_FS; + fsState.pTsdb = pTsdb; + fsState.pBlockIdxExp = &idx; + + SMemNextRowIter memState = {0}; + SMemNextRowIter imemState = {0}; + TSDBROW memRow, imemRow, fsRow; + + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, + {&imemRow, true, false, &imemState, getNextRowFromMem}, + {&fsRow, false, true, &fsState, getNextRowFromFS}}; + + if (pMem) { + memState.pMem = pMem; + memState.state = SMEMNEXTROW_ENTER; + input[0].stop = false; + input[0].next = true; + } + if (pIMem) { + imemState.pMem = pIMem; + imemState.state = SMEMNEXTROW_ENTER; + input[1].stop = false; + input[1].next = true; + } + + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; + + do { + for (int i = 0; i < 3; ++i) { + if (input[i].next && !input[i].stop) { + code = input[i].nextRowFn(input[i].iter, &input[i].pRow); + if (code) goto _err; + + if (input[i].pRow == NULL) { + input[i].stop = true; + input[i].next = false; + } + } + } + + if (input[0].stop && input[1].stop && input[2].stop) { + break; + } + + // select maxpoint(s) from mem, imem, fs + TSDBROW *max[3] = {0}; + int iMax[3] = {-1, -1, -1}; + int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + + for (int i = 0; i < 3; ++i) { + if (!input[i].stop && input[i].pRow != NULL) { + TSDBKEY key = TSDBROW_KEY(input[i].pRow); + + // merging & deduplicating on client side + if (maxKey <= key.ts) { + if (maxKey < key.ts) { + nMax = 0; + maxKey = key.ts; + } + + iMax[nMax] = i; + max[nMax++] = input[i].pRow; + } + } + } + + // delete detection + TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey = TSDBROW_KEY(max[i]); + + // bool deleted = false; + bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); + if (!deleted) { + iMerge[nMerge] = i; + merge[nMerge++] = max[i]; + } + + input[iMax[i]].next = deleted; + } + + // merge if nMerge > 1 + if (nMerge > 0) { + if (nMerge == 1) { + code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); + if (code) goto _err; + } else { + // merge 2 or 3 rows + SRowMerger merger = {0}; + + tRowMergerInit(&merger, merge[0], pTSchema); + for (int i = 1; i < nMerge; ++i) { + tRowMerge(&merger, merge[i]); + } + tRowMergerGetRow(&merger, ppRow); + tRowMergerClear(&merger); + } + } + + if (iCol == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + SColVal *pColVal = &(SColVal){0}; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey}); + + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + ++iCol; + + setICol = false; + for (int16_t i = iCol; iCol < nCol; ++i) { + // tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal); + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (pColVal->isNull || pColVal->isNone) { + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + if (!setICol) { + iCol = i; + setICol = true; + } + } else { + --nilColCount; + } + } + + continue; + } + + setICol = false; + for (int16_t i = iCol; i < nCol; ++i) { + SColVal colVal = {0}; + tTSRowGetVal(*ppRow, pTSchema, i, &colVal); + + SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i); + + if (!colVal.isNone && !colVal.isNull) { + if (tColVal->isNull || tColVal->isNone) { + taosArraySet(pColArray, i, &colVal); + --nilColCount; + } + } else { + if (tColVal->isNull || tColVal->isNone && !setICol) { + iCol = i; + setICol = true; + + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + } + } + } + } while (nilColCount > 0); + + // if () new ts row from pColArray if non empty + if (taosArrayGetSize(pColArray) == nCol) { + code = tdSTSRowNew(pColArray, pTSchema, ppRow); + if (code) goto _err; + } + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + + return code; +_err: + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -749,6 +1003,8 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + return code; } @@ -763,7 +1019,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } else { STSRow *pRow = NULL; - // code = mergeLast(uid, pTsdb, &pRow); + code = mergeLast(uid, pTsdb, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { return -1; @@ -774,10 +1030,12 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + return code; } -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -785,10 +1043,32 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { + STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); + if (pRow->ts <= eKey) { + taosLRUCacheRelease(pCache, h, true); + } else { + taosLRUCacheRelease(pCache, h, false); + } + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + getTableCacheKey(uid, "l", key, &keyLen); + h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + // clear last cache anyway, no matter where eKey ends. taosLRUCacheRelease(pCache, h, true); - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t - // keyLen); + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } return code; } + +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 537fa5b866..a9c22b6b7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid); + tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); } tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 From 3a566d5e5ed4a1b4a8e4f0afce85c688ef62f26c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Jun 2022 17:14:00 +0800 Subject: [PATCH 2/3] refactor(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 32 +++++++++++++++---------- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 6 ++--- source/libs/executor/src/executorimpl.c | 8 +++---- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 1 + 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 062e91a6a6..bd4f4d1ca8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } } else { SColVal cv = {0}; - - SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); + SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1); for (int32_t j = 0; j < pBlockData->nRow; ++j) { tColDataGetValue(pData, j, &cv); colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); @@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } pReader->pResBlock->info.rows = pBlockData->nRow; - setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); + setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); /* int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, @@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) } static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { - return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; + return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); } static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { @@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } -static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; while(1) { @@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { } } -static int32_t loadDataInFiles(STsdbReader* pReader) { +static int32_t buildBlockFromFiles(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // // check if the query range overlaps with the file data block // bool exists = true; -// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists); +// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists); // if (code != TSDB_CODE_SUCCESS) { // pTsdbReadHandle->checkFiles = false; // return false; @@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pStatus->loadFromFile) { - int32_t code = loadDataInFiles(pReader); + int32_t code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pBlock->info.rows > 0) { return true; } else { - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else { // no data in files, let's try the buffer - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { @@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { // if (pReader->checkFiles) { // // check if the query range overlaps with the file data block // bool exists = true; - // int32_t code = loadDataInFiles(pReader, &exists); + // int32_t code = buildBlockFromFiles(pReader, &exists); // if (code != TSDB_CODE_SUCCESS) { // pReader->activeIndex = 0; // pReader->checkFiles = false; @@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); int32_t code = tBlockDataInit(&pStatus->fileBlockData); - doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } return pReader->pResBlock->pDataBlock; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3e1eb47e8..b5aad7589a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); +int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 221ebbd3ac..74debd1531 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ return result; } -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { +int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo //code = doFilterTag(pTagIndexCond, &metaArg, res); code = TSDB_CODE_INDEX_REBUILDING; if (code == TSDB_CODE_INDEX_REBUILDING) { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } else if (code != TSDB_CODE_SUCCESS) { qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); taosArrayDestroy(res); @@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo } taosArrayDestroy(res); } else { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } if(pTagCond){ diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 570dea474a..496cfbf276 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { - int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pScanNode->tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); + code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bb3f4e403d..a467dd2a53 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo { int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7d44d41b55..b3c51c8f88 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return 0; } +// TODO consider the page meta size int32_t getProperSortPageSize(size_t rowSize) { uint32_t defaultPageSize = 4096; From 847016aa21281d9e1987dfd6dbcd497b9c085b08 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 17:29:49 +0800 Subject: [PATCH 3/3] tsdbCache: new H version api for last/last_row getting --- source/dnode/vnode/src/inc/tsdb.h | 8 +++- source/dnode/vnode/src/tsdb/tsdbCache.c | 59 ++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index eac4f06132..76bfc217b3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,10 +250,16 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf); int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); + +// bug api, deprecated, USE H version int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); + +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); + // structs ======================= typedef struct { int minFid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 12f323e92e..3cd8ac922a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1007,7 +1007,7 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo return code; } - +#if 0 int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -1034,6 +1034,63 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * return code; } +#endif +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "lr", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } else { + STSRow *pRow = NULL; + code = mergeLastRow(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + return -1; + } + + tsdbCacheInsertLastrow(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + + return code; +} + +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "l", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + + } else { + STSRow *pRow = NULL; + code = mergeLast(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + return -1; + } + + tsdbCacheInsertLast(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + + return code; +} int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0;