From f7d4c274a8ecc94710378e9729bb907eafbe170b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 16 Aug 2024 18:03:16 +0800 Subject: [PATCH 01/22] fix exchange operator blocked --- source/libs/executor/src/exchangeoperator.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index bdc1e42b28..fb085c39c8 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -225,7 +225,10 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { } else { concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } - + if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code); + } if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) { return NULL; } else { From ee36bd741f4d511ea31c776f6cc7fa31c3cab8da Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 09:50:38 +0800 Subject: [PATCH 02/22] fix(query): return error code. --- source/libs/executor/src/executor.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b1c9207ab7..a034e011f8 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -663,7 +663,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (isTaskKilled(pTaskInfo)) { atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); - return TSDB_CODE_SUCCESS; + return pTaskInfo->code; } // error occurs, record the error code and return to client @@ -785,7 +785,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); taosRUnLockLatch(&pTaskInfo->lock); - return TSDB_CODE_SUCCESS; + return pTaskInfo->code; } if (pTaskInfo->owner != 0) { From 2f92b80cd694c87fdaee04e03cb5bdcc534cdad0 Mon Sep 17 00:00:00 2001 From: sima Date: Mon, 19 Aug 2024 10:10:47 +0800 Subject: [PATCH 03/22] fix:[TD-31511] Fix memory leak when error occurs. --- source/libs/scalar/src/filter.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 00487b140d..cc9cc9ed76 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -2372,6 +2372,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t } gRes[gResIdx]->colInfo = taosMemoryCalloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(SFilterColInfo)); if (gRes[gResIdx]->colInfo == NULL) { + filterFreeGroupCtx(gRes[gResIdx]); FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } colIdxi = 0; @@ -2384,6 +2385,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t if (gRes[gResIdx]->colInfo[cidx].info == NULL) { gRes[gResIdx]->colInfo[cidx].info = (SArray *)taosArrayInit(4, POINTER_BYTES); if (gRes[gResIdx]->colInfo[cidx].info == NULL) { + filterFreeGroupCtx(gRes[gResIdx]); FLT_ERR_JRET(terrno); } colIdx[colIdxi++] = cidx; @@ -2408,7 +2410,11 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t continue; } - FLT_ERR_JRET(filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty)); + code = filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty); + if (TSDB_CODE_SUCCESS != code) { + filterFreeGroupCtx(gRes[gResIdx]); + SCL_ERR_JRET(code); + } if (empty) { break; @@ -2426,10 +2432,9 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gRes[gResIdx]->colNum = colIdxi; FILTER_COPY_IDX(&gRes[gResIdx]->colIdx, colIdx, colIdxi); ++gResIdx; + *gResNum = gResIdx; } - *gResNum = gResIdx; - if (gResIdx == 0) { FILTER_SET_FLAG(info->status, FI_STATUS_EMPTY); } From d0e31f711fb82b1f3274365a12013a51d247f6f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 11:01:54 +0800 Subject: [PATCH 04/22] fix(tsdb): return code for tMergeTreeNext --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 9 +-- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 21 +++++-- source/dnode/vnode/src/tsdb/tsdbRead2.c | 64 ++++++++++++--------- 4 files changed, 58 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab5b07581a..85084a0b81 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -898,7 +898,7 @@ typedef struct SSttDataInfoForTable { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree *pMTree); +int32_t tMergeTreeNext(SMergeTree *pMTree, bool* pHasNext); void tMergeTreePinSttBlock(SMergeTree *pMTree); void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index aa92597211..70e6e1ee2a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2245,17 +2245,18 @@ static int32_t lastIterClose(SFSLastIter **iter) { } static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) { - int32_t code = 0; + bool hasVal = false; + int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal); + if (code != 0) { + return code; + } - bool hasVal = tMergeTreeNext(iter->pMergeTree); if (!hasVal) { *ppRow = NULL; - TAOS_RETURN(code); } *ppRow = tMergeTreeGetRow(iter->pMergeTree); - TAOS_RETURN(code); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 4729b912a7..8bfc066731 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -1102,13 +1102,21 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) { tLDataIterUnpinSttBlock(pIter, pMTree->idStr); } -bool tMergeTreeNext(SMergeTree *pMTree) { +int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) { + int32_t code = 0; + if (pHasNext == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; - - bool hasVal = false; - int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + bool hasVal = false; + code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); if (!hasVal || (code != 0)) { + if (code == TSDB_CODE_FILE_CORRUPTED) { + code = 0; // suppress the file corrupt error to enable all queries within this cluster can run without failed. + } + pMTree->pIter = NULL; } @@ -1117,7 +1125,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { if (pMTree->pIter && pIter) { int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node); if (c > 0) { - (void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); + (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); pMTree->pIter = NULL; } else { ASSERT(c); @@ -1132,7 +1140,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) { } } - return pMTree->pIter != NULL; + *pHasNext = (pMTree->pIter != NULL); + return code; } void tMergeTreeClose(SMergeTree *pMTree) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9a9c74a3a0..607d96bcbc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1759,14 +1759,22 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return code; } -static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, - SVersionRange* pVerRange) { +static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, + SVersionRange* pVerRange) { + int32_t code = 0; int32_t order = pSttBlockReader->order; int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey; while (1) { - bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); + bool hasVal = false; + code = tMergeTreeNext(&pSttBlockReader->mergeTree, &hasVal); + if (code) { + tsdbError("failed to iter the next row in stt-file merge tree, code:%s, %s", tstrerror(code), + pSttBlockReader->mergeTree.idStr); + return code; + } + if (!hasVal) { // the next value will be the accessed key in stt pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; @@ -1779,7 +1787,6 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData); } } - return false; } TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); @@ -1798,13 +1805,13 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; - return true; } } else { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; - return true; } } + + return code; } static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); } @@ -2380,14 +2387,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable return true; } -static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - bool hasData = true; +static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { int32_t order = pReader->info.order; bool asc = ASCENDING_TRAVERSE(order); // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { - return hasDataInSttBlock(pScanInfo); + return; } if (pSttBlockReader->uid != 0) { @@ -2396,9 +2402,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan pSttBlockReader->uid = pScanInfo->uid; - // second time init stt block reader + // second or third time init stt block reader if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) { - return !pScanInfo->sttBlockReturned; + // only allowed to retrieve clean stt blocks for count once + if (pScanInfo->sttBlockReturned) { + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved", pScanInfo->uid, pReader->idStr); + } + return; } STimeWindow w = pSttBlockReader->window; @@ -2435,28 +2446,28 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; if (info.pKeyRangeList == NULL) { pReader->code = terrno; - return false; + return; } int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } code = initMemDataIterator(pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } if (conf.rspRows) { @@ -2484,27 +2495,26 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey; tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p); - - hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); } else { // not clean stt blocks INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window - pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } } else { pScanInfo->cleanSttBlocks = false; INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window - pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } + pScanInfo->sttBlockReturned = false; taosArrayDestroy(info.pKeyRangeList); int64_t el = taosGetTimestampUs() - st; pReader->cost.initSttBlockReader += (el / 1000.0); tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); - return hasData; + if (code != 0) { + pReader->code = code; + } } static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; } @@ -2772,7 +2782,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; - (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); if (pReader->code != 0) { code = pReader->code; goto _end; @@ -3190,12 +3200,12 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { continue; } - bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != TSDB_CODE_SUCCESS) { return pReader->code; } - if (!hasDataInSttFile) { + if (!hasDataInSttBlock(pScanInfo)) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -3287,7 +3297,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { - (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != 0) { return pReader->code; } @@ -3331,7 +3341,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != 0) { return pReader->code; } From f2f0bad021205290ba959c6037c01b1800352cea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 11:06:43 +0800 Subject: [PATCH 05/22] fix(tsdb): return code for tMergeTreeNext --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 607d96bcbc..639cab9f52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1826,9 +1826,14 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB // avoid the fetch next row replace the referenced stt block in buffer doPinSttBlock(pSttBlockReader); - bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); doUnpinSttBlock(pSttBlockReader); - if (hasVal) { + + if (code) { + return code; + } + + if (hasDataInSttBlock(pScanInfo)) { SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader); if (pkCompEx(pSttKey, pNext) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); @@ -4097,7 +4102,11 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI SRowKey* pRowKey = &pScanInfo->lastProcKey; int32_t code = TSDB_CODE_SUCCESS; - while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) { + while (1) { + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange); + if (code) { + + } SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); int32_t ret = pkCompEx(pRowKey, pNextKey); From bc2f648cf79d5e13c83276a4db0aefb6c7b9e94b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 19 Aug 2024 13:33:34 +0800 Subject: [PATCH 06/22] fix memory leak in tsdbreader --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 4729b912a7..cfb9a2f215 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -115,16 +115,14 @@ void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoad SArray *pList = taosArrayGetP(pLDataIterArray, i); for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { SLDataIter *pIter = taosArrayGetP(pList, j); - if (pIter->pBlockLoadInfo == NULL) { - continue; - } - - SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost; - if (pLoadCost != NULL) { - pLoadCost->loadBlocks += pCost->loadBlocks; - pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks; - pLoadCost->blockElapsedTime += pCost->blockElapsedTime; - pLoadCost->statisElapsedTime += pCost->statisElapsedTime; + if (pIter->pBlockLoadInfo != NULL) { + SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost; + if (pLoadCost != NULL) { + pLoadCost->loadBlocks += pCost->loadBlocks; + pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks; + pLoadCost->blockElapsedTime += pCost->blockElapsedTime; + pLoadCost->statisElapsedTime += pCost->statisElapsedTime; + } } destroyLDataIter(pIter); From d4dc632d6f43aab9d5e22c5b41aa5ca1c7ab52d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 13:39:30 +0800 Subject: [PATCH 07/22] fix(tsdb): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- source/libs/executor/src/scanoperator.c | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 639cab9f52..5987b673c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2412,7 +2412,7 @@ static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan // only allowed to retrieve clean stt blocks for count once if (pScanInfo->sttBlockReturned) { pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved", pScanInfo->uid, pReader->idStr); + tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved, %s", pScanInfo->uid, pReader->idStr); } return; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1935f2b0b6..5984f75c05 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -465,14 +465,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca SSDataBlock* p = NULL; code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL); - if (p == NULL || code != TSDB_CODE_SUCCESS) { + if (p == NULL || code != TSDB_CODE_SUCCESS || p != pBlock) { return code; } - if(p != pBlock) { - qError("[loadDataBlock] p != pBlock"); - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - } doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); // restore the previous value From c6350794fee265cffe8b4a8016199da35df0a285 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 19 Aug 2024 15:53:55 +0800 Subject: [PATCH 08/22] fix(scheduler/exec cb): remove schedulerFreeJob from cb --- source/client/src/clientImpl.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 664de5619f..e12c761fcc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1135,8 +1135,6 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } - - schedulerFreeJob(&pRequest->body.queryJob, 0); } taosMemoryFree(pResult); From 5035b3a624e970b805e7d66facd1e0aa66a484c5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 19 Aug 2024 17:13:27 +0800 Subject: [PATCH 09/22] fix: merge join destroy table issue --- source/libs/executor/src/mergejoinoperator.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 946a1d2aa5..52b0da7c92 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1746,6 +1746,9 @@ void destroyGrpArray(void* ppArray) { } void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) { + if (NULL == pTable) { + return; + } mJoinDestroyCreatedBlks(pTable->createdBlks); taosArrayDestroy(pTable->createdBlks); tSimpleHashCleanup(pTable->pGrpHash); From c7f9c82950c6197bba0c0afcd1cf71898ca7c3ef Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 19 Aug 2024 18:02:50 +0800 Subject: [PATCH 10/22] add kafka config propose --- .../20-third-party/01-collection/11-kafka.md | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/docs/zh/20-third-party/01-collection/11-kafka.md b/docs/zh/20-third-party/01-collection/11-kafka.md index 651ef860cb..2e4677ca31 100644 --- a/docs/zh/20-third-party/01-collection/11-kafka.md +++ b/docs/zh/20-third-party/01-collection/11-kafka.md @@ -27,22 +27,35 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 ## 安装 Kafka -在任意目录下执行: +- 在任意目录下执行: -```shell -curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz -tar xzf kafka_2.13-3.4.0.tgz -C /opt/ -ln -s /opt/kafka_2.13-3.4.0 /opt/kafka -``` + ```shell + curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz + tar xzf kafka_2.13-3.4.0.tgz -C /opt/ + ln -s /opt/kafka_2.13-3.4.0 /opt/kafka + ``` -然后需要把 `$KAFKA_HOME/bin` 目录加入 PATH。 +- 然后需要把 `$KAFKA_HOME/bin` 目录加入 PATH。 -```title=".profile" -export KAFKA_HOME=/opt/kafka -export PATH=$PATH:$KAFKA_HOME/bin -``` + ```title=".profile" + export KAFKA_HOME=/opt/kafka + export PATH=$PATH:$KAFKA_HOME/bin + ``` + 以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile) -以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile) +- 提升 Kafka 吞吐率的建议配置 + + 1. 打开 KAFKA_HOME/config/producer.properties 配置文件。 + 2. 参数说明及建议如下: + + | **参数** | **参数说明** | **设置建议** | + | --------| --------------------------------- | -------------- | + | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | + | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。|1| + | max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600| + |batch.size| 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288| + | buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824| + ## 安装 TDengine Connector 插件 From 12dbcf561201937c464cbc0ef1054138a52e05a3 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 19 Aug 2024 18:26:34 +0800 Subject: [PATCH 11/22] docs: add privilege control --- .../03-taos-sql/{25-grant.md => 25-user.md} | 0 docs/zh/14-reference/03-taos-sql/26-grant.md | 168 ++++++++++++++++++ .../03-taos-sql/{26-udf.md => 27-udf.md} | 0 .../{27-indexing.md => 28-index.md} | 0 .../{28-recovery.md => 29-recovery.md} | 0 .../{29-changes.md => 30-changes.md} | 0 .../03-taos-sql/{30-join.md => 31-join.md} | 0 .../{31-compress.md => 32-compress.md} | 0 .../03-taos-sql/{32-view.md => 33-view.md} | 0 9 files changed, 168 insertions(+) rename docs/zh/14-reference/03-taos-sql/{25-grant.md => 25-user.md} (100%) create mode 100644 docs/zh/14-reference/03-taos-sql/26-grant.md rename docs/zh/14-reference/03-taos-sql/{26-udf.md => 27-udf.md} (100%) rename docs/zh/14-reference/03-taos-sql/{27-indexing.md => 28-index.md} (100%) rename docs/zh/14-reference/03-taos-sql/{28-recovery.md => 29-recovery.md} (100%) rename docs/zh/14-reference/03-taos-sql/{29-changes.md => 30-changes.md} (100%) rename docs/zh/14-reference/03-taos-sql/{30-join.md => 31-join.md} (100%) rename docs/zh/14-reference/03-taos-sql/{31-compress.md => 32-compress.md} (100%) rename docs/zh/14-reference/03-taos-sql/{32-view.md => 33-view.md} (100%) diff --git a/docs/zh/14-reference/03-taos-sql/25-grant.md b/docs/zh/14-reference/03-taos-sql/25-user.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/25-grant.md rename to docs/zh/14-reference/03-taos-sql/25-user.md diff --git a/docs/zh/14-reference/03-taos-sql/26-grant.md b/docs/zh/14-reference/03-taos-sql/26-grant.md new file mode 100644 index 0000000000..c3fd1790d0 --- /dev/null +++ b/docs/zh/14-reference/03-taos-sql/26-grant.md @@ -0,0 +1,168 @@ +--- +toc_max_heading_level: 4 +title: 权限管理 +--- + +TDengine 中的权限管理分为[用户管理](../user)、数据库授权管理以及消息订阅授权管理,本节重点说明数据库授权和订阅授权。 + +## 数据库访问授权 + +系统管理员可以根据业务需要对系统中的每个用户针对每个数据库进行特定的授权,以防止业务数据被不恰当的用户读取或修改。对某个用户进行数据库访问授权的语法如下: + +```sql +GRANT privileges ON priv_level TO user_name + +privileges : { + ALL + | priv_type [, priv_type] ... +} + +priv_type : { + READ + | WRITE +} + +priv_level : { + dbname.tbname + | dbname.* + | *.* +} +``` + +对数据库的访问权限包含读和写两种权限,它们可以被分别授予,也可以被同时授予。 + +说明 + +- priv_level 格式中 "." 之前为数据库名称, "." 之后为表名称,意思为表级别的授权控制。如果 "." 之后为 "\*" ,意为 "." 前所指定的数据库中的所有表 +- "dbname.\*" 意思是名为 "dbname" 的数据库中的所有表 +- "\*.\*" 意思是所有数据库名中的所有表 + +### 数据库权限说明 + +对 root 用户和普通用户的权限的说明如下表 + +| 用户 | 描述 | 权限说明 | +| -------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 超级用户 | 只有 root 是超级用户 | DB 外部 所有操作权限,例如user、dnode、udf、qnode等的CRUD DB 权限,包括 创建 删除 更新,例如修改 Option,移动 Vgruop等 读 写 Enable/Disable 用户 | +| 普通用户 | 除 root 以外的其它用户均为普通用户 | 在可读的 DB 中,普通用户可以进行读操作 select describe show subscribe 在可写 DB 的内部,用户可以进行写操作: 创建、删除、修改 超级表 创建、删除、修改 子表 创建、删除、修改 topic 写入数据 被限制系统信息时,不可进行如下操作 show dnode、mnode、vgroups、qnode、snode 修改用户包括自身密码 show db时只能看到自己的db,并且不能看到vgroups、副本、cache等信息 无论是否被限制系统信息,都可以 管理 udf 可以创建 DB 自己创建的 DB 具备所有权限 非自己创建的 DB ,参照读、写列表中的权限 | + +## 消息订阅授权 + +任意用户都可以在自己拥有读权限的数据库上创建 topic。超级用户 root 可以在任意数据库上创建 topic。每个 topic 的订阅权限都可以被独立授权给任何用户,不管该用户是否拥有该数据库的访问权限。删除 topic 只能由 root 用户或者该 topic 的创建者进行。topic 只能由超级用户、topic的创建者或者被显式授予 subscribe 权限的用户订阅。 + +具体的 SQL 语法如下: + +```sql +GRANT SUBSCRIBE ON topic_name TO user_name + +REVOKE SUBSCRIBE ON topic_name FROM user_name +``` + +## 基于标签的授权(表级授权) + +从 TDengine 3.0.5.0 开始,我们支持按标签授权某个超级表中部分特定的子表。具体的 SQL 语法如下。 + +```sql +GRANT privileges ON priv_level [WITH tag_condition] TO user_name + +privileges : { + ALL + | priv_type [, priv_type] ... +} + +priv_type : { + READ + | WRITE +} + +priv_level : { + dbname.tbname + | dbname.* + | *.* +} + +REVOKE privileges ON priv_level [WITH tag_condition] FROM user_name + +privileges : { + ALL + | priv_type [, priv_type] ... +} + +priv_type : { + READ + | WRITE +} + +priv_level : { + dbname.tbname + | dbname.* + | *.* +} +``` + +上面 SQL 的语义为: + +- 用户可以通过 dbname.tbname 来为指定的表(包括超级表和普通表)授予或回收其读写权限,不支持直接对子表授予或回收权限。 +- 用户可以通过 dbname.tbname 和 WITH 子句来为符合条件的所有子表授予或回收其读写权限。使用 WITH 子句时,权限级别必须为超级表。 + +## 表级权限和数据库权限的关系 + +下表列出了在不同的数据库授权和表级授权的组合下产生的实际权限。 + +| | **表无授权** | **表读授权** | **表读授权有标签条件** | **表写授权** | **表写授权有标签条件** | +| ---------------- | ---------------- | ---------------------------------------- | ------------------------------------------------------------ | ---------------------------------------- | ---------------------------------------------------------- | +| **数据库无授权** | 无授权 | 对此表有读权限,对数据库下的其他表无权限 | 对此表符合标签权限的子表有读权限,对数据库下的其他表无权限 | 对此表有写权限,对数据库下的其他表无权限 | 对此表符合标签权限的子表有写权限,对数据库下的其他表无权限 | +| **数据库读授权** | 对所有表有读权限 | 对所有表有读权限 | 对此表符合标签权限的子表有读权限,对数据库下的其他表有读权限 | 对此表有写权限,对所有表有读权限 | 对此表符合标签权限的子表有写权限,所有表有读权限 | +| **数据库写授权** | 对所有表有写权限 | 对此表有读权限,对所有表有写权限 | 对此表符合标签权限的子表有读权限,对所有表有写权限 | 对所有表有写权限 | 对此表符合标签权限的子表有写权限,数据库下的其他表有写权限 | + + +## 查看用户授权 + +使用下面的命令可以显示一个用户所拥有的授权: + +```sql +show user privileges +``` + +## 撤销授权 + +1. 撤销数据库访问的授权 + +```sql +REVOKE privileges ON priv_level FROM user_name + +privileges : { + ALL + | priv_type [, priv_type] ... +} + +priv_type : { + READ + | WRITE +} + +priv_level : { + dbname.tbname + | dbname.* + | *.* +} +``` + +2. 撤销数据订阅的授权 + +```sql +REVOKE privileges ON priv_level FROM user_name + +privileges : { + ALL + | priv_type [, priv_type] ... +} + +priv_type : { + SUBSCRIBE +} + +priv_level : { + topic_name +} +``` diff --git a/docs/zh/14-reference/03-taos-sql/26-udf.md b/docs/zh/14-reference/03-taos-sql/27-udf.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/26-udf.md rename to docs/zh/14-reference/03-taos-sql/27-udf.md diff --git a/docs/zh/14-reference/03-taos-sql/27-indexing.md b/docs/zh/14-reference/03-taos-sql/28-index.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/27-indexing.md rename to docs/zh/14-reference/03-taos-sql/28-index.md diff --git a/docs/zh/14-reference/03-taos-sql/28-recovery.md b/docs/zh/14-reference/03-taos-sql/29-recovery.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/28-recovery.md rename to docs/zh/14-reference/03-taos-sql/29-recovery.md diff --git a/docs/zh/14-reference/03-taos-sql/29-changes.md b/docs/zh/14-reference/03-taos-sql/30-changes.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/29-changes.md rename to docs/zh/14-reference/03-taos-sql/30-changes.md diff --git a/docs/zh/14-reference/03-taos-sql/30-join.md b/docs/zh/14-reference/03-taos-sql/31-join.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/30-join.md rename to docs/zh/14-reference/03-taos-sql/31-join.md diff --git a/docs/zh/14-reference/03-taos-sql/31-compress.md b/docs/zh/14-reference/03-taos-sql/32-compress.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/31-compress.md rename to docs/zh/14-reference/03-taos-sql/32-compress.md diff --git a/docs/zh/14-reference/03-taos-sql/32-view.md b/docs/zh/14-reference/03-taos-sql/33-view.md similarity index 100% rename from docs/zh/14-reference/03-taos-sql/32-view.md rename to docs/zh/14-reference/03-taos-sql/33-view.md From 128adaa3b4adfa23e82b1f0b8525c229282a87e6 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 19 Aug 2024 18:28:53 +0800 Subject: [PATCH 12/22] add kafka config propose --- .../20-third-party/01-collection/11-kafka.md | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/docs/zh/20-third-party/01-collection/11-kafka.md b/docs/zh/20-third-party/01-collection/11-kafka.md index 2e4677ca31..e9ac68251c 100644 --- a/docs/zh/20-third-party/01-collection/11-kafka.md +++ b/docs/zh/20-third-party/01-collection/11-kafka.md @@ -43,20 +43,6 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 ``` 以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile) -- 提升 Kafka 吞吐率的建议配置 - - 1. 打开 KAFKA_HOME/config/producer.properties 配置文件。 - 2. 参数说明及建议如下: - - | **参数** | **参数说明** | **设置建议** | - | --------| --------------------------------- | -------------- | - | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | - | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。|1| - | max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600| - |batch.size| 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288| - | buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824| - - ## 安装 TDengine Connector 插件 ### 编译插件 @@ -338,6 +324,21 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector ``` +### 性能调优 + +如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期,可以尝试使用如下参数提升 Kafka 的写入吞吐量。 + +1. 打开 KAFKA_HOME/config/producer.properties 配置文件。 +2. 参数说明及配置建议如下: + | **参数** | **参数说明** | **设置建议** | + | --------| --------------------------------- | -------------- | + | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | + | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。|1| + | max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600| + |batch.size| 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288| + | buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824| + + ## 配置参考 ### 通用配置 From 28b9611f380062aeea0a6120fd62b12c2d27fbfa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 18:33:39 +0800 Subject: [PATCH 13/22] fix(tsdb): return if no data. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5987b673c3..355bcca469 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1383,7 +1383,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) { ASSERT(pReader->info.pSchema == NULL); - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) { terrno = code; @@ -1787,6 +1786,7 @@ static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBloc memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData); } } + return code; } TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); From e298b5acb320cc379660885e4c6288852d162f04 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 19 Aug 2024 18:39:08 +0800 Subject: [PATCH 14/22] fix: s3 support multi proto --- source/common/src/cos.c | 44 +++++++++++++++++++------------------ source/common/src/tglobal.c | 9 ++++---- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 8392b0564a..a5a278e82e 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -12,7 +12,7 @@ extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; extern char tsS3BucketName[TSDB_FQDN_LEN]; extern char tsS3AppId[][TSDB_FQDN_LEN]; extern char tsS3Hostname[][TSDB_FQDN_LEN]; -extern int8_t tsS3Https; +extern int8_t tsS3Https[]; static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex); static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, @@ -33,13 +33,13 @@ static int verifyPeerG = 0; static const char *awsRegionG = NULL; static int forceG = 0; static int showResponsePropertiesG = 0; -static S3Protocol protocolG = S3ProtocolHTTPS; +static S3Protocol protocolG[TSDB_MAX_EP_NUM] = {S3ProtocolHTTPS}; // static S3Protocol protocolG = S3ProtocolHTTP; -static S3UriStyle uriStyleG = S3UriStylePath; +static S3UriStyle uriStyleG[TSDB_MAX_EP_NUM] = {S3UriStylePath}; static int retriesG = 5; static int timeoutMsG = 0; -extern int8_t tsS3Oss; +extern int8_t tsS3Oss[]; int32_t s3Begin() { S3Status status; @@ -55,9 +55,11 @@ int32_t s3Begin() { TAOS_RETURN(TSDB_CODE_FAILED); } - protocolG = !tsS3Https; - if (tsS3Oss) { - uriStyleG = S3UriStyleVirtualHost; + for (int i = 0; i < tsS3EpNum; i++) { + protocolG[i] = !tsS3Https[i]; + if (tsS3Oss[i]) { + uriStyleG[i] = S3UriStyleVirtualHost; + } } TAOS_RETURN(TSDB_CODE_SUCCESS); @@ -976,8 +978,8 @@ int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8 S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1059,8 +1061,8 @@ static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *objec S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1155,8 +1157,8 @@ static void s3FreeObjectKey(void *pItem) { static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) { S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1223,8 +1225,8 @@ static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_ S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1299,8 +1301,8 @@ static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1372,8 +1374,8 @@ static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileNa S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, @@ -1449,8 +1451,8 @@ static long s3SizeByEp(const char *object_name, int8_t epIndex) { S3BucketContext bucketContext = {tsS3Hostname[epIndex], tsS3BucketName, - protocolG, - uriStyleG, + protocolG[epIndex], + uriStyleG[epIndex], tsS3AccessKeyId[epIndex], tsS3AccessKeySecret[epIndex], 0, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 40ace11d4f..cf0a4725c1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -303,10 +303,10 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int8_t tsS3Enabled = false; int8_t tsS3EnabledCfg = false; -int8_t tsS3Oss = false; +int8_t tsS3Oss[TSDB_MAX_EP_NUM] = {false}; int8_t tsS3StreamEnabled = false; -int8_t tsS3Https = true; +int8_t tsS3Https[TSDB_MAX_EP_NUM] = {true}; char tsS3Hostname[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) @@ -431,11 +431,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AppId[i], appid + 1, TSDB_FQDN_LEN); } } + tsS3Https[i] = (strstr(tsS3Endpoint[i], "https://") != NULL); + tsS3Oss[i] = (strstr(tsS3Endpoint[i], "aliyuncs.") != NULL); } - tsS3Https = (strstr(tsS3Endpoint[0], "https://") != NULL); - tsS3Oss = (strstr(tsS3Endpoint[0], "aliyuncs.") != NULL); - if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) #ifdef TD_ENTERPRISE From 21c266d1231c476ee76c1778d1c93debbb5260f2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 19 Aug 2024 18:52:14 +0800 Subject: [PATCH 15/22] fix(vnode/cfg): use default value if loading 0 --- source/dnode/vnode/src/vnd/vnodeCfg.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 1f2cf707f3..e2791d8a00 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -375,11 +375,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { } tjsonGetNumberValue(pJson, "s3ChunkSize", pCfg->s3ChunkSize, code); - if (code < 0) { + if (code < 0 || pCfg->s3ChunkSize < TSDB_MIN_S3_CHUNK_SIZE) { pCfg->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE; } tjsonGetNumberValue(pJson, "s3KeepLocal", pCfg->s3KeepLocal, code); - if (code < 0) { + if (code < 0 || pCfg->s3KeepLocal < TSDB_MIN_S3_KEEP_LOCAL) { pCfg->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; } tjsonGetNumberValue(pJson, "s3Compact", pCfg->s3Compact, code); From 1b59d4a8d7443c41c971ad39bdc5de467d0c6c9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 18:54:42 +0800 Subject: [PATCH 16/22] fix(tsdb): pass the error code out. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 355bcca469..24476e8df1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1413,7 +1413,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI if (pReader->info.pSchema == NULL) { pSchema = getTableSchemaImpl(pReader, uid); if (pSchema == NULL) { - tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); + code = terrno; + tsdbError("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); return code; } } @@ -1448,7 +1449,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI pReader->cost.blockLoadTime += elapsedTime; pDumpInfo->allDumped = false; - return TSDB_CODE_SUCCESS; + return code; } /** @@ -2137,7 +2138,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (piRow->type == TSDBROW_ROW_FMT) { piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); if (piSchema == NULL) { - return code; + return terrno; } } From 2b68e110e51728c0ecd9ea2346eab4641aecf4af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 19:01:26 +0800 Subject: [PATCH 17/22] fix(tsdb): check return value. --- source/dnode/vnode/src/tsdb/tsdbCache.c | 4 +++- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 70e6e1ee2a..fb72784229 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2245,7 +2245,9 @@ static int32_t lastIterClose(SFSLastIter **iter) { } static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) { - bool hasVal = false; + bool hasVal = false; + *ppRow = NULL; + int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal); if (code != 0) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 8bfc066731..160ff2e13c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -1108,6 +1108,7 @@ int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) { return TSDB_CODE_INVALID_PARA; } + *pHasNext = false; if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; bool hasVal = false; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 24476e8df1..cc369ba3b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4105,9 +4105,10 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI while (1) { code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange); - if (code) { - + if (code || (!hasDataInSttBlock(pScanInfo))) { + return code; } + SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); int32_t ret = pkCompEx(pRowKey, pNextKey); From a760ede4ae473a52cc7284706362ab44c25cc1f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Aug 2024 19:16:40 +0800 Subject: [PATCH 18/22] refactor: update logs. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cc369ba3b0..74992c40d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1414,7 +1414,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI pSchema = getTableSchemaImpl(pReader, uid); if (pSchema == NULL) { code = terrno; - tsdbError("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); + tsdbError("%p table uid:%" PRIu64 " failed to get tableschema, code:%s, %s", pReader, uid, tstrerror(code), + pReader->idStr); return code; } } From 051763e71f0e41a44fd38a93ab3ffe10bc2e1a88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Aug 2024 09:14:50 +0800 Subject: [PATCH 19/22] fix(tsdb): return if get data. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 74992c40d3..9be2c3b3f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1807,9 +1807,11 @@ static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBloc if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; + return code; } } else { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; + return code; } } From 820661f192d61f5188bca14539cb33890351a439 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 20 Aug 2024 10:43:28 +0800 Subject: [PATCH 20/22] enh: remove some asserts --- include/util/tcoding.h | 3 -- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 36 ++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbUtil2.c | 2 +- source/dnode/vnode/src/vnd/vnodeAsync.c | 15 ++------- source/dnode/vnode/src/vnd/vnodeCommit.c | 3 +- source/dnode/vnode/src/vnd/vnodeHash.c | 1 - source/dnode/vnode/src/vnd/vnodeOpen.c | 4 ++- 7 files changed, 32 insertions(+), 32 deletions(-) diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 1040adf431..b4f62d349c 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -213,7 +213,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU16(void **buf, uint16_t value) { if (buf != NULL) ((uint8_t *)(*buf))[i] = (uint8_t)(value | ENCODE_LIMIT); value >>= 7; i++; - ASSERT(i < 3); } if (buf != NULL) { @@ -261,7 +260,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU32(void **buf, uint32_t value) { if (buf != NULL) ((uint8_t *)(*buf))[i] = (value | ENCODE_LIMIT); value >>= 7; i++; - ASSERT(i < 5); } if (buf != NULL) { @@ -309,7 +307,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU64(void **buf, uint64_t value) { if (buf != NULL) ((uint8_t *)(*buf))[i] = (uint8_t)(value | ENCODE_LIMIT); value >>= 7; i++; - ASSERT(i < 10); } if (buf != NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index e3d7f9d45f..0bd00a100c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -61,7 +61,9 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con // // open each segment reader int64_t offset = config->file->size - sizeof(SSttFooter); - ASSERT(offset >= TSDB_FHDR_SIZE); + if (offset < TSDB_FHDR_SIZE) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } int32_t encryptAlgoirthm = config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm; char *encryptKey = config->tsdb->pVnode->config.tsdbCfg.encryptKey; @@ -115,7 +117,9 @@ int32_t tsdbSttFileReaderClose(SSttFileReader **reader) { int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray) { if (!reader->ctx->statisBlkLoaded) { if (reader->footer->statisBlkPtr->size > 0) { - ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); + if (reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) != 0) { + return TSDB_CODE_FILE_CORRUPTED; + } int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk); void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); @@ -147,7 +151,9 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray * int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tombBlkArray) { if (!reader->ctx->tombBlkLoaded) { if (reader->footer->tombBlkPtr->size > 0) { - ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0); + if (reader->footer->tombBlkPtr->size % sizeof(STombBlk) != 0) { + return TSDB_CODE_FILE_CORRUPTED; + } int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk); void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size); @@ -179,7 +185,9 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray) { if (!reader->ctx->sttBlkLoaded) { if (reader->footer->sttBlkPtr->size > 0) { - ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); + if (reader->footer->sttBlkPtr->size % sizeof(SSttBlk) != 0) { + return TSDB_CODE_FILE_CORRUPTED; + } int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk); void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); @@ -256,7 +264,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit); - ASSERT(hdr.delimiter == TSDB_FILE_DLMT); + if (hdr.delimiter != TSDB_FILE_DLMT) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } // set data container tBlockDataReset(bData); @@ -266,7 +276,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * // key part TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit); - ASSERT(br.offset == buffer0->size); + if (br.offset != buffer0->size) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } bool loadExtra = false; for (int i = 0; i < ncid; i++) { @@ -376,7 +388,10 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk br.offset += tombBlk->size[i]; } - ASSERT(br.offset == tombBlk->dp->size); + if (br.offset != tombBlk->dp->size) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino, @@ -444,7 +459,9 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta } } - ASSERT(br.offset == buffer0->size); + if (br.offset != buffer0->size) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } _exit: if (code) { @@ -814,8 +831,6 @@ _exit: } static void tsdbSttFWriterDoClose(SSttFileWriter *writer) { - ASSERT(writer->fd == NULL); - for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) { tBufferDestroy(writer->local + i); } @@ -854,7 +869,6 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o tsdbCloseFile(&writer->fd); - ASSERT(writer->file->size > 0); STFileOp op = (STFileOp){ .optype = TSDB_FOP_CREATE, .fid = writer->config->fid, diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 7ada3085b1..e13e520cbf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -171,7 +171,7 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) { TAOS_CHECK_RETURN(tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count, sizeof(record.count))); } else { - ASSERT(0); + return TSDB_CODE_INVALID_PARA; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c index 2ddd3c9d3e..1208b06337 100644 --- a/source/dnode/vnode/src/vnd/vnodeAsync.c +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -165,9 +165,7 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { } ret = vHashDrop(async->taskTable, task); - if (ret != 0) { - ASSERT(0); - } + TAOS_UNUSED(ret); async->numTasks--; if (task->numWait == 0) { @@ -403,7 +401,6 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) { } (void)taosThreadJoin((*async)->workers[i].thread, NULL); - ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP); (*async)->workers[i].state = EVA_WORKER_STATE_UINIT; } @@ -413,18 +410,11 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) { channel->prev->next = channel->next; int32_t ret = vHashDrop((*async)->channelTable, channel); - if (ret) { - ASSERT(0); - } + TAOS_UNUSED(ret); (*async)->numChannels--; taosMemoryFree(channel); } - ASSERT((*async)->numLaunchWorkers == 0); - ASSERT((*async)->numIdleWorkers == 0); - ASSERT((*async)->numChannels == 0); - ASSERT((*async)->numTasks == 0); - (void)taosThreadMutexDestroy(&(*async)->mutex); (void)taosThreadCondDestroy(&(*async)->hasTask); @@ -438,7 +428,6 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) { static int32_t vnodeAsyncLaunchWorker(SVAsync *async) { for (int32_t i = 0; i < async->numWorkers; i++) { - ASSERT(async->workers[i].state != EVA_WORKER_STATE_IDLE); if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) { continue; } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8fcbe49f9a..70b40e8d0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -302,7 +302,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); (void)taosThreadMutexLock(&pVnode->mutex); - ASSERT(pVnode->onCommit == NULL); pVnode->onCommit = pVnode->inUse; pVnode->inUse = NULL; (void)taosThreadMutexUnlock(&pVnode->mutex); @@ -339,7 +338,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) { pVnode->recycleTail = pPool; } } else { - ASSERT(0); + vError("vgId:%d, buffer pool %p of id %d nRef:%d", TD_VID(pVnode), pPool, pPool->id, nRef); } (void)taosThreadMutexUnlock(&pVnode->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeHash.c b/source/dnode/vnode/src/vnd/vnodeHash.c index 00fc2dfc00..96ad759a90 100644 --- a/source/dnode/vnode/src/vnd/vnodeHash.c +++ b/source/dnode/vnode/src/vnd/vnodeHash.c @@ -77,7 +77,6 @@ int32_t vHashDestroy(SVHashTable** ht) { } if (*ht) { - ASSERT((*ht)->numEntries == 0); taosMemoryFree((*ht)->buckets); taosMemoryFree(*ht); (*ht) = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ed008d4f88..989faa3a0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -558,7 +558,9 @@ void vnodeClose(SVnode *pVnode) { // start the sync timer after the queue is ready int32_t vnodeStart(SVnode *pVnode) { - ASSERT(pVnode); + if (pVnode == NULL) { + return TSDB_CODE_INVALID_PARA; + } return vnodeSyncStart(pVnode); } From 7073f204ac4594d7271efc6f8e82594bf2bbc8f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Aug 2024 11:00:02 +0800 Subject: [PATCH 21/22] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 160ff2e13c..2288e8bbce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -903,6 +903,7 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: + tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr); *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); return code; } From bbdd1f655b62698733e99f0f1188d9761c68b70f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Aug 2024 11:04:44 +0800 Subject: [PATCH 22/22] fix(stream): send kill checkpoint trans to mnode when trying to close vnode. --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/stream/src/streamMeta.c | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a70a04f23d..5fc550da32 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1204,11 +1204,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (req.mndTrigger) { - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, + tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId); } else { const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask)); - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 + tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d after transfer-state, prev status:%s", pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a9976760b6..b6be1d04ca 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1260,7 +1260,9 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) pMeta->stage = stage; // mark the sign to send msg before close all tasks - if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) { + // 1. for leader vnode, always send msg before closing + // 2. for follower vnode, if it's is changed from leader, also sending msg before closing. + if (pMeta->role == NODE_ROLE_LEADER) { pMeta->sendMsgBeforeClosing = true; }