diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 871dd60741..3489b359cb 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +void qStreamCloseTsdbReader(void* task); #ifdef __cplusplus } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a7564e352c..d7851d6fe7 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -54,7 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); -void vnodeSyncCheckTimeout(SVnode* pVnode); +void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); @@ -175,7 +175,8 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); -int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave); +int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); +void tsdbReleaseDataBlock(STsdbReader *pReader); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); @@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader, const char* idstr); + uint64_t suid, void **pReader, const char *idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index acdc038299..a58301adf2 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; +typedef struct SQueryNode SQueryNode; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; @@ -208,11 +209,13 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable +typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); + int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -void tsdbRefMemTable(SMemTable *pMemTable); -void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -290,8 +293,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char *id); -void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char *id); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); +void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -366,11 +369,20 @@ struct STbData { STbData *next; }; +struct SQueryNode { + SQueryNode *pNext; + SQueryNode **ppNext; + void *pQHandle; + _tsdb_reseek_func_t reseek; +}; + struct SMemTable { SRWLatch latch; STsdb *pTsdb; SVBufPool *pPool; volatile int32_t nRef; + int64_t minVer; + int64_t maxVer; TSKEY minKey; TSKEY maxKey; int64_t nRow; @@ -380,6 +392,7 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; + SQueryNode qList; }; struct TSDBROW { @@ -605,9 +618,11 @@ struct SDelFWriter { }; struct STsdbReadSnap { - SMemTable *pMem; - SMemTable *pIMem; - STsdbFS fs; + SMemTable *pMem; + SQueryNode *pNode; + SMemTable *pIMem; + SQueryNode *pINode; + STsdbFS fs; }; struct SDataFWriter { @@ -726,6 +741,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { + STsdb *pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; SVnode *pVnode; STSchema *pSchema; uint64_t uid; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e7f0a6d297..c9120c6e6c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,7 +87,8 @@ typedef struct SCommitInfo SCommitInfo; #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" -#define VND_INFO_FNAME "vnode.json" +#define VNODE_BUF_POOL_SEG 1 // TODO: change parameter here for sync/async commit +#define VND_INFO_FNAME "vnode.json" // vnd.h diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 141f79ee41..a27b6988c5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); + taosWLockLatch(&pTq->pushLock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { @@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock + + tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pHandle->execHandle.task); + } if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { } // close handle diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 9f48125638..1c1626ba5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -109,6 +109,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->type = type; p->pVnode = pVnode; + p->pTsdb = p->pVnode->pTsdb; + p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; p->suid = suid; @@ -145,6 +147,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, } p->idstr = taosMemoryStrDup(idstr); + taosThreadMutexInit(&p->readerMutex, NULL); *pReader = p; return TSDB_CODE_SUCCESS; @@ -164,7 +167,9 @@ void* tsdbCacherowsReaderClose(void* pReader) { destroyLastBlockLoadInfo(p->pLoadInfo); - taosMemoryFree((void*) p->idstr); + taosMemoryFree((void*)p->idstr); + taosThreadMutexDestroy(&p->readerMutex); + taosMemoryFree(pReader); return NULL; } @@ -199,6 +204,19 @@ static void freeItem(void* pItem) { } } +static int32_t tsdbCacheQueryReseek(void* pQHandle) { + int32_t code = 0; + SCacheRowsReader* pReader = pQHandle; + + taosThreadMutexLock(&pReader->readerMutex); + + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now + + taosThreadMutexUnlock(&pReader->readerMutex); + return code; +} + int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; @@ -241,7 +259,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l"); + taosThreadMutexLock(&pr->readerMutex); + tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; @@ -355,8 +374,9 @@ _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); - tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l"); resetLastBlockLoadInfo(pr->pLoadInfo); + tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap); + taosThreadMutexUnlock(&pr->readerMutex); for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 92451dcf58..31d6fd85c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { pTsdb->imem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); goto _exit; } @@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); if (pMemTable) { - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 953b4884a3..00c126bcb6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -50,6 +50,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->pTsdb = pTsdb; pMemTable->pPool = pTsdb->pVnode->inUse; pMemTable->nRef = 1; + pMemTable->minVer = VERSION_MAX; + pMemTable->maxVer = VERSION_MIN; pMemTable->minKey = TSKEY_MAX; pMemTable->maxKey = TSKEY_MIN; pMemTable->nRow = 0; @@ -62,6 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } + pMemTable->qList.pNext = &pMemTable->qList; + pMemTable->qList.ppNext = &pMemTable->qList.pNext; vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; @@ -146,6 +150,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi } if (code) goto _err; + // update + pMemTable->minVer = TMIN(pMemTable->minVer, version); + pMemTable->maxVer = TMAX(pMemTable->maxVer, version); + return code; _err: @@ -197,6 +205,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid } pMemTable->nDel++; + pMemTable->minVer = TMIN(pMemTable->minVer, version); + pMemTable->maxVer = TMIN(pMemTable->maxVer, version); if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); @@ -237,7 +247,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) { if (pIter) { taosMemoryFree(pIter); } - return NULL; } @@ -740,16 +749,45 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -void tsdbRefMemTable(SMemTable *pMemTable) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { + int32_t code = 0; + int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); + /* + // register handle (todo: take concurrency in consideration) + *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); + if (*ppNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + (*ppNode)->pQHandle = pQHandle; + (*ppNode)->reseek = reseek; + (*ppNode)->pNext = pMemTable->qList.pNext; + (*ppNode)->ppNext = &pMemTable->qList.pNext; + pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; + pMemTable->qList.pNext = *ppNode; + */ +_exit: + return code; } -void tsdbUnrefMemTable(SMemTable *pMemTable) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { + int32_t code = 0; + /* + // unregister handle (todo: take concurrency in consideration) + if (pNode) { + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + taosMemoryFree(pNode); + } + */ int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); } + + return code; } static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { @@ -789,3 +827,29 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { _exit: return aTbDataP; } + +int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { + int32_t code = 0; + + SQueryNode *pNode = pMemTable->qList.pNext; + while (1) { + ASSERT(pNode != &pMemTable->qList); + SQueryNode *pNextNode = pNode->pNext; + + if (pNextNode == &pMemTable->qList) { + code = (*pNode->reseek)(pNode->pQHandle); + if (code) goto _exit; + break; + } else { + code = (*pNode->reseek)(pNode->pQHandle); + if (code) goto _exit; + pNode = pMemTable->qList.pNext; + ASSERT(pNode == pNextNode); + } + } + + // NOTE: Take care here, pMemTable is destroyed + +_exit: + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1938f751a2..87b5e59bc0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -156,6 +156,9 @@ typedef struct SBlockInfoBuf { struct STsdbReader { STsdb* pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; + bool suspended; uint64_t suid; int16_t order; bool freeBlock; @@ -173,10 +176,10 @@ struct STsdbReader { SDataFReader* pFileReader; // the file reader SDelFReader* pDelFReader; // the del file reader SArray* pDelIdx; // del file block index; - SVersionRange verRange; - SBlockInfoBuf blockInfoBuf; - int32_t step; - STsdbReader* innerReader[2]; + // SVersionRange verRange; + SBlockInfoBuf blockInfoBuf; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -399,9 +402,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) { taosHashCleanup(pTableMap); } -static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { - return pWindow->skey > pWindow->ekey; -} +static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; } // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. @@ -635,6 +636,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols); + taosThreadMutexInit(&pReader->readerMutex, NULL); + *ppReader = pReader; return code; @@ -729,12 +732,25 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN sizeInDisk += pScanInfo->mapData.nData; + int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + STimeWindow w = pReader->window; + if (ASCENDING_TRAVERSE(pReader->order)) { + w.skey = pScanInfo->lastKey + step; + } else { + w.ekey = pScanInfo->lastKey + step; + } + + if (isEmptyQueryTimeWindow(&w)) { + continue; + } + SDataBlk block = {0}; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); // 1. time range check - if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + // if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) { continue; } @@ -915,7 +931,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo int32_t step = asc ? 1 : -1; // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit -// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); + // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // 1. copy data in a batch model memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); @@ -2170,9 +2186,11 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->order)) { - startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + // startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; } else { - startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + // startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; } int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); @@ -2295,7 +2313,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { - return false; // this is an invalid result. + return false; // this is an invalid result. } return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } @@ -2451,7 +2469,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { while (1) { bool hasBlockData = false; { - while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block + while (pBlockData->nRow > 0 && + pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { hasBlockData = true; break; @@ -2882,9 +2901,16 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } // set the correct start position in case of the first/last file block, according to the query time window -void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SDataBlk* pBlock = getCurrentBlock(pBlockIter); - +static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { + int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX; + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + if (pBlockInfo) { + STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pScanInfo) { + lastKey = pScanInfo->lastKey; + } + } SReaderStatus* pStatus = &pReader->status; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; @@ -2892,6 +2918,7 @@ void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->allDumped = false; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1; + pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -3096,7 +3123,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32 return false; } else if (pKey->ts == last->ts) { TSDBKEY* prev = taosArrayGet(pDelList, num - 2); - return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer); + return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && + prev->version >= pVerRange->minVer); } } else { TSDBKEY* pCurrent = taosArrayGet(pDelList, *index); @@ -3285,7 +3313,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - bool loadNeighbor = true; + bool loadNeighbor = true; int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { @@ -3826,40 +3854,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL goto _err; } - if (numOfTables > 0) { - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; - STsdbReader* pNextReader = pReader->innerReader[1]; - - // we need only one row - pPrevReader->capacity = 1; - pPrevReader->status.pTableMap = pReader->status.pTableMap; - pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; - pPrevReader->pReadSnap = pReader->pReadSnap; - - pNextReader->capacity = 1; - pNextReader->status.pTableMap = pReader->status.pTableMap; - pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; - pNextReader->pReadSnap = pReader->pReadSnap; - - code = doOpenReaderImpl(pPrevReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } + pReader->suspended = true; tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -3932,7 +3927,10 @@ void tsdbReaderClose(STsdbReader* pReader) { pReader->pDelIdx = NULL; } - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr); + qTrace("tsdb/reader: %p, untake snapshot", pReader); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + + taosThreadMutexDestroy(&pReader->readerMutex); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3964,9 +3962,156 @@ void tsdbReaderClose(STsdbReader* pReader) { if (pReader->pMemSchema != pReader->pSchema) { taosMemoryFree(pReader->pMemSchema); } + taosMemoryFreeClear(pReader); } +int32_t tsdbReaderSuspend(STsdbReader* pReader) { + int32_t code = 0; + + // save reader's base state & reset top state to be reconstructed from base state + SReaderStatus* pStatus = &pReader->status; + STableBlockScanInfo* pBlockScanInfo = NULL; + + if (pStatus->loadFromFile) { + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + if (pBlockInfo != NULL) { + pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + code = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + goto _err; + } + } else { + pBlockScanInfo = *pStatus->pTableIter; + } + + tsdbDataFReaderClose(&pReader->pFileReader); + + // resetDataBlockScanInfo excluding lastKey + STableBlockScanInfo* p = NULL; + + while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) { + p->iterInit = false; + p->iiter.hasVal = false; + if (p->iter.iter != NULL) { + p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + } + + p->delSkyline = taosArrayDestroy(p->delSkyline); + // p->lastKey = ts; + } + } else { + pBlockScanInfo = *pStatus->pTableIter; + if (pBlockScanInfo) { + // save lastKey to restore memory iterator + STimeWindow w = pReader->pResBlock->info.window; + pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey; + + // reset current current table's data block scan info, + pBlockScanInfo->iterInit = false; + // pBlockScanInfo->iiter.hasVal = false; + if (pBlockScanInfo->iter.iter != NULL) { + pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter); + } + + if (pBlockScanInfo->iiter.iter != NULL) { + pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter); + } + + pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); + tMapDataClear(&pBlockScanInfo->mapData); + // TODO: keep skyline for reuse + pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); + } + } + + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + + pReader->suspended = true; + + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr); + return code; + +_err: + tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + +static int32_t tsdbSetQueryReseek(void* pQHandle) { + int32_t code = 0; + STsdbReader* pReader = pQHandle; + + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } + + tsdbReaderSuspend(pReader); + + taosThreadMutexUnlock(&pReader->readerMutex); + + return code; +} + +int32_t tsdbReaderResume(STsdbReader* pReader) { + int32_t code = 0; + + STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter; + + // restore reader's state + // task snapshot + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + if (numOfTables > 0) { + qTrace("tsdb/reader: %p, take snapshot", pReader); + code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pNextReader = pReader->innerReader[1]; + + // we need only one row + pPrevReader->capacity = 1; + pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pSchema = pReader->pSchema; + pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pReadSnap = pReader->pReadSnap; + + pNextReader->capacity = 1; + pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pSchema = pReader->pSchema; + pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pReadSnap = pReader->pReadSnap; + + code = doOpenReaderImpl(pPrevReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + } + + pReader->suspended = false; + + tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, + pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); + return code; + +_err: + tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; @@ -4000,10 +4145,24 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } + SReaderStatus* pStatus = &pReader->status; + + qTrace("tsdb/read: %p, take read mutex", pReader); + taosThreadMutexLock(&pReader->readerMutex); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); pReader->step = EXTERNAL_ROWS_PREV; if (ret) { + pStatus = &pReader->innerReader[0]->status; + if (pStatus->composedDataBlock) { + qTrace("tsdb/read: %p, unlock read mutex", pReader); + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } } @@ -4022,6 +4181,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret = doTsdbNextDataBlock(pReader); if (ret) { + if (pStatus->composedDataBlock) { + qTrace("tsdb/read: %p, unlock read mutex", pReader); + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } @@ -4036,10 +4200,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); pReader->step = EXTERNAL_ROWS_NEXT; if (ret1) { + pStatus = &pReader->innerReader[1]->status; + if (pStatus->composedDataBlock) { + qTrace("tsdb/read: %p, unlock read mutex", pReader); + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret1; } } + qTrace("tsdb/read: %p, unlock read mutex", pReader); + taosThreadMutexUnlock(&pReader->readerMutex); + return false; } @@ -4175,12 +4348,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, } static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { - SReaderStatus* pStatus = &pReader->status; - - if (pStatus->composedDataBlock) { - return pReader->pResBlock; - } - + SReaderStatus* pStatus = &pReader->status; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); @@ -4202,20 +4370,50 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->pResBlock; } +void tsdbReleaseDataBlock(STsdbReader* pReader) { + // SReaderStatus* pStatus = &pReader->status; + // if (!pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + //} +} + SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { + STsdbReader* pTReader = pReader; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { - return doRetrieveDataBlock(pReader->innerReader[0]); + pTReader = pReader->innerReader[0]; } else if (pReader->step == EXTERNAL_ROWS_NEXT) { - return doRetrieveDataBlock(pReader->innerReader[1]); + pTReader = pReader->innerReader[1]; } } - return doRetrieveDataBlock(pReader); + SReaderStatus* pStatus = &pTReader->status; + if (pStatus->composedDataBlock) { + return pTReader->pResBlock; + } + + SSDataBlock* ret = doRetrieveDataBlock(pTReader); + + qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); + taosThreadMutexUnlock(&pReader->readerMutex); + + return ret; } int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { + SReaderStatus* pStatus = &pReader->status; + + qTrace("tsdb/read: %p, take read mutex", pReader); + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + + taosThreadMutexUnlock(&pReader->readerMutex); + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { + tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); return TSDB_CODE_SUCCESS; } @@ -4343,13 +4541,18 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; + taosThreadMutexLock(&pReader->readerMutex); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); while (pStatus->pTableIter != NULL) { STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STbData* d = NULL; - if (pReader->pTsdb->mem != NULL) { + if (pReader->pReadSnap->pMem != NULL) { d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); @@ -4357,7 +4560,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { } STbData* di = NULL; - if (pReader->pTsdb->imem != NULL) { + if (pReader->pReadSnap->pIMem != NULL) { di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); @@ -4368,6 +4571,8 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); } + taosThreadMutexUnlock(&pReader->readerMutex); + return rows; } @@ -4409,8 +4614,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) { - int32_t code = 0; +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { + int32_t code = 0; + STsdb* pTsdb = pReader->pTsdb; + SVersionRange* pRange = &pReader->verRange; // alloc *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); @@ -4427,15 +4634,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr } // take snapshot - (*ppSnap)->pMem = pTsdb->mem; - (*ppSnap)->pIMem = pTsdb->imem; - - if ((*ppSnap)->pMem) { - tsdbRefMemTable((*ppSnap)->pMem); + if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { + tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); + (*ppSnap)->pMem = pTsdb->mem; } - if ((*ppSnap)->pIMem) { - tsdbRefMemTable((*ppSnap)->pIMem); + if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { + tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); + (*ppSnap)->pIMem = pTsdb->imem; } // fs @@ -4452,23 +4658,25 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr goto _exit; } - tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); + tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); _exit: return code; } -void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { +void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { + STsdb* pTsdb = pReader->pTsdb; + if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); } tsdbFSUnref(pTsdb, &pSnap->fs); taosMemoryFree(pSnap); } - tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); + tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index e67f2cc59a..9c1d54b57e 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -74,7 +74,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { ASSERT(pVnode->pPool == NULL); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < VNODE_BUF_POOL_SEG; i++) { // create pool if (vnodeBufPoolCreate(pVnode, size, &pPool)) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0a2802ba37..a40e29ba09 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); } + diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 72419d8fe4..1d1298bf5f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2654,3 +2654,24 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; } + +void qStreamCloseTsdbReader(void* task) { + if (task == NULL) return; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; + SOperatorInfo* pOp = pTaskInfo->pRoot; + qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid, + pTaskInfo->streamInfo.lastStatus.ts); + pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; + while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { + SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; + if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pDownstreamOp->info; + if (pInfo->pTableScanOp) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; + return; + } + } + } +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a7f5eb9b22..6d93989582 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro if (NULL == *pPage) { return NULL; } - + return (SResultRow*)((char*)(*pPage) + p1->offset); } @@ -306,12 +306,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; pCost->totalRows += pBlock->info.rows; + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; @@ -321,6 +323,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); @@ -342,6 +345,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->filterOutBlocks += 1; (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } } @@ -356,7 +360,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->skipBlocks += 1; - + tsdbReleaseDataBlock(pTableScanInfo->dataReader); *status = FUNC_DATA_REQUIRED_FILTEROUT; return TSDB_CODE_SUCCESS; } @@ -1584,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows", pResult->info.rows); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows, + pResult->info.window.skey, pResult->info.window.ekey); pTaskInfo->streamInfo.returned = 1; return pResult; } else { @@ -2554,6 +2559,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } STsdbReader* reader = pInfo->base.dataReader; + qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); @@ -2588,6 +2594,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + qTrace("tsdb/read-table-data: %p, close reader", reader); tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; return pBlock;