From 90788972e5f9d4f65061ce712e689379b69e827f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 16:53:30 +0800 Subject: [PATCH 01/10] tsdbCache: new release interface --- source/dnode/vnode/src/inc/tsdb.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 304 ++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- 3 files changed, 295 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9d7cfc0552..eac4f06132 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -251,7 +251,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); // structs ======================= typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2f6901cb24..12f323e92e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -65,7 +65,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -97,7 +97,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -581,6 +581,10 @@ typedef struct TsdbNextRowState { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); STbData *pMem = NULL; @@ -597,6 +601,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + SDelIdx delIdx; + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -604,7 +610,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; - SDelIdx delIdx; code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); if (code) goto _err; @@ -612,6 +617,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { if (code) goto _err; tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; } int iSkyline = taosArrayGetSize(pSkyline) - 1; @@ -644,7 +652,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { input[1].next = true; } - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; do { for (int i = 0; i < 3; ++i) { @@ -667,15 +677,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { TSDBROW *max[3] = {0}; int iMax[3] = {-1, -1, -1}; int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + for (int i = 0; i < 3; ++i) { - if (input[i].pRow != NULL) { + if (!input[i].stop && input[i].pRow != NULL) { TSDBKEY key = TSDBROW_KEY(input[i].pRow); - TSDBKEY maxKey = TSDBROW_KEY(max[nMax]); // merging & deduplicating on client side - if (maxKey.ts <= key.ts) { - if (maxKey.ts < key.ts) { + if (maxKey <= key.ts) { + if (maxKey < key.ts) { nMax = 0; + maxKey = key.ts; } iMax[nMax] = i; @@ -686,6 +698,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // delete detection TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; int nMerge = 0; for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey = TSDBROW_KEY(max[i]); @@ -693,6 +706,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // bool deleted = false; bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); if (!deleted) { + iMerge[nMerge] = i; merge[nMerge++] = max[i]; } @@ -716,6 +730,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { tRowMergerClear(&merger); } } + } while (*ppRow == NULL); taosMemoryFreeClear(pTSchema); @@ -727,6 +742,245 @@ _err: return code; } +static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { + int32_t code = 0; + + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + + STbData *pMem = NULL; + if (pTsdb->mem) { + tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + } + + STbData *pIMem = NULL; + if (pTsdb->imem) { + tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + } + + *ppRow = NULL; + + SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + + SDelIdx delIdx; + + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + if (pDelFile) { + SDelFReader *pDelFReader; + + code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); + if (code) goto _err; + + code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); + if (code) goto _err; + + code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); + if (code) goto _err; + + tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; + } + + int iSkyline = taosArrayGetSize(pSkyline) - 1; + + SBlockIdx idx = {.suid = suid, .uid = uid}; + + SFSNextRowIter fsState = {0}; + fsState.state = SFSNEXTROW_FS; + fsState.pTsdb = pTsdb; + fsState.pBlockIdxExp = &idx; + + SMemNextRowIter memState = {0}; + SMemNextRowIter imemState = {0}; + TSDBROW memRow, imemRow, fsRow; + + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, + {&imemRow, true, false, &imemState, getNextRowFromMem}, + {&fsRow, false, true, &fsState, getNextRowFromFS}}; + + if (pMem) { + memState.pMem = pMem; + memState.state = SMEMNEXTROW_ENTER; + input[0].stop = false; + input[0].next = true; + } + if (pIMem) { + imemState.pMem = pIMem; + imemState.state = SMEMNEXTROW_ENTER; + input[1].stop = false; + input[1].next = true; + } + + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; + + do { + for (int i = 0; i < 3; ++i) { + if (input[i].next && !input[i].stop) { + code = input[i].nextRowFn(input[i].iter, &input[i].pRow); + if (code) goto _err; + + if (input[i].pRow == NULL) { + input[i].stop = true; + input[i].next = false; + } + } + } + + if (input[0].stop && input[1].stop && input[2].stop) { + break; + } + + // select maxpoint(s) from mem, imem, fs + TSDBROW *max[3] = {0}; + int iMax[3] = {-1, -1, -1}; + int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + + for (int i = 0; i < 3; ++i) { + if (!input[i].stop && input[i].pRow != NULL) { + TSDBKEY key = TSDBROW_KEY(input[i].pRow); + + // merging & deduplicating on client side + if (maxKey <= key.ts) { + if (maxKey < key.ts) { + nMax = 0; + maxKey = key.ts; + } + + iMax[nMax] = i; + max[nMax++] = input[i].pRow; + } + } + } + + // delete detection + TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey = TSDBROW_KEY(max[i]); + + // bool deleted = false; + bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); + if (!deleted) { + iMerge[nMerge] = i; + merge[nMerge++] = max[i]; + } + + input[iMax[i]].next = deleted; + } + + // merge if nMerge > 1 + if (nMerge > 0) { + if (nMerge == 1) { + code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); + if (code) goto _err; + } else { + // merge 2 or 3 rows + SRowMerger merger = {0}; + + tRowMergerInit(&merger, merge[0], pTSchema); + for (int i = 1; i < nMerge; ++i) { + tRowMerge(&merger, merge[i]); + } + tRowMergerGetRow(&merger, ppRow); + tRowMergerClear(&merger); + } + } + + if (iCol == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + SColVal *pColVal = &(SColVal){0}; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey}); + + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + ++iCol; + + setICol = false; + for (int16_t i = iCol; iCol < nCol; ++i) { + // tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal); + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (pColVal->isNull || pColVal->isNone) { + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + if (!setICol) { + iCol = i; + setICol = true; + } + } else { + --nilColCount; + } + } + + continue; + } + + setICol = false; + for (int16_t i = iCol; i < nCol; ++i) { + SColVal colVal = {0}; + tTSRowGetVal(*ppRow, pTSchema, i, &colVal); + + SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i); + + if (!colVal.isNone && !colVal.isNull) { + if (tColVal->isNull || tColVal->isNone) { + taosArraySet(pColArray, i, &colVal); + --nilColCount; + } + } else { + if (tColVal->isNull || tColVal->isNone && !setICol) { + iCol = i; + setICol = true; + + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + } + } + } + } while (nilColCount > 0); + + // if () new ts row from pColArray if non empty + if (taosArrayGetSize(pColArray) == nCol) { + code = tdSTSRowNew(pColArray, pTSchema, ppRow); + if (code) goto _err; + } + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + + return code; +_err: + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -749,6 +1003,8 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + return code; } @@ -763,7 +1019,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } else { STSRow *pRow = NULL; - // code = mergeLast(uid, pTsdb, &pRow); + code = mergeLast(uid, pTsdb, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { return -1; @@ -774,10 +1030,12 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + return code; } -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -785,10 +1043,32 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { + STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); + if (pRow->ts <= eKey) { + taosLRUCacheRelease(pCache, h, true); + } else { + taosLRUCacheRelease(pCache, h, false); + } + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + getTableCacheKey(uid, "l", key, &keyLen); + h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + // clear last cache anyway, no matter where eKey ends. taosLRUCacheRelease(pCache, h, true); - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t - // keyLen); + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } return code; } + +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 537fa5b866..a9c22b6b7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid); + tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); } tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 From 8e1de153dd4c273d9d30d8cce2602a447bcc7b65 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 08:54:32 +0000 Subject: [PATCH 02/10] fix bug --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 489d6d749a..d43bf3664e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1031,20 +1031,24 @@ static int32_t tColDataUpdateOffset(SColData *pColData) { ASSERT(pColData->nVal > 0); ASSERT(pColData->flag); + ASSERT(IS_VAR_DATA_TYPE(pColData->type)); - if (IS_VAR_DATA_TYPE(pColData->type) && (pColData->flag & HAS_VALUE)) { + if ((pColData->flag & HAS_VALUE)) { code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal); if (code) goto _exit; int32_t offset = 0; for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { - uint8_t v = GET_BIT2(pColData->pBitMap, iVal); - if (v == 0 || v == 1) { - pColData->aOffset[iVal] = -1; - } else { - pColData->aOffset[iVal] = offset; - offset += tGetValue(pColData->pData + offset, &value, pColData->type); + if (pColData->flag != HAS_VALUE) { + uint8_t v = GET_BIT2(pColData->pBitMap, iVal); + if (v == 0 || v == 1) { + pColData->aOffset[iVal] = -1; + continue; + } } + + pColData->aOffset[iVal] = offset; + offset += tGetValue(pColData->pData + offset, &value, pColData->type); } ASSERT(offset == pColData->nData); From 3a566d5e5ed4a1b4a8e4f0afce85c688ef62f26c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Jun 2022 17:14:00 +0800 Subject: [PATCH 03/10] refactor(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 32 +++++++++++++++---------- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 6 ++--- source/libs/executor/src/executorimpl.c | 8 +++---- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 1 + 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 062e91a6a6..bd4f4d1ca8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } } else { SColVal cv = {0}; - - SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); + SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1); for (int32_t j = 0; j < pBlockData->nRow; ++j) { tColDataGetValue(pData, j, &cv); colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); @@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } pReader->pResBlock->info.rows = pBlockData->nRow; - setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); + setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); /* int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, @@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) } static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { - return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; + return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); } static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { @@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } -static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; while(1) { @@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { } } -static int32_t loadDataInFiles(STsdbReader* pReader) { +static int32_t buildBlockFromFiles(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // // check if the query range overlaps with the file data block // bool exists = true; -// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists); +// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists); // if (code != TSDB_CODE_SUCCESS) { // pTsdbReadHandle->checkFiles = false; // return false; @@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pStatus->loadFromFile) { - int32_t code = loadDataInFiles(pReader); + int32_t code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pBlock->info.rows > 0) { return true; } else { - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else { // no data in files, let's try the buffer - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { @@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { // if (pReader->checkFiles) { // // check if the query range overlaps with the file data block // bool exists = true; - // int32_t code = loadDataInFiles(pReader, &exists); + // int32_t code = buildBlockFromFiles(pReader, &exists); // if (code != TSDB_CODE_SUCCESS) { // pReader->activeIndex = 0; // pReader->checkFiles = false; @@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); int32_t code = tBlockDataInit(&pStatus->fileBlockData); - doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } return pReader->pResBlock->pDataBlock; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3e1eb47e8..b5aad7589a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); +int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 221ebbd3ac..74debd1531 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ return result; } -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { +int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo //code = doFilterTag(pTagIndexCond, &metaArg, res); code = TSDB_CODE_INDEX_REBUILDING; if (code == TSDB_CODE_INDEX_REBUILDING) { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } else if (code != TSDB_CODE_SUCCESS) { qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); taosArrayDestroy(res); @@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo } taosArrayDestroy(res); } else { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } if(pTagCond){ diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 570dea474a..496cfbf276 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { - int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pScanNode->tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); + code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bb3f4e403d..a467dd2a53 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo { int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7d44d41b55..b3c51c8f88 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return 0; } +// TODO consider the page meta size int32_t getProperSortPageSize(size_t rowSize) { uint32_t defaultPageSize = 4096; From 847016aa21281d9e1987dfd6dbcd497b9c085b08 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 17:29:49 +0800 Subject: [PATCH 04/10] tsdbCache: new H version api for last/last_row getting --- source/dnode/vnode/src/inc/tsdb.h | 8 +++- source/dnode/vnode/src/tsdb/tsdbCache.c | 59 ++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index eac4f06132..76bfc217b3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,10 +250,16 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf); int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); + +// bug api, deprecated, USE H version int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); + +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); + // structs ======================= typedef struct { int minFid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 12f323e92e..3cd8ac922a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1007,7 +1007,7 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo return code; } - +#if 0 int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -1034,6 +1034,63 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * return code; } +#endif +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "lr", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } else { + STSRow *pRow = NULL; + code = mergeLastRow(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + return -1; + } + + tsdbCacheInsertLastrow(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + + return code; +} + +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "l", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + + } else { + STSRow *pRow = NULL; + code = mergeLast(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + return -1; + } + + tsdbCacheInsertLast(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + + return code; +} int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; From 843ef87574050989ad229f38b2e393f5ed06d529 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 09:34:04 +0000 Subject: [PATCH 05/10] fix file corrput --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index ef9d7c0424..a3f1c92772 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -908,7 +908,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, pBlockData->nRow = pSubBlock->nRow; p = *ppBuf1 + sizeof(SBlockDataHdr); - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); if (code) goto _err; p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); From 16074154693ff826df5ef87c1797b151af261af9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 09:37:59 +0000 Subject: [PATCH 06/10] add a msg type --- include/common/tmsgdef.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index acf08bd47e..60f8d28c12 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -205,6 +205,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) From 2a6be2bc8a2ca03252b8bcab16e398e610f1dfdd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 09:40:33 +0000 Subject: [PATCH 07/10] do commit --- source/dnode/vnode/src/vnd/vnodeSvr.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 34b596e34a..60a13ee34c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -218,6 +218,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; case TDMT_VND_ALTER_CONFIG: break; + case TDMT_VND_COMMIT: + goto _do_commit; default: ASSERT(0); break; @@ -232,6 +234,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // commit if need if (vnodeShouldCommit(pVnode)) { + _do_commit: vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); // commit current change vnodeCommit(pVnode); From 8cd3bffb3685db550ce2c1ddd5f64cac6d8fed54 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 17:41:06 +0800 Subject: [PATCH 08/10] tsdbCache/delete: refine tsdbCacheDelete API --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 42 ++++++++++++++++++++-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 76bfc217b3..a8b5822749 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -258,7 +258,7 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); +int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); // structs ======================= typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3cd8ac922a..7776954b6b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -51,6 +51,44 @@ static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); } +static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "lr", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); + if (pRow->ts <= eKey) { + taosLRUCacheRelease(pCache, h, true); + } else { + taosLRUCacheRelease(pCache, h, false); + } + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + return code; +} + +static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "l", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + // clear last cache anyway, no matter where eKey ends. + taosLRUCacheRelease(pCache, h, true); + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + return code; +} + int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -97,7 +135,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); + tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -1092,7 +1130,7 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand return code; } -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { +int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a9c22b6b7f..ba1fddf956 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); + tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey); } tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 From ed69d8ee9f12d11b5a8ff95384d805f4590da970 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 10:05:21 +0000 Subject: [PATCH 09/10] more work --- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 223 +++++++++--------- 1 file changed, 116 insertions(+), 107 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index a3f1c92772..4f07dfd497 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -738,98 +738,135 @@ _err: return code; } +static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, + int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + int32_t code = 0; + int64_t offset; + int64_t size; + int64_t n; + + tBlockDataReset(pBlockData); + pBlockData->nRow = pSubBlock->nRow; + + // TSDBKEY + offset = pSubBlock->offset + sizeof(SBlockDataHdr); + size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + if (code) goto _err; + + // OTHER + SBlockCol blockCol; + SBlockCol *pBlockCol = &blockCol; + SColData *pColData; + for (int32_t iCol = 0; iCol < nCol; iCol++) { + int16_t cid = aColId[iCol]; + + if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == + 0) { + code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); + if (code) goto _err; + + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + if (pBlockCol->flag == HAS_NULL) { + for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); + if (code) goto _err; + } + } else { + offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; + size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + // seek + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); + if (code) goto _err; + } + } + } + return code; + +_err: + return code; +} + int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; + int32_t code = 0; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID); if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; - for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - int64_t offset; - int64_t size; - int64_t n; + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); + if (code) goto _err; - tBlockDataReset(pBlockData); - pBlockData->nRow = pSubBlock->nRow; + if (pBlock->nSubBlock > 1) { + SBlockData *pBlockData1 = &(SBlockData){0}; + SBlockData *pBlockData2 = &(SBlockData){0}; - // TSDBKEY - offset = pSubBlock->offset + sizeof(SBlockDataHdr); - size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; + for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); + if (code) goto _err; - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tBlockDataCopy(pBlockData, pBlockData2); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); - if (code) goto _err; - - // OTHER - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - SColData *pColData; - for (int32_t iCol = 0; iCol < nCol; iCol++) { - int16_t cid = aColId[iCol]; - - if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == - 0) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); - if (code) goto _err; - - tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); - if (pBlockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; - size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; - - // seek - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); - if (code) goto _err; - } + code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; } } + + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); } tsdbFree(pBuf1); @@ -876,34 +913,6 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - // // check - // p = *ppBuf1; - // SBlockDataHdr *pHdr = (SBlockDataHdr *)p; - // ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); - // ASSERT(pHdr->suid == pBlockIdx->suid); - // ASSERT(pHdr->uid == pBlockIdx->uid); - // p += sizeof(*pHdr); - - // if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); - - // for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - // tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); - - // ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - // if (pBlockCol->flag == HAS_NULL) continue; - - // if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - // } - // recover pBlockData->nRow = pSubBlock->nRow; p = *ppBuf1 + sizeof(SBlockDataHdr); @@ -964,7 +973,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p SBlockData *pBlockData2 = &(SBlockData){0}; for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData2); From 5e793c2f3f63459ea360546e28083c0189786684 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Jun 2022 18:10:25 +0800 Subject: [PATCH 10/10] tsdbCacheRead: use new H version api of last_row --- source/dnode/vnode/src/tsdb/tsdbCache.c | 6 ++- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 53 ++++++++++++--------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7776954b6b..0cd555c5ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1087,7 +1087,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH code = mergeLastRow(uid, pTsdb, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { - return -1; + *handle = NULL; + return 0; } tsdbCacheInsertLastrow(pCache, uid, pRow); @@ -1116,7 +1117,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand code = mergeLast(uid, pTsdb, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { - return -1; + *handle = NULL; + return 0; } tsdbCacheInsertLast(pCache, uid, pRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index e3eb015c75..e00c165f99 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,15 +22,15 @@ typedef struct SLastrowReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; -// int32_t* pSlotIds; - char** transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables - SArray* pTableList; // table id list + // int32_t* pSlotIds; + char** transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables + SArray* pTableList; // table id list } SLastrowReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) { +static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { int32_t numOfRows = pBlock->info.rows; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -60,21 +60,21 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade pBlock->info.rows += 1; } - -int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) { +int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, + void** pReader) { SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - p->type = type; - p->pVnode = pVnode; - p->numOfCols = numOfCols; + p->type = type; + p->pVnode = pVnode; + p->numOfCols = numOfCols; p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); - p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); - p->pTableList = pTableIdList; + p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); + p->pTableList = pTableIdList; #if 0 for(int32_t i = 0; i < p->numOfCols; ++i) { for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) { @@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t tsdbLastrowReaderClose(void* pReader) { SLastrowReader* p = pReader; - for(int32_t i = 0; i < p->numOfCols; ++i) { + for (int32_t i = 0; i < p->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } @@ -117,8 +117,9 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t SLastrowReader* pr = pReader; - STSRow* pRow = NULL; - size_t numOfTables = taosArrayGetSize(pr->pTableList); + LRUHandle* h = NULL; + STSRow* pRow = NULL; + size_t numOfTables = taosArrayGetSize(pr->pTableList); // retrieve the only one last row of all tables in the uid list. if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { @@ -126,16 +127,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - - int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); + + /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ + int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) { return code; } - if (pRow == NULL) { + if (h == NULL) { continue; } + pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); if (pRow->ts > lastKey) { // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. @@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t internalResult = true; lastKey = pRow->ts; } + + tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); } } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); + /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ + int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) { return code; } // no data in the table of Uid - if (pRow == NULL) { + if (h == NULL) { continue; } + pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); saveOneRow(pRow, pResBlock, pr, slotIds); + tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); + pr->tableIndex += 1; if (pResBlock->info.rows >= pResBlock->info.capacity) { return TSDB_CODE_SUCCESS;