diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e8d5dfd1f5..bd67af712a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -704,10 +704,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask pReq->streamId = pTask->id.streamId; STransAction action = {0}; - SEpSet epset = {0}; - if(pTask->info.nodeId == SNODE_HANDLE){ + SEpSet epset = {0}; + if (pTask->info.nodeId == SNODE_HANDLE) { SSnodeObj *pObj = NULL; - void *pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); if (pIter == NULL) { @@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); sdbRelease(pMnode->pSdb, pObj); } - }else{ + } else { SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + if (pVgObj != NULL) { + epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + } else { + mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId); + taosMemoryFree(pReq); + return 0; + } } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. @@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (pe == NULL) { + mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); return; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fa36d69d6e..43e85a9405 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) { void* pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - - mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name, + mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, pEntry->startTime); taosArrayPush(pList, &info); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a2ef109800..52ee6d0b14 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); -static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader); +static bool hasDataInSttBlock(STableBlockScanInfo *pInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static void resetTableListIndex(SReaderStatus* pStatus); @@ -1466,7 +1466,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pSttBlockReader)) { + if (hasDataInSttBlock(pBlockScanInfo)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1485,7 +1485,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* int64_t minKey = 0; if (pReader->info.order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } @@ -1498,7 +1498,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { minKey = INT64_MIN; - if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } @@ -1705,7 +1705,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); - bool dataInSttFile = hasDataInSttBlock(pSttBlockReader); + bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo); if (dataInDataFile && (!dataInSttFile)) { // no stt file block available, only data block exists return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); @@ -1791,7 +1791,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pSttBlockReader)) { + if (hasDataInSttBlock(pBlockScanInfo)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1840,7 +1840,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } } else { @@ -1857,7 +1857,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } } @@ -2065,7 +2065,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { - return hasDataInSttBlock(pSttBlockReader); + return hasDataInSttBlock(pScanInfo); } if (pSttBlockReader->uid != 0) { @@ -2158,7 +2158,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return hasData; } -static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; } +static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) { + return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; +} bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { @@ -2733,7 +2735,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); while (1) { // no data in stt block and block, no need to proceed. - if (!hasDataInSttBlock(pSttBlockReader)) { + if (!hasDataInSttBlock(pScanInfo)) { break; } @@ -2850,7 +2852,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. - while (hasDataInSttBlock(pSttBlockReader)) { + while (hasDataInSttBlock(pScanInfo)) { ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 3c26badc0e..a223a2dc2d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -252,6 +252,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { taosArrayClear(pScanInfo->pFileDelData); // del data from each file set pScanInfo->cleanSttBlocks = false; pScanInfo->numOfRowsInStt = 0; + pScanInfo->sttBlockReturned = false; INIT_TIMEWINDOW(&pScanInfo->sttWindow); INIT_TIMEWINDOW(&pScanInfo->filesetWindow); pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;