tsdb/read: first round long query for non-cache reading
This commit is contained in:
parent
eac3847532
commit
ef606d3fd9
|
@ -723,6 +723,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||||
|
|
||||||
// tsdbCache ==============================================================================================
|
// tsdbCache ==============================================================================================
|
||||||
typedef struct SCacheRowsReader {
|
typedef struct SCacheRowsReader {
|
||||||
|
STsdb *pTsdb;
|
||||||
|
SVersionRange verRange;
|
||||||
|
TdThreadMutex readerMutex;
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
|
|
|
@ -108,6 +108,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
|
||||||
|
|
||||||
p->type = type;
|
p->type = type;
|
||||||
p->pVnode = pVnode;
|
p->pVnode = pVnode;
|
||||||
|
p->pTsdb = p->pVnode->pTsdb;
|
||||||
|
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
|
||||||
p->numOfCols = numOfCols;
|
p->numOfCols = numOfCols;
|
||||||
p->suid = suid;
|
p->suid = suid;
|
||||||
|
|
||||||
|
@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexInit(&p->readerMutex, NULL);
|
||||||
|
|
||||||
*pReader = p;
|
*pReader = p;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -160,6 +164,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
||||||
|
|
||||||
destroyLastBlockLoadInfo(p->pLoadInfo);
|
destroyLastBlockLoadInfo(p->pLoadInfo);
|
||||||
|
|
||||||
|
taosThreadMutexDestroy(&p->readerMutex);
|
||||||
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -195,8 +201,15 @@ static void freeItem(void* pItem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SCacheRowsReader* pReader = pQHandle;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
// pause current reader's state if not paused, save ts & version for resuming
|
||||||
|
// just wait for the big all tables' snapshot untaking for now
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +256,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
taosArrayPush(pLastCols, &p);
|
taosArrayPush(pLastCols, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap);
|
taosThreadMutexLock(&pr->readerMutex);
|
||||||
|
tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
|
||||||
pr->pDataFReader = NULL;
|
pr->pDataFReader = NULL;
|
||||||
pr->pDataFReaderLast = NULL;
|
pr->pDataFReaderLast = NULL;
|
||||||
|
|
||||||
|
@ -344,7 +358,8 @@ _end:
|
||||||
tsdbDataFReaderClose(&pr->pDataFReaderLast);
|
tsdbDataFReaderClose(&pr->pDataFReaderLast);
|
||||||
tsdbDataFReaderClose(&pr->pDataFReader);
|
tsdbDataFReaderClose(&pr->pDataFReader);
|
||||||
|
|
||||||
tsdbUntakeReadSnap(NULL, pr->pReadSnap);
|
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap);
|
||||||
|
taosThreadMutexUnlock(&pr->readerMutex);
|
||||||
|
|
||||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||||
taosMemoryFree(pRes[j]);
|
taosMemoryFree(pRes[j]);
|
||||||
|
|
|
@ -142,6 +142,9 @@ typedef struct SReaderStatus {
|
||||||
|
|
||||||
struct STsdbReader {
|
struct STsdbReader {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
|
SVersionRange verRange;
|
||||||
|
TdThreadMutex readerMutex;
|
||||||
|
bool suspended;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int16_t order;
|
int16_t order;
|
||||||
STimeWindow window; // the primary query time window that applies to all queries
|
STimeWindow window; // the primary query time window that applies to all queries
|
||||||
|
@ -156,7 +159,6 @@ struct STsdbReader {
|
||||||
STSchema* pSchema; // the newest version schema
|
STSchema* pSchema; // the newest version schema
|
||||||
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
||||||
SDataFReader* pFileReader;
|
SDataFReader* pFileReader;
|
||||||
SVersionRange verRange;
|
|
||||||
|
|
||||||
int32_t step;
|
int32_t step;
|
||||||
STsdbReader* innerReader[2];
|
STsdbReader* innerReader[2];
|
||||||
|
@ -522,6 +524,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
setColumnIdSlotList(pReader, pReader->pResBlock);
|
setColumnIdSlotList(pReader, pReader->pResBlock);
|
||||||
|
|
||||||
|
taosThreadMutexInit(&pReader->readerMutex, NULL);
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -613,12 +617,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||||
|
|
||||||
sizeInDisk += pScanInfo->mapData.nData;
|
sizeInDisk += pScanInfo->mapData.nData;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||||
|
STimeWindow w = pReader->window;
|
||||||
|
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
|
w.skey = pScanInfo->lastKey + step;
|
||||||
|
} else {
|
||||||
|
w.ekey = pScanInfo->lastKey + step;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isEmptyQueryTimeWindow(&w)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||||
SDataBlk block = {0};
|
SDataBlk block = {0};
|
||||||
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);
|
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);
|
||||||
|
|
||||||
// 1. time range check
|
// 1. time range check
|
||||||
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
|
// if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
|
||||||
|
if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2018,9 +2036,11 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
|
|
||||||
TSDBKEY startKey = {0};
|
TSDBKEY startKey = {0};
|
||||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
|
// startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
|
||||||
|
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
|
||||||
} else {
|
} else {
|
||||||
startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
|
// startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
|
||||||
|
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
||||||
|
@ -2669,7 +2689,13 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
// set the correct start position in case of the first/last file block, according to the query time window
|
// set the correct start position in case of the first/last file block, according to the query time window
|
||||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||||
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||||
|
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
|
if (pScanInfo == NULL) {
|
||||||
|
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list", pBlockInfo->uid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
|
@ -2678,6 +2704,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
|
||||||
pDumpInfo->totalRows = pBlock->nRow;
|
pDumpInfo->totalRows = pBlock->nRow;
|
||||||
pDumpInfo->allDumped = false;
|
pDumpInfo->allDumped = false;
|
||||||
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
|
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
|
||||||
|
pDumpInfo->lastKey = pScanInfo->lastKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||||
|
@ -3489,8 +3516,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ====================================== EXPOSED APIs ======================================
|
// ====================================== EXPOSED APIs ======================================
|
||||||
static int32_t tsdbSetQueryReseek(void* pQHandle);
|
|
||||||
|
|
||||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
||||||
const char* idstr) {
|
const char* idstr) {
|
||||||
STimeWindow window = pCond->twindows;
|
STimeWindow window = pCond->twindows;
|
||||||
|
@ -3571,50 +3596,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfTables > 0) {
|
pReader->suspended = true;
|
||||||
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
|
||||||
code = doOpenReaderImpl(pReader);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
STsdbReader* pPrevReader = pReader->innerReader[0];
|
|
||||||
STsdbReader* pNextReader = pReader->innerReader[1];
|
|
||||||
|
|
||||||
// we need only one row
|
|
||||||
pPrevReader->capacity = 1;
|
|
||||||
pPrevReader->status.pTableMap = pReader->status.pTableMap;
|
|
||||||
pPrevReader->pSchema = pReader->pSchema;
|
|
||||||
pPrevReader->pMemSchema = pReader->pMemSchema;
|
|
||||||
pPrevReader->pReadSnap = pReader->pReadSnap;
|
|
||||||
|
|
||||||
pNextReader->capacity = 1;
|
|
||||||
pNextReader->status.pTableMap = pReader->status.pTableMap;
|
|
||||||
pNextReader->pSchema = pReader->pSchema;
|
|
||||||
pNextReader->pMemSchema = pReader->pMemSchema;
|
|
||||||
pNextReader->pReadSnap = pReader->pReadSnap;
|
|
||||||
|
|
||||||
code = doOpenReaderImpl(pPrevReader);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doOpenReaderImpl(pNextReader);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doOpenReaderImpl(pReader);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
@ -3677,6 +3659,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
|
||||||
|
|
||||||
|
taosThreadMutexDestroy(&pReader->readerMutex);
|
||||||
|
|
||||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
|
@ -3706,9 +3690,165 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
if (pReader->pMemSchema != pReader->pSchema) {
|
if (pReader->pMemSchema != pReader->pSchema) {
|
||||||
taosMemoryFree(pReader->pMemSchema);
|
taosMemoryFree(pReader->pMemSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pReader);
|
taosMemoryFreeClear(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// save reader's base state & reset top state to be reconstructed from base state
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||||
|
|
||||||
|
if (pStatus->loadFromFile) {
|
||||||
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
|
if (pBlockInfo != NULL) {
|
||||||
|
pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
|
if (pBlockScanInfo == NULL) {
|
||||||
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
|
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||||
|
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pBlockScanInfo = pStatus->pTableIter;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||||
|
|
||||||
|
// resetDataBlockScanInfo excluding lastKey
|
||||||
|
STableBlockScanInfo* p = NULL;
|
||||||
|
|
||||||
|
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
|
||||||
|
p->iterInit = false;
|
||||||
|
p->iiter.hasVal = false;
|
||||||
|
if (p->iter.iter != NULL) {
|
||||||
|
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
|
// p->lastKey = ts;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pBlockScanInfo = pStatus->pTableIter;
|
||||||
|
if (pBlockScanInfo) {
|
||||||
|
// save lastKey to restore memory iterator
|
||||||
|
STimeWindow w = pReader->pResBlock->info.window;
|
||||||
|
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
|
||||||
|
|
||||||
|
// reset current current table's data block scan info,
|
||||||
|
pBlockScanInfo->iterInit = false;
|
||||||
|
// pBlockScanInfo->iiter.hasVal = false;
|
||||||
|
if (pBlockScanInfo->iter.iter != NULL) {
|
||||||
|
pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlockScanInfo->iiter.iter != NULL) {
|
||||||
|
pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
|
||||||
|
tMapDataClear(&pBlockScanInfo->mapData);
|
||||||
|
// TODO: keep skyline for reuse
|
||||||
|
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap);
|
||||||
|
|
||||||
|
pReader->suspended = true;
|
||||||
|
|
||||||
|
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STsdbReader* pReader = pQHandle;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
if (pReader->suspended) {
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbReaderSuspend(pReader);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReaderResume(STsdbReader* pReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
STableBlockScanInfo* pBlockScanInfo = pReader->status.pTableIter;
|
||||||
|
|
||||||
|
// restore reader's state
|
||||||
|
// task snapshot
|
||||||
|
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||||
|
if (numOfTables > 0) {
|
||||||
|
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
||||||
|
code = doOpenReaderImpl(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STsdbReader* pPrevReader = pReader->innerReader[0];
|
||||||
|
STsdbReader* pNextReader = pReader->innerReader[1];
|
||||||
|
|
||||||
|
// we need only one row
|
||||||
|
pPrevReader->capacity = 1;
|
||||||
|
pPrevReader->status.pTableMap = pReader->status.pTableMap;
|
||||||
|
pPrevReader->pSchema = pReader->pSchema;
|
||||||
|
pPrevReader->pMemSchema = pReader->pMemSchema;
|
||||||
|
pPrevReader->pReadSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
|
pNextReader->capacity = 1;
|
||||||
|
pNextReader->status.pTableMap = pReader->status.pTableMap;
|
||||||
|
pNextReader->pSchema = pReader->pSchema;
|
||||||
|
pNextReader->pMemSchema = pReader->pMemSchema;
|
||||||
|
pNextReader->pReadSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
|
code = doOpenReaderImpl(pPrevReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doOpenReaderImpl(pNextReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doOpenReaderImpl(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pReader->suspended = false;
|
||||||
|
|
||||||
|
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRIu64 ", in this query %s", pReader,
|
||||||
|
pBlockScanInfo ? pBlockScanInfo->uid : 0, numOfTables, pReader->idStr);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
// cleanup the data that belongs to the previous data block
|
// cleanup the data that belongs to the previous data block
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
|
@ -3741,12 +3881,23 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
if (pReader->suspended) {
|
||||||
|
tsdbReaderResume(pReader);
|
||||||
|
}
|
||||||
|
|
||||||
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
||||||
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
||||||
resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
||||||
pReader->step = EXTERNAL_ROWS_PREV;
|
pReader->step = EXTERNAL_ROWS_PREV;
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
if (pStatus->composedDataBlock) {
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3757,6 +3908,10 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
bool ret = doTsdbNextDataBlock(pReader);
|
bool ret = doTsdbNextDataBlock(pReader);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
if (pStatus->composedDataBlock) {
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3765,10 +3920,18 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
|
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
|
||||||
pReader->step = EXTERNAL_ROWS_NEXT;
|
pReader->step = EXTERNAL_ROWS_NEXT;
|
||||||
if (ret1) {
|
if (ret1) {
|
||||||
|
if (pStatus->composedDataBlock) {
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
}
|
||||||
|
|
||||||
return ret1;
|
return ret1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pStatus->composedDataBlock) {
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3903,6 +4066,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||||
|
|
||||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||||
return pReader->pResBlock->pDataBlock;
|
return pReader->pResBlock->pDataBlock;
|
||||||
}
|
}
|
||||||
|
@ -4174,20 +4339,3 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
|
||||||
}
|
}
|
||||||
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// lock handle
|
|
||||||
|
|
||||||
// check state (is already in reseek state, skip below)
|
|
||||||
|
|
||||||
// save all state for further restore
|
|
||||||
|
|
||||||
// unref read snapshot
|
|
||||||
// tsdbUntakeReadSnap();
|
|
||||||
|
|
||||||
// unlock handle
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue