diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4c513371f5..65293a220d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -723,6 +723,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { + STsdb *pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; SVnode *pVnode; STSchema *pSchema; uint64_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 86cc00568e..aef402acaf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -108,6 +108,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList p->type = type; p->pVnode = pVnode; + p->pTsdb = p->pVnode->pTsdb; + p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; p->suid = suid; @@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList return TSDB_CODE_OUT_OF_MEMORY; } + taosThreadMutexInit(&p->readerMutex, NULL); + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -160,6 +164,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { destroyLastBlockLoadInfo(p->pLoadInfo); + taosThreadMutexDestroy(&p->readerMutex); + taosMemoryFree(pReader); return NULL; } @@ -195,8 +201,15 @@ static void freeItem(void* pItem) { } static int32_t tsdbCacheQueryReseek(void* pQHandle) { - int32_t code = 0; + int32_t code = 0; + SCacheRowsReader* pReader = pQHandle; + taosThreadMutexLock(&pReader->readerMutex); + + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now + + taosThreadMutexUnlock(&pReader->readerMutex); return code; } @@ -243,7 +256,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap); + taosThreadMutexLock(&pr->readerMutex); + tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; @@ -344,7 +358,8 @@ _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); - tsdbUntakeReadSnap(NULL, pr->pReadSnap); + tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap); + taosThreadMutexUnlock(&pr->readerMutex); for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4808956693..92d5515430 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -142,6 +142,9 @@ typedef struct SReaderStatus { struct STsdbReader { STsdb* pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; + bool suspended; uint64_t suid; int16_t order; STimeWindow window; // the primary query time window that applies to all queries @@ -156,7 +159,6 @@ struct STsdbReader { STSchema* pSchema; // the newest version schema STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times SDataFReader* pFileReader; - SVersionRange verRange; int32_t step; STsdbReader* innerReader[2]; @@ -522,6 +524,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd setColumnIdSlotList(pReader, pReader->pResBlock); + taosThreadMutexInit(&pReader->readerMutex, NULL); + *ppReader = pReader; return code; @@ -613,12 +617,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); sizeInDisk += pScanInfo->mapData.nData; + + int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + STimeWindow w = pReader->window; + if (ASCENDING_TRAVERSE(pReader->order)) { + w.skey = pScanInfo->lastKey + step; + } else { + w.ekey = pScanInfo->lastKey + step; + } + + if (isEmptyQueryTimeWindow(&w)) { + continue; + } + for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { SDataBlk block = {0}; tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk); // 1. time range check - if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + // if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { + if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) { continue; } @@ -2018,9 +2036,11 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->order)) { - startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + // startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; } else { - startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + // startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; } int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); @@ -2669,7 +2689,13 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pScanInfo == NULL) { + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list", pBlockInfo->uid); + return; + } SReaderStatus* pStatus = &pReader->status; @@ -2678,6 +2704,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->allDumped = false; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1; + pDumpInfo->lastKey = pScanInfo->lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -3489,8 +3516,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== -static int32_t tsdbSetQueryReseek(void* pQHandle); - int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; @@ -3571,50 +3596,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - if (numOfTables > 0) { - code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; - STsdbReader* pNextReader = pReader->innerReader[1]; - - // we need only one row - pPrevReader->capacity = 1; - pPrevReader->status.pTableMap = pReader->status.pTableMap; - pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; - pPrevReader->pReadSnap = pReader->pReadSnap; - - pNextReader->capacity = 1; - pNextReader->status.pTableMap = pReader->status.pTableMap; - pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; - pNextReader->pReadSnap = pReader->pReadSnap; - - code = doOpenReaderImpl(pPrevReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doOpenReaderImpl(pNextReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } + pReader->suspended = true; tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -3677,6 +3659,8 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + taosThreadMutexDestroy(&pReader->readerMutex); + taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3706,9 +3690,165 @@ void tsdbReaderClose(STsdbReader* pReader) { if (pReader->pMemSchema != pReader->pSchema) { taosMemoryFree(pReader->pMemSchema); } + taosMemoryFreeClear(pReader); } +int32_t tsdbReaderSuspend(STsdbReader* pReader) { + int32_t code = 0; + + // save reader's base state & reset top state to be reconstructed from base state + SReaderStatus* pStatus = &pReader->status; + STableBlockScanInfo* pBlockScanInfo = NULL; + + if (pStatus->loadFromFile) { + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + if (pBlockInfo != NULL) { + pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + code = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + goto _err; + } + } else { + pBlockScanInfo = pStatus->pTableIter; + } + + tsdbDataFReaderClose(&pReader->pFileReader); + + // resetDataBlockScanInfo excluding lastKey + STableBlockScanInfo* p = NULL; + + while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) { + p->iterInit = false; + p->iiter.hasVal = false; + if (p->iter.iter != NULL) { + p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + } + + p->delSkyline = taosArrayDestroy(p->delSkyline); + // p->lastKey = ts; + } + } else { + pBlockScanInfo = pStatus->pTableIter; + if (pBlockScanInfo) { + // save lastKey to restore memory iterator + STimeWindow w = pReader->pResBlock->info.window; + pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey; + + // reset current current table's data block scan info, + pBlockScanInfo->iterInit = false; + // pBlockScanInfo->iiter.hasVal = false; + if (pBlockScanInfo->iter.iter != NULL) { + pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter); + } + + if (pBlockScanInfo->iiter.iter != NULL) { + pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter); + } + + pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); + tMapDataClear(&pBlockScanInfo->mapData); + // TODO: keep skyline for reuse + pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); + } + } + + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + + pReader->suspended = true; + + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr); + return code; + +_err: + tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + +static int32_t tsdbSetQueryReseek(void* pQHandle) { + int32_t code = 0; + STsdbReader* pReader = pQHandle; + + taosThreadMutexLock(&pReader->readerMutex); + + if (pReader->suspended) { + taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } + + tsdbReaderSuspend(pReader); + + taosThreadMutexUnlock(&pReader->readerMutex); + + return code; +} + +int32_t tsdbReaderResume(STsdbReader* pReader) { + int32_t code = 0; + + STableBlockScanInfo* pBlockScanInfo = pReader->status.pTableIter; + + // restore reader's state + // task snapshot + size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + if (numOfTables > 0) { + code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pNextReader = pReader->innerReader[1]; + + // we need only one row + pPrevReader->capacity = 1; + pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pSchema = pReader->pSchema; + pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pReadSnap = pReader->pReadSnap; + + pNextReader->capacity = 1; + pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pSchema = pReader->pSchema; + pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pReadSnap = pReader->pReadSnap; + + code = doOpenReaderImpl(pPrevReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doOpenReaderImpl(pNextReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + } + + pReader->suspended = false; + + tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRIu64 ", in this query %s", pReader, + pBlockScanInfo ? pBlockScanInfo->uid : 0, numOfTables, pReader->idStr); + return code; + +_err: + tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr); + return code; +} + static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; @@ -3741,12 +3881,23 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } + SReaderStatus* pStatus = &pReader->status; + + taosThreadMutexLock(&pReader->readerMutex); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } + if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey); pReader->step = EXTERNAL_ROWS_PREV; if (ret) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } } @@ -3757,6 +3908,10 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret = doTsdbNextDataBlock(pReader); if (ret) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret; } @@ -3765,10 +3920,18 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); pReader->step = EXTERNAL_ROWS_NEXT; if (ret1) { + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return ret1; } } + if (pStatus->composedDataBlock) { + taosThreadMutexUnlock(&pReader->readerMutex); + } + return false; } @@ -3903,6 +4066,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { return NULL; } + taosThreadMutexUnlock(&pReader->readerMutex); + copyBlockDataToSDataBlock(pReader, pBlockScanInfo); return pReader->pResBlock->pDataBlock; } @@ -4174,20 +4339,3 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } - -static int32_t tsdbSetQueryReseek(void* pQHandle) { - int32_t code = 0; - - // lock handle - - // check state (is already in reseek state, skip below) - - // save all state for further restore - - // unref read snapshot - // tsdbUntakeReadSnap(); - - // unlock handle - - return code; -}