From 44915af5e798d7aa4939ba62de342bd012d6373b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 14:16:12 +0800 Subject: [PATCH 01/15] more code --- source/dnode/vnode/src/inc/vnodeInt.h | 2 ++ source/dnode/vnode/src/vnd/vnodeBufPool.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0e85e7bfb6..19f7f00a63 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -85,6 +85,8 @@ typedef struct SSnapDataHdr SSnapDataHdr; #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" +#define VNODE_BUF_POOL_SEG 1 // TODO: change parameter here for sync/async commit + // vnd.h void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 5a22114ab4..33e4c8406f 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -26,9 +26,9 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { ASSERT(pVnode->pPool == NULL); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < VNODE_BUF_POOL_SEG; i++) { // create pool - ret = vnodeBufPoolCreate(pVnode, size, &pPool); + ret = vnodeBufPoolCreate(pVnode, size / VNODE_BUF_POOL_SEG, &pPool); if (ret < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8c73499229..87b768e9a3 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -73,7 +73,7 @@ int vnodeBegin(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) { if (pVnode->inUse) { - return pVnode->inUse->size > pVnode->config.szBuf / 3; + return pVnode->inUse->size > pVnode->config.szBuf / VNODE_BUF_POOL_SEG; } return false; } @@ -236,7 +236,7 @@ int vnodeCommit(SVnode *pVnode) { // preCommit // smaSyncPreCommit(pVnode->pSma); - if(smaAsyncPreCommit(pVnode->pSma) < 0){ + if (smaAsyncPreCommit(pVnode->pSma) < 0) { ASSERT(0); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4ccfea4051..6dca82f6b0 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -96,7 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadCondInit(&pVnode->poolNotEmpty, NULL); // open buffer pool - if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { + if (vnodeOpenBufPool(pVnode, pVnode->config.szBuf) < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } From 151fd39d1a978a2067e09bb1759d8fb265f9755b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 14:51:40 +0800 Subject: [PATCH 02/15] more code --- source/dnode/vnode/src/inc/tsdb.h | 3 + source/dnode/vnode/src/tsdb/tsdbMemTable.c | 9 ++- source/dnode/vnode/src/tsdb/tsdbRead.c | 74 ++++++++++------------ 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a836fa2bc5..756763914c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -384,6 +384,8 @@ struct SMemTable { STsdb *pTsdb; SVBufPool *pPool; volatile int32_t nRef; + int64_t minVer; + int64_t maxVer; TSKEY minKey; TSKEY maxKey; int64_t nRow; @@ -393,6 +395,7 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; + STsdbReader *pReaderList; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a6628463f8..0ed143aab9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -44,6 +44,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->pTsdb = pTsdb; pMemTable->pPool = pTsdb->pVnode->inUse; pMemTable->nRef = 1; + pMemTable->minVer = VERSION_MAX; + pMemTable->maxVer = VERSION_MIN; pMemTable->minKey = TSKEY_MAX; pMemTable->maxKey = TSKEY_MIN; pMemTable->nRow = 0; @@ -130,6 +132,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI goto _err; } + // update + pMemTable->minVer = TMIN(pMemTable->minVer, version); + pMemTable->maxVer = TMAX(pMemTable->maxVer, version); + return code; _err: @@ -179,6 +185,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid } pMemTable->nDel++; + pMemTable->minVer = TMIN(pMemTable->minVer, version); + pMemTable->maxVer = TMIN(pMemTable->maxVer, version); if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); @@ -219,7 +227,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) { if (pIter) { taosMemoryFree(pIter); } - return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 60d967681b..73db13afe6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -758,51 +758,43 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or s = pos; // check - assert(pos >=0 && pos < num); + assert(pos >= 0 && pos < num); assert(num > 0); if (order == TSDB_ORDER_ASC) { // find the first position which is smaller than the key - e = num - 1; - if (key < keyList[pos]) - return -1; + e = num - 1; + if (key < keyList[pos]) return -1; while (1) { // check can return - if (key >= keyList[e]) - return e; - if (key <= keyList[s]) - return s; - if (e - s <= 1) - return s; + if (key >= keyList[e]) return e; + if (key <= keyList[s]) return s; + if (e - s <= 1) return s; // change start or end position - int mid = s + (e - s + 1)/2; + int mid = s + (e - s + 1) / 2; if (keyList[mid] > key) e = mid; - else if(keyList[mid] < key) + else if (keyList[mid] < key) s = mid; else return mid; } - } else { // DESC + } else { // DESC // find the first position which is bigger than the key - e = 0; - if (key > keyList[pos]) - return -1; + e = 0; + if (key > keyList[pos]) return -1; while (1) { // check can return - if (key <= keyList[e]) - return e; - if (key >= keyList[s]) - return s; - if (s - e <= 1) - return s; + if (key <= keyList[e]) return e; + if (key >= keyList[s]) return s; + if (s - e <= 1) return s; // change start or end position - int mid = s - (s - e + 1)/2; + int mid = s - (s - e + 1) / 2; if (keyList[mid] < key) e = mid; - else if(keyList[mid] > key) + else if (keyList[mid] > key) s = mid; else return mid; @@ -813,7 +805,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->order); if (asc && pReader->window.ekey >= pBlock->maxKey.ts) { endPos = pBlock->nRow - 1; @@ -849,8 +841,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { pDumpInfo->rowIndex = pBlock->nRow - 1; } else { - int32_t pos = asc? pBlock->nRow-1:0; - int32_t order = (pReader->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + int32_t pos = asc ? pBlock->nRow - 1 : 0; + int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order); } @@ -863,13 +855,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn endIndex += step; int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); - if (remain > pReader->capacity) { // output buffer check + if (remain > pReader->capacity) { // output buffer check remain = pReader->capacity; } int32_t rowIndex = 0; - int32_t i = 0; + int32_t i = 0; SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (asc) { @@ -938,7 +930,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; setBlockAllDumped(pDumpInfo, ts, pReader->order); } else { - int64_t k = asc? pBlock->maxKey.ts:pBlock->minKey.ts; + int64_t k = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; setBlockAllDumped(pDumpInfo, k, pReader->order); } @@ -948,8 +940,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, - pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, + unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -2242,7 +2234,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { if (pBlockInfo != NULL) { pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); - TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); + TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) { @@ -2257,7 +2249,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - while (1) { // todo check the validate of row in file block bool hasBlockData = false; @@ -2299,12 +2290,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } - _end: +_end: pResBlock->info.uid = pBlockScanInfo->uid; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); - double el = (taosGetTimestampUs() - st)/1000.0; + double el = (taosGetTimestampUs() - st) / 1000.0; pReader->cost.composedBlocks += 1; pReader->cost.buildComposedBlockTime += el; @@ -3366,7 +3357,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S tColDataGetValue(pData, rowIndex, &cv); doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); j += 1; - } else if (pData->cid > pCol->info.colId) { // the specified column does not exist in file block, fill with null data + } else if (pData->cid > + pCol->info.colId) { // the specified column does not exist in file block, fill with null data colDataAppendNULL(pCol, outputRowIndex); } @@ -3492,13 +3484,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pCond->suid != 0) { pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1); if (pReader->pSchema == NULL) { - tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr); + tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, -1, + pReader->idStr); } } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1); if (pReader->pSchema == NULL) { - tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr); + tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, -1, pReader->idStr); } } @@ -3611,7 +3604,8 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, " + " SMA-time:%.2f ms, fileBlocks:%" PRId64 + ", fileBlocks-load-time:%.2f ms, " "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s", pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, From d2141ea5ca6d2757715c5b10c8ac23efc070628a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 16:44:33 +0800 Subject: [PATCH 03/15] more code --- source/dnode/vnode/src/inc/tsdb.h | 8 +++--- source/dnode/vnode/src/tsdb/tsdbCache.c | 4 +-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 +-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 20 +++++++++++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 33 ++++++++++++---------- 5 files changed, 44 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 48d02d275e..f802575c4a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -206,8 +206,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -void tsdbRefMemTable(SMemTable *pMemTable); -void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); -void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap); +void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8da783a5bd..ab4d815689 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap); + tsdbTakeReadSnap(NULL /*pTsdb (todo)*/, &pIter->pReadSnap); STbData *pMem = NULL; if (pIter->pReadSnap->pMem) { @@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } - tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); + tsdbUntakeReadSnap(NULL /*pIter->pTsdb (todo)*/, pIter->pReadSnap); _err: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4..05c103deec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -158,7 +158,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { pTsdb->mem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); goto _exit; } @@ -983,7 +983,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0ed143aab9..7348b284ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,16 +629,32 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -void tsdbRefMemTable(SMemTable *pMemTable) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { + int32_t code = 0; + int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); + + // register handle (todo) + if (pReader) { + } + + return code; } -void tsdbUnrefMemTable(SMemTable *pMemTable) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { + int32_t code = 0; + + // unregister handle (todo) + if (pReader) { + } + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); } + + return code; } static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 495d2b3070..bd8d665e94 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3354,7 +3354,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); + code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3378,7 +3378,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl STsdbReader* pPrevReader = pReader->innerReader[0]; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; - code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap); + code = tsdbTakeReadSnap(pPrevReader, &pPrevReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3435,7 +3435,7 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3858,8 +3858,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { - int32_t code = 0; +int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { + int32_t code = 0; + STsdb* pTsdb = pReader->pTsdb; + SVersionRange* pRange = &pReader->verRange; // alloc *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); @@ -3876,15 +3878,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { } // take snapshot - (*ppSnap)->pMem = pTsdb->mem; - (*ppSnap)->pIMem = pTsdb->imem; - - if ((*ppSnap)->pMem) { - tsdbRefMemTable((*ppSnap)->pMem); + if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { + tsdbRefMemTable(pTsdb->mem, pReader); + (*ppSnap)->pMem = pTsdb->mem; } - if ((*ppSnap)->pIMem) { - tsdbRefMemTable((*ppSnap)->pIMem); + if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { + tsdbRefMemTable(pTsdb->imem, pReader); + (*ppSnap)->pIMem = pTsdb->imem; } // fs @@ -3906,14 +3907,16 @@ _exit: return code; } -void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { +void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { + STsdb* pTsdb = pReader->pTsdb; + if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem); + tsdbUnrefMemTable(pSnap->pMem, pReader); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem); + tsdbUnrefMemTable(pSnap->pIMem, pReader); } tsdbFSUnref(pTsdb, &pSnap->fs); From 3a4273406e4876c5d32957a57099fff404c274ab Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 17:32:39 +0800 Subject: [PATCH 04/15] more code --- source/dnode/vnode/src/inc/tsdb.h | 21 ++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 23 ++++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++++---- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f802575c4a..3d4cad4bd1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; +typedef struct SQueryNode SQueryNode; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 @@ -206,8 +207,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader); -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -365,6 +366,12 @@ struct STbData { STbData *next; }; +struct SQueryNode { + SQueryNode *pNext; + SQueryNode **ppNext; + void *pQueryHandle; +}; + struct SMemTable { SRWLatch latch; STsdb *pTsdb; @@ -381,7 +388,7 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - STsdbReader *pReaderList; + SQueryNode *qList; }; struct TSDBROW { @@ -592,9 +599,11 @@ struct SDelFWriter { }; struct STsdbReadSnap { - SMemTable *pMem; - SMemTable *pIMem; - STsdbFS fs; + SMemTable *pMem; + SQueryNode *pNode; + SMemTable *pIMem; + SQueryNode *pINode; + STsdbFS fs; }; struct SDataFWriter { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 7348b284ab..a78021d697 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,24 +629,35 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - // register handle (todo) - if (pReader) { + // register handle + *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); + if (*ppNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } + (*ppNode)->pQueryHandle = pQueryHandle; + (*ppNode)->pNext = pMemTable->qList; + (*ppNode)->ppNext = &pMemTable->qList; + pMemTable->qList = *ppNode; +_exit: return code; } -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - // unregister handle (todo) - if (pReader) { + // unregister handle + if (pNode) { + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + taosMemoryFree(pNode); } int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bd8d665e94..c0e4b4cd50 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3879,12 +3879,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader); + tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode); (*ppSnap)->pMem = pTsdb->mem; } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader); + tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode); (*ppSnap)->pIMem = pTsdb->imem; } @@ -3912,11 +3912,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pReader); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pReader); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); } tsdbFSUnref(pTsdb, &pSnap->fs); From 9b29931f7814539d6a5844d6dddfaa957fe29aef Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 17:45:47 +0800 Subject: [PATCH 05/15] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 3d4cad4bd1..c10301f0e9 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -388,7 +388,7 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - SQueryNode *qList; + SQueryNode qList; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a78021d697..9f2c89c5e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -58,6 +58,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } + pMemTable->qList.pNext = &pMemTable->qList; + pMemTable->qList.ppNext = &pMemTable->qList.pNext; vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; @@ -642,9 +644,10 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **p goto _exit; } (*ppNode)->pQueryHandle = pQueryHandle; - (*ppNode)->pNext = pMemTable->qList; - (*ppNode)->ppNext = &pMemTable->qList; - pMemTable->qList = *ppNode; + (*ppNode)->pNext = pMemTable->qList.pNext; + (*ppNode)->ppNext = &pMemTable->qList.pNext; + pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; + pMemTable->qList.pNext = *ppNode; _exit: return code; From f845a01480ccb3b8fadfd09ae511ec7ed51e6251 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Sep 2022 18:04:39 +0800 Subject: [PATCH 06/15] more code --- source/dnode/vnode/src/inc/tsdb.h | 4 +-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 36 +++++++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 ++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c10301f0e9..ac6d81707b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -207,7 +207,7 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter @@ -369,7 +369,7 @@ struct STbData { struct SQueryNode { SQueryNode *pNext; SQueryNode **ppNext; - void *pQueryHandle; + void *pQHandle; }; struct SMemTable { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 9f2c89c5e4..9aad06d0fb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -631,19 +631,19 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - // register handle + // register handle (todo: take concurrency in consideration) *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); if (*ppNode == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - (*ppNode)->pQueryHandle = pQueryHandle; + (*ppNode)->pQHandle = pQHandle; (*ppNode)->pNext = pMemTable->qList.pNext; (*ppNode)->ppNext = &pMemTable->qList.pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; @@ -656,7 +656,7 @@ _exit: int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - // unregister handle + // unregister handle (todo: take concurrency in consideration) if (pNode) { pNode->pNext->ppNext = pNode->ppNext; *pNode->ppNext = pNode->pNext; @@ -708,3 +708,31 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { _exit: return aTbDataP; } + +extern int32_t tsdbSetQueryReseek(void *pQHandle); + +int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { + int32_t code = 0; + + SQueryNode *pNode = pMemTable->qList.pNext; + while (1) { + ASSERT(pNode != &pMemTable->qList); + SQueryNode *pNextNode = pNode->pNext; + + if (pNextNode == &pMemTable->qList) { + code = tsdbSetQueryReseek(pNode->pQHandle); + if (code) goto _exit; + break; + } else { + code = tsdbSetQueryReseek(pNode->pQHandle); + if (code) goto _exit; + pNode = pMemTable->qList.pNext; + ASSERT(pNode == pNextNode); + } + } + + // NOTE: Take care here, pMemTable is destroyed + +_exit: + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c0e4b4cd50..3864436390 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3925,3 +3925,20 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } + +int32_t tsdbSetQueryReseek(void* pQHandle) { + int32_t code = 0; + + // lock handle + + // check state (is already in reseek state, skip below) + + // save all state for further restore + + // unref read snapshot + // tsdbUntakeReadSnap(); + + // unlock handle + + return code; +} \ No newline at end of file From 545607ccd2da629f8031de963352fed9db6bed26 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 27 Oct 2022 15:17:59 +0800 Subject: [PATCH 07/15] tsdb: resolve conflicts to make table creating & writing works --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 4 ++-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 8 +------- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b8f49f38e4..41c03f1c0d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -237,7 +237,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l"); + tsdbTakeReadSnap(NULL, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; @@ -338,7 +338,7 @@ _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); - tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l"); + tsdbUntakeReadSnap(NULL, pr->pReadSnap); for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index d3a0ed1259..a502b2b552 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1054,12 +1054,6 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { TSDB_CHECK_CODE(code, lino, _exit); } - pTsdb->imem = NULL; - - // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); - - tsdbUnrefMemTable(pMemTable, NULL); _exit: tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); @@ -1658,7 +1652,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); if (pMemTable) { - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c7273ec2d3..3f276e5b33 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4150,7 +4150,7 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { goto _exit; } - tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); + tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); _exit: return code; } @@ -4170,7 +4170,7 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { tsdbFSUnref(pTsdb, &pSnap->fs); taosMemoryFree(pSnap); } - tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode), pReader->idStr); + tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } int32_t tsdbSetQueryReseek(void* pQHandle) { From eac384753235f9400a27e73792f3ac2d9ce84a94 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 27 Oct 2022 16:48:33 +0800 Subject: [PATCH 08/15] fix: new reseek callback to separate tsdb & cache readers --- source/dnode/vnode/src/inc/tsdb.h | 13 ++++++++----- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 8 +++++++- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 9 ++++----- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 +++++++----- 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ce0d588df7..4c513371f5 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -207,10 +207,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable +typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); + int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter @@ -290,7 +292,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -362,9 +364,10 @@ struct STbData { }; struct SQueryNode { - SQueryNode *pNext; - SQueryNode **ppNext; - void *pQHandle; + SQueryNode *pNext; + SQueryNode **ppNext; + void *pQHandle; + _tsdb_reseek_func_t reseek; }; struct SMemTable { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 41c03f1c0d..86cc00568e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -194,6 +194,12 @@ static void freeItem(void* pItem) { } } +static int32_t tsdbCacheQueryReseek(void* pQHandle) { + int32_t code = 0; + + return code; +} + int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; @@ -237,7 +243,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(NULL, &pr->pReadSnap); + tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f1857b1047..04939959fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -636,7 +636,7 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); @@ -649,6 +649,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNod goto _exit; } (*ppNode)->pQHandle = pQHandle; + (*ppNode)->reseek = reseek; (*ppNode)->pNext = pMemTable->qList.pNext; (*ppNode)->ppNext = &pMemTable->qList.pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; @@ -714,8 +715,6 @@ _exit: return aTbDataP; } -extern int32_t tsdbSetQueryReseek(void *pQHandle); - int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { int32_t code = 0; @@ -725,11 +724,11 @@ int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { SQueryNode *pNextNode = pNode->pNext; if (pNextNode == &pMemTable->qList) { - code = tsdbSetQueryReseek(pNode->pQHandle); + code = (*pNode->reseek)(pNode->pQHandle); if (code) goto _exit; break; } else { - code = tsdbSetQueryReseek(pNode->pQHandle); + code = (*pNode->reseek)(pNode->pQHandle); if (code) goto _exit; pNode = pMemTable->qList.pNext; ASSERT(pNode == pNextNode); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3f276e5b33..4808956693 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3489,6 +3489,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== +static int32_t tsdbSetQueryReseek(void* pQHandle); + int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; @@ -3570,7 +3572,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } if (numOfTables > 0) { - code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap); + code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4106,7 +4108,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; @@ -4127,12 +4129,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode); + tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); (*ppSnap)->pMem = pTsdb->mem; } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode); + tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); (*ppSnap)->pIMem = pTsdb->imem; } @@ -4173,7 +4175,7 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } -int32_t tsdbSetQueryReseek(void* pQHandle) { +static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t code = 0; // lock handle From ef606d3fd93318cdaad5eb995085fd8e5238715c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 1 Nov 2022 16:38:35 +0800 Subject: [PATCH 09/15] tsdb/read: first round long query for non-cache reading --- source/dnode/vnode/src/inc/tsdb.h | 3 + source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 21 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 284 +++++++++++++++----- 3 files changed, 237 insertions(+), 71 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4c513371f5..65293a220d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -723,6 +723,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { + STsdb *pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; SVnode *pVnode; STSchema *pSchema; uint64_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 86cc00568e..aef402acaf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -108,6 +108,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList p->type = type; p->pVnode = pVnode; + p->pTsdb = p->pVnode->pTsdb; + p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; p->suid = suid; @@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList return TSDB_CODE_OUT_OF_MEMORY; } + taosThreadMutexInit(&p->readerMutex, NULL); + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -160,6 +164,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { destroyLastBlockLoadInfo(p->pLoadInfo); + taosThreadMutexDestroy(&p->readerMutex); + taosMemoryFree(pReader); return NULL; } @@ -195,8 +201,15 @@ static void freeItem(void* pItem) { } static int32_t tsdbCacheQueryReseek(void* pQHandle) { - int32_t code = 0; + int32_t code = 0; + SCacheRowsReader* pReader = pQHandle; + taosThreadMutexLock(&pReader->readerMutex); + + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now + + taosThreadMutexUnlock(&pReader->readerMutex); return code; } @@ -243,7 +256,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap); + taosThreadMutexLock(&pr->readerMutex); + tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; @@ -344,7 +358,8 @@ _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); - tsdbUntakeReadSnap(NULL, pr->pReadSnap); + tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap); + taosThreadMutexUnlock(&pr->readerMutex); for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4808956693..92d5515430 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -142,6 +142,9 @@ typedef struct SReaderStatus { struct STsdbReader { STsdb* pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; + bool suspended; uint64_t suid; int16_t order; STimeWindow window; // the primary query time window that applies to all queries @@ -156,7 +159,6 @@ struct STsdbReader { STSchema* pSchema; // the newest version schema STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times SDataFReader* pFileReader; - SVersionRange verRange; int32_t step; STsdbReader* innerReader[2]; @@ -522,6 +524,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setColumnIdSlotList(pReader, pReader->pResBlock); + taosThreadMutexInit(&pReader->readerMutex, NULL); + *ppReader = pReader; return code; @@ -613,12 +617,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); sizeInDisk += pScanInfo->mapData.nData; + + int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + STimeWindow w = pReader->window; + if (ASCENDING_TRAVERSE(pReader->order)) { + w.skey = pScanInfo->lastKey + step; + } else { + w.ekey = pScanInfo->lastKey + step; + } + + if (isEmptyQueryTimeWindow(&w)) { + continue; + } + for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { SDataBlk block = {0}; tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk); // 1. time range check - if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + // if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) { continue; } @@ -2018,9 +2036,11 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->order)) { - startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + // startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; } else { - startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + // startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; } int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); @@ -2669,7 +2689,13 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pScanInfo == NULL) { + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list", pBlockInfo->uid); + return; + } SReaderStatus* pStatus = &pReader->status; @@ -2678,6 +2704,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->allDumped = false; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1; + pDumpInfo->lastKey = pScanInfo->lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -3489,8 +3516,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== -static int32_t tsdbSetQueryReseek(void* pQHandle); - int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; @@ -3571,50 +3596,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - if (numOfTables > 0) { - code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; - STsdbReader* pNextReader = pReader->innerReader[1]; - - // we need only one row - pPrevReader->capacity = 1; - pPrevReader->status.pTableMap = pReader->status.pTableMap; - pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; - pPrevReader->pReadSnap = pReader->pReadSnap; - - pNextReader->capacity = 1; - pNextReader->status.pTableMap = pReader->status.pTableMap; - pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; - pNextReader->pReadSnap = pReader->pReadSnap; - - code = doOpenReaderImpl(pPrevReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doOpenReaderImpl(pNextReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } + pReader->suspended = true; tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -3677,6 +3659,8 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + taosThreadMutexDestroy(&pReader->readerMutex); + taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3706,9 +3690,165 @@ void tsdbReaderClose(STsdbReader* pReader) { if (pReader->pMemSchema != pReader->pSchema) { taosMemoryFree(pReader->pMemSchema); } + taosMemoryFreeClear(pReader); } +int32_t tsdbReaderSuspend(STsdbReader* pReader) { + int32_t code = 0; + + // save reader's base state & reset top state to be reconstructed from base state + SReaderStatus* pStatus = &pReader->status; + STableBlockScanInfo* pBlockScanInfo = NULL; + + if (pStatus->loadFromFile) { + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + if (pBlockInfo != NULL) { + pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + code = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + goto _err; + } + } else { + pBlockScanInfo = pStatus->pTableIter; + } + + tsdbDataFReaderClose(&pReader->pFileReader); + + // resetDataBlockScanInfo excluding lastKey + STableBlockScanInfo* p = NULL; + + while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) { + p->iterInit = false; + p->iiter.hasVal = false; + if (p->iter.iter != NULL) { + p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + } + + p->delSkyline = taosArrayDestroy(p->delSkyline); + // p->lastKey = ts; + } + } else { + pBlockScanInfo = pStatus->pTableIter; + if (pBlockScanInfo) { + // save lastKey to restore memory iterator + STimeWindow w = pReader->pResBlock->info.window; + pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey; + + // reset current current table's data block scan info, + pBlockScanInfo->iterInit = false; + // pBlockScanInfo->iiter.hasVal = false; + if (pBlockScanInfo->iter.iter != NULL) { + pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter); + } + + if (pBlockScanInfo->iiter.iter != NULL) { + pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter); + } + + pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); + tMapDataClear(&pBlockScanInfo->mapData); + // TODO: keep skyline for reuse + pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); + } + } + + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + + pReader->suspended = true; + + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr); + return code; + +_err: + tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + +static int32_t tsdbSetQueryReseek(void* pQHandle) { + int32_t code = 0; + STsdbReader* pReader = pQHandle; + + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } + + tsdbReaderSuspend(pReader); + + taosThreadMutexUnlock(&pReader->readerMutex); + + return code; +} + +int32_t tsdbReaderResume(STsdbReader* pReader) { + int32_t code = 0; + + STableBlockScanInfo* pBlockScanInfo = pReader->status.pTableIter; + + // restore reader's state + // task snapshot + size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + if (numOfTables > 0) { + code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pNextReader = pReader->innerReader[1]; + + // we need only one row + pPrevReader->capacity = 1; + pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pSchema = pReader->pSchema; + pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pReadSnap = pReader->pReadSnap; + + pNextReader->capacity = 1; + pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pSchema = pReader->pSchema; + pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pReadSnap = pReader->pReadSnap; + + code = doOpenReaderImpl(pPrevReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doOpenReaderImpl(pNextReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + } + + pReader->suspended = false; + + tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRIu64 ", in this query %s", pReader, + pBlockScanInfo ? pBlockScanInfo->uid : 0, numOfTables, pReader->idStr); + return code; + +_err: + tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; @@ -3741,12 +3881,23 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } + SReaderStatus* pStatus = &pReader->status; + + taosThreadMutexLock(&pReader->readerMutex); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey); pReader->step = EXTERNAL_ROWS_PREV; if (ret) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } } @@ -3757,6 +3908,10 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret = doTsdbNextDataBlock(pReader); if (ret) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } @@ -3765,10 +3920,18 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); pReader->step = EXTERNAL_ROWS_NEXT; if (ret1) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret1; } } + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return false; } @@ -3903,6 +4066,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { return NULL; } + taosThreadMutexUnlock(&pReader->readerMutex); + copyBlockDataToSDataBlock(pReader, pBlockScanInfo); return pReader->pResBlock->pDataBlock; } @@ -4174,20 +4339,3 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } - -static int32_t tsdbSetQueryReseek(void* pQHandle) { - int32_t code = 0; - - // lock handle - - // check state (is already in reseek state, skip below) - - // save all state for further restore - - // unref read snapshot - // tsdbUntakeReadSnap(); - - // unlock handle - - return code; -} From d3c97947f3ca43e46bde3fe996558e49e7893e25 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 29 Dec 2022 18:21:38 +0800 Subject: [PATCH 10/15] fix(tsdb/read): use int32_t instead of size_t for taosHashGetSize --- source/dnode/vnode/src/tsdb/tsdbRead.c | 35 +++++++++++++++----------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ced01c76e8..17bb0c9db6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2902,14 +2902,15 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SDataBlk* pBlock = getCurrentBlock(pBlockIter); - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - if (pScanInfo == NULL) { - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list", pBlockInfo->uid); - return; + int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX; + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + if (pBlockInfo) { + STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pScanInfo) { + lastKey = pScanInfo->lastKey; + } } - SReaderStatus* pStatus = &pReader->status; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; @@ -2917,7 +2918,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->allDumped = false; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1; - pDumpInfo->lastKey = pScanInfo->lastKey; + pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -4058,11 +4059,11 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t tsdbReaderResume(STsdbReader* pReader) { int32_t code = 0; - STableBlockScanInfo* pBlockScanInfo = *pReader->status.pTableIter; + STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter; // restore reader's state // task snapshot - size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); if (numOfTables > 0) { code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { @@ -4100,8 +4101,8 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { pReader->suspended = false; - tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRIu64 ", in this query %s", pReader, - pBlockScanInfo ? pBlockScanInfo->uid : 0, numOfTables, pReader->idStr); + tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, + pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); return code; _err: @@ -4201,9 +4202,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } } - if (pStatus->composedDataBlock) { - taosThreadMutexUnlock(&pReader->readerMutex); - } + taosThreadMutexUnlock(&pReader->readerMutex); return false; } @@ -4335,6 +4334,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, *pBlockSMA = pResBlock->pBlockAgg; pReader->cost.smaDataLoad += 1; + taosThreadMutexUnlock(&pReader->readerMutex); + tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); return code; } @@ -4353,6 +4354,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { terrno = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + taosThreadMutexUnlock(&pReader->readerMutex); return NULL; } @@ -4360,6 +4362,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData); terrno = code; + taosThreadMutexUnlock(&pReader->readerMutex); return NULL; } @@ -4369,6 +4372,8 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->pResBlock; } +void tsdbReleaseDataBlock(STsdbReader* pReader) { taosThreadMutexUnlock(&pReader->readerMutex); } + SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { From e4e72dd9e9d65b0f98026ebfea38d579c8ab50f2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 30 Dec 2022 16:18:32 +0800 Subject: [PATCH 11/15] fix(tsdb/read): new tsdbReleaseDataBlock api from release reader's lock --- source/dnode/vnode/inc/vnode.h | 7 ++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 11 ++++++++--- source/libs/executor/src/scanoperator.c | 4 +++- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a7564e352c..d7851d6fe7 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -54,7 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); -void vnodeSyncCheckTimeout(SVnode* pVnode); +void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); @@ -175,7 +175,8 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); -int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave); +int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); +void tsdbReleaseDataBlock(STsdbReader *pReader); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); @@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader, const char* idstr); + uint64_t suid, void **pReader, const char *idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 17bb0c9db6..eb3fef1c54 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4514,13 +4514,18 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; + taosThreadMutexLock(&pReader->readerMutex); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); while (pStatus->pTableIter != NULL) { STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STbData* d = NULL; - if (pReader->pTsdb->mem != NULL) { + if (pReader->pReadSnap->pMem != NULL) { d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); @@ -4528,7 +4533,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { } STbData* di = NULL; - if (pReader->pTsdb->imem != NULL) { + if (pReader->pReadSnap->pIMem != NULL) { di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); @@ -4538,7 +4543,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { // current table is exhausted, let's try the next table pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); } - + tsdbReleaseDataBlock(pReader); return rows; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b2aa2269a2..f775b98d03 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -302,12 +302,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; pCost->totalRows += pBlock->info.rows; + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; + tsdbReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; @@ -352,7 +354,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->skipBlocks += 1; - + tsdbReleaseDataBlock(pTableScanInfo->dataReader); *status = FUNC_DATA_REQUIRED_FILTEROUT; return TSDB_CODE_SUCCESS; } From b020383046a8785663f981e14aea9fb0c1ac3d8f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 3 Jan 2023 18:30:50 +0800 Subject: [PATCH 12/15] fix(tsdb): comment qlist out to please CI --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 ++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + source/libs/executor/src/scanoperator.c | 2 ++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 26fb39f523..8d75661857 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -750,7 +750,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_ int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - + /* // register handle (todo: take concurrency in consideration) *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); if (*ppNode == NULL) { @@ -763,21 +763,21 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_ (*ppNode)->ppNext = &pMemTable->qList.pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; pMemTable->qList.pNext = *ppNode; - + */ _exit: return code; } int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - + /* // unregister handle (todo: take concurrency in consideration) if (pNode) { pNode->pNext->ppNext = pNode->ppNext; *pNode->ppNext = pNode->pNext; taosMemoryFree(pNode); } - + */ int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 841396c515..ce7f1b12be 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4392,6 +4392,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SSDataBlock* ret = doRetrieveDataBlock(pTReader); + qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); taosThreadMutexUnlock(&pReader->readerMutex); return ret; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 923b535e63..6dd5341b2e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2554,6 +2554,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } STsdbReader* reader = pInfo->base.dataReader; + qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); @@ -2588,6 +2589,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + qTrace("tsdb/read-table-data: %p, close reader", reader); tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; return pBlock; From e30c00875c4887c11936fb0320da5d5cc8cbc9f0 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Jan 2023 15:14:00 +0800 Subject: [PATCH 13/15] enh: close tsdb reader --- include/libs/executor/executor.h | 1 + source/dnode/vnode/src/tq/tq.c | 8 +++++ source/libs/executor/src/executor.c | 1 + source/libs/executor/src/executorimpl.c | 41 +++++++++++++++++-------- 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 871dd60741..3489b359cb 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +void qStreamCloseTsdbReader(void* task); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0017e4ba2..d0c3233488 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); + taosWLockLatch(&pTq->pushLock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { @@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock + + tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pHandle->execHandle.task); + } if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { } // close handle diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0a2802ba37..a40e29ba09 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); } + diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 149efef884..680d08fa19 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -604,9 +604,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } } -bool isTaskKilled(SExecTaskInfo* pTaskInfo) { - return (0 != pTaskInfo->code) ? true : false; -} +bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } @@ -1349,7 +1347,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } -static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) { +static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } @@ -1357,12 +1355,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && - ((STableScanInfo *)downstream->info)->hasGroupByTag == true)) { + ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { return TSDB_CODE_SUCCESS; } SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - bool hasCountFunc = false; + bool hasCountFunc = false; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; @@ -1411,7 +1409,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } -static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) { +static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { if (!blockAllocated) { return; } @@ -1437,8 +1435,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - bool hasValidBlock = false; - bool blockAllocated = false; + bool hasValidBlock = false; + bool blockAllocated = false; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1481,7 +1479,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - } // the downstream operator may return with error code, so let's check the code before generating results. @@ -1636,7 +1633,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey) { + const char* pkey) { int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1759,7 +1756,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, + optrDefaultBufFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -2608,3 +2606,22 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; } + +void qStreamCloseTsdbReader(void* task) { + if (task == NULL) return; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; + SOperatorInfo* pOp = pTaskInfo->pRoot; + pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; + while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { + SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; + if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pDownstreamOp->info; + if (pInfo->pTableScanOp) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; + return; + } + } + } +} From 5ee7e7aed9d5989657c2aafde71e9c628b992d41 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Jan 2023 17:55:00 +0800 Subject: [PATCH 14/15] fix: init tsdb read snap --- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 ++++++++++++ source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/scanoperator.c | 5 +++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e04633fd01..363960a085 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4399,7 +4399,19 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { } int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { + SReaderStatus* pStatus = &pReader->status; + + qTrace("tsdb/read: %p, take read mutex", pReader); + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + + taosThreadMutexUnlock(&pReader->readerMutex); + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { + tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e8a6d2cd63..1d1298bf5f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2659,6 +2659,8 @@ void qStreamCloseTsdbReader(void* task) { if (task == NULL) return; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; SOperatorInfo* pOp = pTaskInfo->pRoot; + qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid, + pTaskInfo->streamInfo.lastStatus.ts); pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d07a333070..6d93989582 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro if (NULL == *pPage) { return NULL; } - + return (SResultRow*)((char*)(*pPage) + p1->offset); } @@ -1588,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows", pResult->info.rows); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows, + pResult->info.window.skey, pResult->info.window.ekey); pTaskInfo->streamInfo.returned = 1; return pResult; } else { From 6591b982450eaf34a4ee767bf693e4fcde470da0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 3 Jan 2023 18:30:50 +0800 Subject: [PATCH 15/15] fix(tsdb): comment qlist out to please CI --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 ++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 3 +++ source/libs/executor/src/scanoperator.c | 2 ++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 26fb39f523..8d75661857 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -750,7 +750,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_ int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - + /* // register handle (todo: take concurrency in consideration) *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); if (*ppNode == NULL) { @@ -763,21 +763,21 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_ (*ppNode)->ppNext = &pMemTable->qList.pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; pMemTable->qList.pNext = *ppNode; - + */ _exit: return code; } int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - + /* // unregister handle (todo: take concurrency in consideration) if (pNode) { pNode->pNext->ppNext = pNode->ppNext; *pNode->ppNext = pNode->pNext; taosMemoryFree(pNode); } - + */ int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 841396c515..d8ecaf9832 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3927,6 +3927,7 @@ void tsdbReaderClose(STsdbReader* pReader) { pReader->pDelIdx = NULL; } + qTrace("tsdb/reader: %p, untake snapshot", pReader); tsdbUntakeReadSnap(pReader, pReader->pReadSnap); taosThreadMutexDestroy(&pReader->readerMutex); @@ -4065,6 +4066,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { // task snapshot int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); if (numOfTables > 0) { + qTrace("tsdb/reader: %p, take snapshot", pReader); code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -4392,6 +4394,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SSDataBlock* ret = doRetrieveDataBlock(pTReader); + qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); taosThreadMutexUnlock(&pReader->readerMutex); return ret; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 923b535e63..6dd5341b2e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2554,6 +2554,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } STsdbReader* reader = pInfo->base.dataReader; + qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); @@ -2588,6 +2589,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + qTrace("tsdb/read-table-data: %p, close reader", reader); tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; return pBlock;