From 4ade68aa317c5d6cf36ed7a3b4f7668a2427b922 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 2 Aug 2022 11:40:47 +0800 Subject: [PATCH 1/2] fix: fix clear cache caused deadlock issue --- source/libs/catalog/inc/catalogInt.h | 1 + source/libs/catalog/src/ctgCache.c | 7 +++++++ source/libs/catalog/src/ctgDbg.c | 2 +- source/libs/catalog/src/ctgRemote.c | 2 +- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 5b5c6010e8..816309beb1 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -416,6 +416,7 @@ typedef struct SCtgCacheOperation { bool syncOp; tsem_t rspSem; bool stopQueue; + bool unLocked; } SCtgCacheOperation; typedef struct SCtgQNode { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 06e8216e87..2e8e259151 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -674,7 +674,13 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { tsem_post(&gCtgMgmt.queue.reqSem); if (syncOp) { + if (!operation->unLocked) { + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); + } tsem_wait(&operation->rspSem); + if (!operation->unLocked) { + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); + } taosMemoryFree(operation); } @@ -1011,6 +1017,7 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool op->opId = CTG_OP_CLEAR_CACHE; op->syncOp = syncOp; op->stopQueue = stopQueue; + op->unLocked = true; SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index bd3402dc39..8333cb28c0 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "catalogInt.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {0}; +SCtgDebug gCTGDebug = {.lockEnable = true}; void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { ASSERT(*(int32_t*)param == 1); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 55bfc88a49..0f97b5c5b1 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -331,7 +331,7 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pBatchs) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } pTask->pBatchs = pBatchs; #endif From 5cf4dac071aa87997c6d1c404e6f2b0226c1f607 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 2 Aug 2022 12:56:33 +0800 Subject: [PATCH 2/2] fix: new pSchema if reader's schema is null --- source/dnode/vnode/src/tsdb/tsdbRead.c | 27 ++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ed2558d344..21a758a342 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -146,7 +146,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); -static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); +static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, + int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -1444,9 +1445,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // 2. the direct next point is not an duplicated timestamp if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { - int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; - if (nextKey != key) { // merge is not needed + if (nextKey != key) { // merge is not needed doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); pDumpInfo->rowIndex += step; return TSDB_CODE_SUCCESS; @@ -2134,15 +2135,18 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe int32_t sversion = TSDBROW_SVERSION(pRow); STSchema* pTSchema = NULL; - if (sversion != pReader->pSchema->version) { + if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + if (pReader->pSchema == NULL) { + pReader->pSchema = pTSchema; + } } else { pTSchema = pReader->pSchema; } tRowMergerAdd(pMerger, pRow, pTSchema); - if (sversion != pReader->pSchema->version) { + if (pTSchema != pReader->pSchema) { taosMemoryFree(pTSchema); } } @@ -2230,7 +2234,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; - if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) { + if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); } @@ -2271,8 +2275,11 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe // updateSchema(pRow, uid, pReader); int32_t sversion = TSDBROW_SVERSION(pRow); STSchema* pTSchema = NULL; - if (sversion != pReader->pSchema->version) { + if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + if (pReader->pSchema == NULL) { + pReader->pSchema = pTSchema; + } } else { pTSchema = pReader->pSchema; } @@ -2282,7 +2289,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); - if (sversion != pReader->pSchema->version) { + if (pTSchema != pReader->pSchema) { taosMemoryFree(pTSchema); } } @@ -2425,9 +2432,9 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); - while(i < numOfOutputCols && j < numOfInputCols) { + while (i < numOfOutputCols && j < numOfInputCols) { SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); - SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); if (pData->cid == pCol->info.colId) { tColDataGetValue(pData, rowIndex, &cv);