From 980382d4334b6f2b11dc9dde59f7d1d44d3c980c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 11:18:08 +0800 Subject: [PATCH 01/25] fix(tsdb): fix memory error. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 22 ++++++++++-------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 + source/libs/executor/src/cachescanoperator.c | 24 ++++++++++---------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index dd5da28b6b..d5f3624851 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; - // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = TSKEY_MIN; @@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } } - // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it + // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it p->hasResult = true; varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); } + for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (idx < funcTypeBlockArray->size) { @@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, if (IS_VAR_DATA_TYPE(pPkCol->type)) { p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); } + + p->pkColumn = *pPkCol; } if (numOfTables == 0) { @@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - for (int32_t j = 0; j < pr->numOfCols; ++j) { - int32_t bytes; - if (slotIds[j] == -1) { - bytes = 1; - } else { - bytes = pr->pSchema->columns[slotIds[j]].bytes; - } + int32_t pkBufLen = 0; + if (pr->rowKey.numOfPKs > 0) { + pkBufLen = pr->pkColumn.bytes; + } - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index af7d00e019..bece22adad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -382,6 +382,7 @@ typedef struct SCacheRowsReader { SArray* pFuncTypeList; __compar_fn_t pkComparFn; SRowKey rowKey; + SColumnInfo pkColumn; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 56052434a4..23e873d335 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo { SExprSupp pseudoExprSup; int32_t retrieveType; int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; + SSDataBlock* pBufferedRes; SArray* pUidList; SArray* pCidList; int32_t indexOfBufferedRes; @@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); - pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); - setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); - blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); + pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); + blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output @@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { - blockDataCleanup(pInfo->pBufferredRes); + if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) { + blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, + int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } // check for tag values - int32_t resultRows = pInfo->pBufferredRes->info.rows; + int32_t resultRows = pInfo->pBufferedRes->info.rows; // the results may be null, if last values are all null ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); @@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; - if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { + if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) { SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; - SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { @@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); - blockDataDestroy(pInfo->pBufferredRes); + blockDataDestroy(pInfo->pBufferedRes); taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); From 9811a9e2164a77e2bd7a84b68f349d9f0a7fee05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:06:58 +0800 Subject: [PATCH 02/25] fix(tsdb): deep copy the pk for varchar type. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 52 ++++++++++++++------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e8b1f870c3..a14f866bcc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, return pLoadInfo; } +static void freeItem(void* pValue) { + SValue* p = (SValue*) pValue; + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFree(p->pData); + } +} + void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { return NULL; @@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo->info.pCount != NULL) { taosArrayDestroy(pLoadInfo->info.pUid); - taosArrayDestroy(pLoadInfo->info.pFirstKey); - taosArrayDestroy(pLoadInfo->info.pLastKey); + taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem); + taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem); taosArrayDestroy(pLoadInfo->info.pCount); taosArrayDestroy(pLoadInfo->info.pFirstTs); taosArrayDestroy(pLoadInfo->info.pLastTs); @@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } +static int32_t tValueDupPayload(SValue *pVal) { + if (IS_VAR_DATA_TYPE(pVal->type)) { + char *p = (char *)pVal->pData; + char *pBuf = taosMemoryMalloc(pVal->nData); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(pBuf, p, pVal->nData); + pVal->pData = (uint8_t *)pBuf; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); @@ -384,25 +406,16 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl break; } - if (IS_VAR_DATA_TYPE(vFirst.type)) { - char *p = (char *)vFirst.pData; - char *pBuf = taosMemoryMalloc(vFirst.nData); - memcpy(pBuf, p, vFirst.nData); - vFirst.pData = (uint8_t *)pBuf; - } + tValueDupPayload(&vFirst); taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + // todo add api to clone the original data code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); if (code) { break; } - if (IS_VAR_DATA_TYPE(vLast.type)) { - char *p = (char *)vLast.pData; - char *pBuf = taosMemoryMalloc(vLast.nData); - memcpy(pBuf, p, vLast.nData); - vLast.pData = (uint8_t *)pBuf; - } + tValueDupPayload(&vLast); taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } @@ -420,8 +433,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]); + SValue s = record.firstKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); + + s = record.lastKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); i += 1; } } From b9581548c6121adcf89ee101715507e483d7723d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 18:20:56 +0800 Subject: [PATCH 03/25] adj msg --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 732920a3ce..74c9338fed 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8221,7 +8221,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) { code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, - "Primary key column of dest table can not be null"); + "Primary key column name must be defined in existed-stable field"); } SNodeList* pNewProjections = NULL; From f0beceb5ebde5528f32998bbd725843b780f0ed7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:32:08 +0800 Subject: [PATCH 04/25] fix(tsdb): check for the duplicated ts in delete-skyline. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 44 +++++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a8a4ced517..c08face243 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange); + SVersionRange* pVerRange, bool hasPk); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); @@ -1595,7 +1595,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc); if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { - if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, + pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; return true; } @@ -2135,7 +2136,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order, - &pInfo->verRange); + &pInfo->verRange, pReader->suppInfo.numOfPks > 0); if (dropped) { return false; } @@ -3381,8 +3382,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } +static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) { + size_t num = taosArrayGetSize(pDelList); + int32_t start = index; + + if (asc) { + if (start >= num - 1) { + start = num - 1; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts >= key && start > 0) { + start -= 1; + } + } else { + if (index <= 0) { + start = 0; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts <= key && start < num - 1) { + start += 1; + } + } + + return start; +} + bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange) { + SVersionRange* pVerRange, bool hasPk) { if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) { return false; } @@ -3391,6 +3419,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t bool asc = ASCENDING_TRAVERSE(order); int32_t step = asc ? 1 : -1; + if (hasPk) { // handle the case where duplicated timestamps existed. + *index = reverseSearchStartPos(pDelList, *index, key, asc); + } + if (asc) { if (*index >= num - 1) { TSDBKEY* last = taosArrayGetLast(pDelList); @@ -3503,7 +3535,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } @@ -3528,7 +3560,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } From 7f11a3682bf80390e0f012afcf103a627c1af2a9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 10 Apr 2024 18:41:20 +0800 Subject: [PATCH 05/25] enh: primary key column should not be null --- source/common/src/tdataformat.c | 22 +++++++++++++--------- source/libs/executor/src/dataInserter.c | 11 +---------- source/libs/parser/src/parInsertSql.c | 3 --- source/libs/parser/src/parInsertStmt.c | 10 ---------- 4 files changed, 14 insertions(+), 32 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f13a0a0825..f8d2da0bd5 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -101,16 +101,18 @@ typedef struct { int32_t kvRowSize; } SRowBuildScanInfo; -static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; sinfo->numOfNone++; + return 0; } -static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; sinfo->numOfNull++; sinfo->kvMaxOffset = sinfo->kvPayloadSize; sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); + return 0; } static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) { @@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal } static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) { + int32_t code = 0; int32_t colValIndex = 1; int32_t numOfColVals = TARRAY_SIZE(colVals); SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals); @@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS for (int32_t i = 1; i < schema->numOfCols; i++) { for (;;) { if (colValIndex >= numOfColVals) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } @@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { - tRowBuildScanAddNull(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit; } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; } colValIndex++; break; } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } else { // skip useless value colValIndex++; @@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS + sinfo->kvIndexSize // index array + sinfo->kvPayloadSize; // payload - return 0; +_exit: + return code; } static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 06f63f5f04..45d6f55278 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -216,11 +216,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { @@ -248,11 +244,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; goto _end; } - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index a1c257022a..f3192b4956 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z); } - if (pSchema->flags & COL_IS_KEY) { - return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z); - } pVal->flag = CV_FLAG_NULL; return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 59c5ce82ad..bdeb548bd7 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in pBind = bind + c; } - if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){ - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); if (code) { goto _return; @@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu pBind = bind; } - if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) { - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1); qDebug("stmt col %d bind %d rows data", colIdx, rowNum); From 507e40ddb7485e8b01a15c9961a49856dc88301c Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 10 Apr 2024 18:56:55 +0800 Subject: [PATCH 06/25] enh: primary key column should not be null --- source/libs/executor/src/dataInserter.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 45d6f55278..39bbc1bc69 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -216,7 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { From bff8226c0f6ef56860ce10c305ad5b5b2c69efa9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 11 Apr 2024 08:27:17 +0800 Subject: [PATCH 07/25] enh: none column for primary key --- include/util/taoserror.h | 1 + source/common/src/tdataformat.c | 2 +- source/util/src/terror.c | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2389079fd2..a3cae6a7db 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -767,6 +767,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674) #define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675) +#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE TAOS_DEF_ERROR_CODE(0, 0x2676) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f8d2da0bd5..9686059052 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -102,7 +102,7 @@ typedef struct { } SRowBuildScanInfo; static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE; sinfo->numOfNone++; return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0812181c5c..8d80e3883d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,6 +629,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column should not be none") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner From 79be7eea8c1e8b3c02f1a51b71f734b94d94e674 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:06:07 +0800 Subject: [PATCH 08/25] fix(tsdb): fix invalid read, and do some internal refactor. --- source/common/src/tdatablock.c | 5 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 85 +++++++++------------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 67 +++++++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 7 +- 4 files changed, 90 insertions(+), 74 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 69b2a2e6a3..8d9ef6831d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1331,6 +1331,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } + if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + taosMemoryFreeClear(pBlock->info.pks[0].pData); + taosMemoryFreeClear(pBlock->info.pks[1].pData); + } + blockDataFreeRes(pBlock); taosMemoryFreeClear(pBlock); return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c08face243..be01afb960 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader); +static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, @@ -392,7 +391,7 @@ _err: return code; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; @@ -403,8 +402,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { } } -static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } - static void initReaderStatus(SReaderStatus* pStatus) { pStatus->pTableIter = NULL; pStatus->loadFromFile = true; @@ -657,21 +654,19 @@ _end: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, - SArray* pTableScanInfoList) { - size_t sizeInDisk = 0; - int64_t st = taosGetTimestampUs(); +static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, + SArray* pTableScanInfoList) { + int32_t k = 0; + size_t sizeInDisk = 0; + int64_t st = taosGetTimestampUs(); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STimeWindow w = pReader->info.window; + SBrinRecord* pRecord = NULL; + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + SBrinRecordIter iter = {0}; // clear info for the new file cleanupInfoForNextFileset(pReader->status.pTableMap); - - int32_t k = 0; - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - STimeWindow w = pReader->info.window; - SBrinRecord* pRecord = NULL; - - SBrinRecordIter iter = {0}; initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { @@ -743,14 +738,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); + if (pScanInfo->pBlockList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); + if (pScanInfo->pBlockIdxList == NULL) { + pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx)); + if (pScanInfo->pBlockIdxList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)}; + recordToBlockInfo(&blockInfo, pRecord); + void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo); if (p1 == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + // todo: refactor to record the fileset skey/ekey if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } @@ -1323,10 +1331,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p } static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, + STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { - bool asc = ASCENDING_TRAVERSE(order); - if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { + bool asc = ASCENDING_TRAVERSE(order); + int32_t step = asc ? 1 : -1; + + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) { return false; } @@ -1334,9 +1344,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return false; } - int32_t step = asc ? 1 : -1; - STableDataBlockIdx* pTableDataBlockIdx = - taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); blockInfoToRecord(pRecord, p); @@ -1344,22 +1352,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return true; } -static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { - int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; - int32_t index = pBlockIter->index; - - while (index < pBlockIter->numOfBlocks && index >= 0) { - SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); - if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { - return index; - } - - index += step; - } - - return -1; -} - static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { if (index < 0 || index >= pBlockIter->numOfBlocks) { @@ -2706,7 +2698,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { - code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); + code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -3154,23 +3146,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; if (pBlockInfo) { - // todo handle -// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); -// if (pScanInfo) { -// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey); -// lastKey = pScanInfo->lastProcKey; -// } - pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; -// pDumpInfo->lastKey.key.ts = lastKey; } pDumpInfo->allDumped = false; -// pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 2a7b0140df..7e81f1df36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in return TSDB_CODE_SUCCESS; } +void clearRowKey(SRowKey* pKey) { + if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { + return; + } + taosMemoryFree(pKey->pks[0].pData); +} + static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData); + + clearRowKey(&p->lastProcKey); + clearRowKey(&p->sttRange.skey); + clearRowKey(&p->sttRange.ekey); + clearRowKey(&p->sttKeyInfo.nextProcKey); } void destroyAllBlockScanInfo(SSHashObj* pTableMap) { @@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -449,12 +461,36 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor } } +static void freeItem(void* pItem) { + SFileDataBlockInfo* p = pItem; + if (p->firstPKLen > 0) { + taosMemoryFreeClear(p->firstPk.pData); + } + + if (p->lastPKLen > 0) { + taosMemoryFreeClear(p->lastPk.pData); + } +} + +void clearDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayClearEx(pIter->blockList, freeItem); +} + +void cleanupDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayDestroyEx(pIter->blockList, freeItem); +} + int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; + clearDataBlockIterator(pBlockIter); + pBlockIter->numOfBlocks = numOfBlocks; - taosArrayClear(pBlockIter->blockList); // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); @@ -482,9 +518,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; for (int32_t k = 0; k < num; ++k) { - SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k); sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo}; cnt++; } @@ -499,20 +535,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); - if (pTableScanInfo->pBlockIdxList == NULL) { - pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx)); - } - for (int32_t i = 0; i < numOfBlocks; ++i) { - SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - recordToBlockInfo(&blockInfo, record, pReader); - - taosArrayPush(pBlockIter->blockList, &blockInfo); STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); } + taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); int64_t et = taosGetTimestampUs(); @@ -540,18 +568,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t index = sup.indexPerTable[pos]++; - SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - recordToBlockInfo(&blockInfo, record, pReader); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); + taosArrayPush(pBlockIter->blockList, pBlockInfo); - taosArrayPush(pBlockIter->blockList, &blockInfo); STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; - if (pTableScanInfo->pBlockIdxList == NULL) { - size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList); - pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx)); - } - STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; + STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + // set data block index overflow, in order to disable the offset comparator if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index bece22adad..0e7895c272 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -237,7 +237,6 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; -// int64_t lastKey; // STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo; @@ -338,6 +337,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, int32_t numOfTables); +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); @@ -347,6 +347,11 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); +void clearRowKey(SRowKey* pKey); + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order); +void clearDataBlockIterator(SDataBlockIter* pIter); +void cleanupDataBlockIterator(SDataBlockIter* pIter); typedef struct { SArray* pTombData; From 9f95360eccac219a8ad9a1f7a1ed05a470627d9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:14:15 +0800 Subject: [PATCH 09/25] fix(tsdb): fix error in read rowkey with null. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index be01afb960..8f4c719a35 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -173,11 +173,16 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { for (int32_t i = 0; i < pRow->numOfPKs; i++) { pKey->pks[i].type = indices[i].type; + uint8_t *tdata = data + indices[i].offset; + if (pRow->flag >> 4) { + tdata += tGetI16v(tdata, NULL); + } + if (IS_VAR_DATA_TYPE(indices[i].type)) { tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); + pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; - } else { + } else { // todo pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); } } @@ -4353,9 +4358,11 @@ void tsdbReaderClose2(STsdbReader* pReader) { SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pSttBlockReader != NULL) { - SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; - tMergeTreeClose(&pLReader->mergeTree); - taosMemoryFree(pLReader); + SSttBlockReader* pSttBlockReader = pFilesetIter->pSttBlockReader; + tMergeTreeClose(&pSttBlockReader->mergeTree); + + clearRowKey(&pSttBlockReader->currentKey); + taosMemoryFree(pSttBlockReader); } destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); From 1f85a47cabe964d046f401612d80da502ea9ef67 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:47:05 +0800 Subject: [PATCH 10/25] fix(tsdb): fix memory leak. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 18 ++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 34 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 6 ++-- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8f4c719a35..3409559867 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -182,8 +182,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; - } else { // todo - pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); + } else { + memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } } } @@ -396,14 +396,14 @@ _err: return code; } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - taosArrayClear(pIter->blockList); + clearDataBlockIterator(pIter, hasPk); } } @@ -3183,7 +3183,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); } @@ -3293,7 +3293,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -4138,7 +4138,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4342,7 +4342,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter); + cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -5018,7 +5018,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 7e81f1df36..d93c8c8f79 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -461,34 +461,38 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { } } -static void freeItem(void* pItem) { +static void freePkItem(void* pItem) { SFileDataBlockInfo* p = pItem; - if (p->firstPKLen > 0) { - taosMemoryFreeClear(p->firstPk.pData); - } + taosMemoryFreeClear(p->firstPk.pData); + taosMemoryFreeClear(p->lastPk.pData); +} - if (p->lastPKLen > 0) { - taosMemoryFreeClear(p->lastPk.pData); +void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { + pIter->index = -1; + pIter->numOfBlocks = 0; + + if (hasPk) { + taosArrayClearEx(pIter->blockList, freePkItem); + } else { + taosArrayClear(pIter->blockList); } } -void clearDataBlockIterator(SDataBlockIter* pIter) { +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { pIter->index = -1; pIter->numOfBlocks = 0; - taosArrayClearEx(pIter->blockList, freeItem); -} - -void cleanupDataBlockIterator(SDataBlockIter* pIter) { - pIter->index = -1; - pIter->numOfBlocks = 0; - taosArrayDestroyEx(pIter->blockList, freeItem); + if (hasPk) { + taosArrayDestroyEx(pIter->blockList, freePkItem); + } else { + taosArrayClear(pIter->blockList); + } } int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; - clearDataBlockIterator(pBlockIter); + clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0); pBlockIter->numOfBlocks = numOfBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 0e7895c272..94909aabf4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -349,9 +349,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order); -void clearDataBlockIterator(SDataBlockIter* pIter); -void cleanupDataBlockIterator(SDataBlockIter* pIter); +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { SArray* pTombData; From 563efeb560a9317c927a9798fc0c8d4c9d220127 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 11:09:57 +0800 Subject: [PATCH 11/25] fix(tsdb): fix memory leak. --- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index d93c8c8f79..1fba39227c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -484,7 +484,7 @@ void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { if (hasPk) { taosArrayDestroyEx(pIter->blockList, freePkItem); } else { - taosArrayClear(pIter->blockList); + taosArrayDestroy(pIter->blockList); } } From e0f7b14ffa6b06d5da3c181358d4bcb05a83988e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 11 Apr 2024 11:41:05 +0800 Subject: [PATCH 12/25] fix: eliminate double free --- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 1fba39227c..c82363c921 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -171,7 +171,7 @@ void clearRowKey(SRowKey* pKey) { if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { return; } - taosMemoryFree(pKey->pks[0].pData); + taosMemoryFreeClear(pKey->pks[0].pData); } static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { From a2a237a4b0ae6c6d13f0609e55535cf8408d7fed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 14:31:48 +0800 Subject: [PATCH 13/25] fix(query): avoid process data that belongs to the next session window. --- source/libs/executor/src/timewindowoperator.c | 27 ++++++++++--------- source/libs/function/src/builtinsimpl.c | 6 ++--- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8fb8aaa69d..e62763ebc5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1339,23 +1339,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6be95b786b..6f3f5f7e5c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first - // function. + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. #if 0 if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { @@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else + +// todo refactor if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { numOfElems = 1; @@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } - // SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } From 312f7864c81639f81c2f7173f230ebae7b6da660 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 14:32:17 +0800 Subject: [PATCH 14/25] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2b99c6f6ef..b060de029c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -622,8 +622,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat } SRow* pRow = NULL; - tqInfo("result column flag:%d", pTSchema->columns[1].flags); - code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); From a3bbf3ba0e416f8b93130d7436e125b8285058b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 15:16:21 +0800 Subject: [PATCH 15/25] fix(tsdb): check numOfPks before load pk --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 54 ++++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index a14f866bcc..9feae4c57e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -399,28 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); - SValue vFirst = {0}, vLast = {0}; - for (int32_t f = i; f < rows; ++f) { - int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); - if (code) { - break; + if (block.numOfPKs > 0) { + SValue vFirst = {0}, vLast = {0}; + for (int32_t f = i; f < rows; ++f) { + int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); + if (code) { + break; + } + + tValueDupPayload(&vFirst); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + + // todo add api to clone the original data + code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); + if (code) { + break; + } + + tValueDupPayload(&vLast); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } - - tValueDupPayload(&vFirst); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); - - // todo add api to clone the original data - code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); - if (code) { - break; - } - - tValueDupPayload(&vLast); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } } else { - STbStatisRecord record; + STbStatisRecord record = {0}; + while (i < rows) { tStatisBlockGet(&block, i, &record); if (record.suid != suid) { @@ -433,15 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - SValue s = record.firstKey.pks[0]; - tValueDupPayload(&s); + if (record.firstKey.numOfPKs > 0) { + SValue s = record.firstKey.pks[0]; + tValueDupPayload(&s); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); - s = record.lastKey.pks[0]; - tValueDupPayload(&s); + s = record.lastKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); + } - taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); i += 1; } } From eaf44ec603e87a9d4a2c342855aee929f3105047 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 15:57:04 +0800 Subject: [PATCH 16/25] fix(tsdb):set the initial size of pk in ssdatablock. --- source/libs/executor/src/executil.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 4f055cb928..a9ab8b783c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -255,6 +255,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); + if (pItem->isPk) { SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); pBlockInfo->pks[0].type = pInfoData->info.type; @@ -271,6 +272,9 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) taosMemoryFreeClear(pBlockInfo->pks[0].pData); return TSDB_CODE_OUT_OF_MEMORY; } + + pBlockInfo->pks[0].nData = pInfoData->info.bytes; + pBlockInfo->pks[1].nData = pInfoData->info.bytes; } } } From a951af2492b99df52bddb2724c8caac86d363316 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 16:21:41 +0800 Subject: [PATCH 17/25] fix(query): check the rows before apply the agg in session window. --- source/libs/executor/src/timewindowoperator.c | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 57c038e75a..9474db8553 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1332,23 +1332,25 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + // start a new session window + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); From c39fcc5194eefc13c79ccc458a39e7e703c20543 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 11 Apr 2024 16:55:01 +0800 Subject: [PATCH 18/25] fix: arb assigned step down need to reset token --- source/libs/sync/src/syncAppendEntriesReply.c | 16 +---- source/libs/sync/src/syncMain.c | 71 ++----------------- 2 files changed, 8 insertions(+), 79 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ede4dc07e1..4b7ed59039 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - terrno = TSDB_CODE_SUCCESS; - raftStoreNextTerm(ths); - if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); - return -1; - } - if (syncNodeAssignedLeader2Leader(ths) != 0) { - sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId); - return -1; - } - - taosThreadMutexLock(&ths->arbTokenMutex); - syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken); - sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken); - taosThreadMutexUnlock(&ths->arbTokenMutex); + syncNodeStepDown(ths, pMsg->term); } } else { (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 85aa3a2796..fbdb5f4201 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sError("sync step down error"); - return -1; - } - - syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); - return 0; -} -#endif - bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // start in syncNodeStart // start raft - // syncNodeBecomeFollower(pSyncNode); int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; @@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // persist cfg syncWriteCfgFile(pSyncNode); - -#if 0 - // change isStandBy to normal (election timeout) - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, ""); - - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); - // syncMaybeAdvanceCommitIndex(pSyncNode); - - } else { - syncNodeBecomeFollower(pSyncNode, ""); - } -#endif } else { // persist cfg syncWriteCfgFile(pSyncNode); @@ -1874,18 +1845,6 @@ _END: } // raft state change -------------- -#ifdef BUILD_NO_CALL -void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > raftStoreGetTerm(pSyncNode)) { - raftStoreSetTerm(pSyncNode, term); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode); - } -} -#endif - void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); @@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); + if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { + taosThreadMutexLock(&pSyncNode->arbTokenMutex); + syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); + sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); + taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); + } + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode); - } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { syncNodeBecomeFollower(pSyncNode, "step down"); @@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "follower to candidate"); } -#ifdef BUILD_NO_CALL -void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode, "leader to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "leader to follower"); -} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "candidate to follower"); -} -#endif - int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); From b4ea80b637592f3efd725261fd7d18640f4a607c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 17:09:07 +0800 Subject: [PATCH 19/25] fix(tsdb): prepare the pk buf for blocks generated by reader. --- source/common/src/tdatablock.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 38 +++++++++++++++++++------ source/libs/executor/src/executil.c | 2 ++ 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8d9ef6831d..20c4fa64c4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1502,10 +1502,12 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); + pVal->nData = pDataBlock->info.pks[0].nData; pVal = &pBlock->info.pks[1]; pVal->type = pDataBlock->info.pks[1].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + pVal->nData = pDataBlock->info.pks[1].nData; } if (copyData) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 3409559867..54d5c54788 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -485,7 +485,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) { } static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, - SQueryTableDataCond* pCond) { + SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; terrno = 0; @@ -493,6 +493,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit if (pResBlockInfo->pResBlock == NULL) { pResBlockInfo->freeBlock = true; pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + + if (pSup->numOfPks > 0) { + SSDataBlock* p = pResBlockInfo->pResBlock; + p->info.pks[0].type = pSup->pk.type; + p->info.pks[1].type = pSup->pk.type; + + if (IS_VAR_DATA_TYPE(pSup->pk.type)) { + p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[0].pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[1].pData == NULL) { + taosMemoryFreeClear(p->info.pks[0].pData); + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[0].nData = pSup->pk.bytes; + p->info.pks[1].nData = pSup->pk.bytes; + } + } } else { pResBlockInfo->freeBlock = false; } @@ -525,14 +547,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->type = pCond->type; - + pReader->bFilesetDelimited = false; pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } - if (pCond->numOfCols <= 0) { tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); code = TSDB_CODE_INVALID_PARA; @@ -548,6 +565,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); } + code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -569,8 +591,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - pReader->bFilesetDelimited = false; - tsdbInitReaderLock(pReader); tsem_init(&pReader->resumeAfterSuspend, 0, 0); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a9ab8b783c..d152baf502 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -276,6 +276,8 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) pBlockInfo->pks[0].nData = pInfoData->info.bytes; pBlockInfo->pks[1].nData = pInfoData->info.bytes; } + + break; } } From 14242331b9b29692d5809710fac908e5394d1471 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 17:25:58 +0800 Subject: [PATCH 20/25] fix(tsdb): fix error in decode key. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 5 ++--- source/libs/executor/src/scanoperator.c | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 54d5c54788..6267eb1263 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -179,9 +179,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { } if (IS_VAR_DATA_TYPE(indices[i].type)) { - tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); - pKey->pks[i].pData += pKey->pks[i].nData; + tdata += tGetU32v(tdata, &pKey->pks[i].nData); + memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); } else { memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bec5a73198..7274811812 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4133,6 +4133,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->rtnNextDurationBlocks) { qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; @@ -4141,6 +4142,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { blockDataDestroy(pInfo->nextDurationBlocks[i]); pInfo->nextDurationBlocks[i] = NULL; } + pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; pInfo->numNextDurationBlocks = 0; From 5ee40fb5c9671aea63e8f2c050cd14b1ecac0218 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 18:38:17 +0800 Subject: [PATCH 21/25] fix(tsdb): fix invalid free --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 18 +++++++++++------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 10 +++++----- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 ++- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6267eb1263..047ee66b8f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -395,14 +395,18 @@ _err: return code; } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) { +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { + return pSupp->numOfPks > 0 && IS_VAR_DATA_TYPE(pSupp->pk.type); +} + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - clearDataBlockIterator(pIter, hasPk); + clearDataBlockIterator(pIter, needFree); } } @@ -3202,7 +3206,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); } @@ -3312,7 +3316,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -4157,7 +4161,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4361,7 +4365,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0); + cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo)); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -5037,7 +5041,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index c82363c921..d049aed496 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -467,21 +467,21 @@ static void freePkItem(void* pItem) { taosMemoryFreeClear(p->lastPk.pData); } -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayClearEx(pIter->blockList, freePkItem); } else { taosArrayClear(pIter->blockList); } } -void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayDestroyEx(pIter->blockList, freePkItem); } else { taosArrayDestroy(pIter->blockList); @@ -492,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; - clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0); + clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo)); pBlockIter->numOfBlocks = numOfBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 94909aabf4..49bb92c7ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -349,8 +349,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { From 8f92dc614d7409054e49cbb2b2672216133d9590 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 19:01:23 +0800 Subject: [PATCH 22/25] fix(tsdb):add some logs. --- source/common/src/tdatablock.c | 12 ++++++++---- source/libs/executor/src/tsort.c | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 20c4fa64c4..fdb72a9d5f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1332,6 +1332,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock); + uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock); taosMemoryFreeClear(pBlock->info.pks[0].pData); taosMemoryFreeClear(pBlock->info.pks[1].pData); } @@ -1503,11 +1505,13 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); pVal->nData = pDataBlock->info.pks[0].nData; + memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData); - pVal = &pBlock->info.pks[1]; - pVal->type = pDataBlock->info.pks[1].type; - pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); - pVal->nData = pDataBlock->info.pks[1].nData; + SValue* p = &pBlock->info.pks[1]; + p->type = pDataBlock->info.pks[1].type; + p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + p->nData = pDataBlock->info.pks[1].nData; + memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); } if (copyData) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 44404c345e..3dbf29e3a8 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); From b60cc321f3acbc4385049d875803f0c6d4a54099 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 19:35:14 +0800 Subject: [PATCH 23/25] fix(tsdb):add some logs. --- source/common/src/tdatablock.c | 5 +++ source/libs/executor/inc/executorInt.h | 29 +++++++------- source/libs/executor/src/scanoperator.c | 50 ++++++++++++------------- source/libs/executor/src/tsort.c | 6 +-- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fdb72a9d5f..fd56b5f5ae 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1485,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; @@ -1512,6 +1513,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->nData = pDataBlock->info.pks[1].nData; memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); + uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); + uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); + } else { + uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c2032554b6..8e6f637d81 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; int32_t readIdx; SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSHashObj* mTableNumRows; // uid->num of table rows - SHashObj* mSkipTables; - int64_t mergeLimit; + SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool needCountEmptyTable; - bool bGroupProcessed; // the group return data means processed - bool filesetDelimited; - bool bNewFilesetEvent; - bool bNextDurationBlockEvent; - int32_t numNextDurationBlocks; - SSDataBlock* nextDurationBlocks[2]; - bool rtnNextDurationBlocks; - int32_t nextDurationBlocksIdx; - - bool bSortRowId; + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; + bool bSortRowId; STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7274811812..1a47895c05 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4069,14 +4069,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pInfo->pReaderBlock; - int32_t code = 0; - bool hasNext = false; - STsdbReader* reader = pInfo->base.dataReader; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { @@ -4112,27 +4111,23 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe *pSkipped = true; return; } + return; } static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = NULL; - int32_t code = 0; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + int64_t st = taosGetTimestampUs(); - int64_t st = taosGetTimestampUs(); - bool hasNext = false; - - STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { - qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", - GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; @@ -4149,13 +4144,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { continue; } } else { - bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", - GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -4166,15 +4160,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { - qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), + pInfo->numNextDurationBlocks); pInfo->bNewFilesetEvent = false; break; } } + if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; return NULL; } + if (pInfo->bNextDurationBlockEvent) { pInfo->bNextDurationBlockEvent = false; continue; @@ -4182,19 +4179,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (bSkipped) continue; } + pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + return pBlock; } return NULL; } - SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo biTs = {0}; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3dbf29e3a8..cd1a858175 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + int64_t firstRowTs = *(int64_t*)tsCol->pData; + if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { blockDataDestroy(pBlk); - } + } continue; } } From efdd0c8a2a1b46fb98b49bec8da91226fe274545 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 22:34:47 +0800 Subject: [PATCH 24/25] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 19 +++++++------------ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 20 ++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 -- source/libs/executor/src/executil.c | 1 + 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 047ee66b8f..b7f97771da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -119,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { return ret > 0 ? 1 : -1; } } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + return p1->pks[0].val - p2->pks[0].val; } } } @@ -396,7 +396,7 @@ _err: } bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { - return pSupp->numOfPks > 0 && IS_VAR_DATA_TYPE(pSupp->pk.type); + return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); } void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { @@ -824,18 +824,13 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S return TSDB_CODE_SUCCESS; } -// todo keep the the last returned key static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { -// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; pDumpInfo->allDumped = true; -// ASSERT(0); -// pDumpInfo->lastKey.key.ts = maxKey + step; } static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, bool asc) { pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = numOfPks; if (pKey->numOfPKs <= 0) { return; @@ -845,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; } else { uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; - pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; + pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); } } @@ -2841,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; } else { - memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); - memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); + memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); - pInfo->pks[0].nData = pBlockInfo->firstPKLen; - pInfo->pks[1].nData = pBlockInfo->lastPKLen; + pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); + pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index d049aed496..ae8a6466ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -446,17 +446,17 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; + } else { + char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData); + varDataSetLen(p, pFirstKey->pks[0].nData); + pBlockInfo->firstPk.pData = (uint8_t*)p; - pBlockInfo->firstPKLen = 0; - pBlockInfo->lastPKLen = 0; - } else { // todo handle memory alloc error, opt memory alloc perf - pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; - pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); - memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); - - pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; - pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); - memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + int32_t keyLen = record->lastKey.key.pks[0].nData; + p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen); + varDataSetLen(p, keyLen); + pBlockInfo->lastPk.pData = (uint8_t*)p; } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 49bb92c7ce..581696c94a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo { uint8_t* pData; } lastPk; - int32_t firstPKLen; - int32_t lastPKLen; int64_t minVer; int64_t maxVer; int64_t blockOffset; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d152baf502..be6fb2983c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -261,6 +261,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) pBlockInfo->pks[0].type = pInfoData->info.type; pBlockInfo->pks[1].type = pInfoData->info.type; + // allocate enough buffer size, which is pInfoData->info.bytes if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); if (pBlockInfo->pks[0].pData == NULL) { From f2ccb8aa7ede38e6835fbab8b7b00fc42f861359 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 22:54:35 +0800 Subject: [PATCH 25/25] fix(stream): add lock when retrieving info from the tableGroup struct --- source/libs/executor/src/scanoperator.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64806a1c72..9ccabd2cd6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); - pInfo->pResBlock->info.blankFill = false; if (!pInfo->needCountEmptyTable) { @@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t num = 0; - STableKeyInfo* pList = NULL; + int32_t num = 0; + STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { @@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { return NULL; } + taosRLockLatch(&pTaskInfo->lock); initNextGroupScan(pInfo, &pList, &num); + taosRUnLockLatch(&pTaskInfo->lock); + ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,