Merge pull request #23528 from taosdata/fix/liaohj

enh(tsdb): on-demand open stt file reader.
This commit is contained in:
Haojun Liao 2023-11-03 17:29:55 +08:00 committed by GitHub
commit 3415ba5195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 123 additions and 124 deletions

View File

@ -184,11 +184,7 @@ void qDestroyTask(qTaskInfo_t tinfo);
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);

View File

@ -846,14 +846,14 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) {
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[0].pin = true;
ASSERT(!pInfo->blockData[1].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
return;
}
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[1].pin = true;
ASSERT(!pInfo->blockData[0].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
return;
}

View File

@ -565,9 +565,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->info.order)) {
w.skey = pScanInfo->lastKey + step;
w.skey = pScanInfo->lastProcKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
w.ekey = pScanInfo->lastProcKey + step;
}
if (isEmptyQueryTimeWindow(&w)) {
@ -607,14 +607,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
clearBrinBlockIter(&iter);
pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
pBlockNum->numOfSttFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfSttFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug(
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles,
sizeInDisk / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
@ -1196,13 +1196,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p
}
}
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) {
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t key = 0;
if (bHasDataInLastBlock) {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt);
} else {
key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey;
@ -1211,10 +1210,10 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader
return key;
}
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock,
SLastBlockReader* pLastBlockReader, int32_t order) {
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo,
int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order);
int64_t key = getBoarderKeyInFiles(pBlock, pScanInfo, order);
return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) ||
(!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key));
@ -1298,12 +1297,12 @@ typedef struct {
} SDataBlockToLoadInfo;
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
STsdbReader* pReader) {
int32_t neighborIndex = 0;
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
SBrinRecord rec = {0};
int32_t neighborIndex = 0;
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec);
// overlap with neighbor
if (hasNeighbor) {
@ -1314,9 +1313,11 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
if (hasDataInLastBlock(pLastBlockReader)) {
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast);
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
pInfo->overlapWithLastBlock =
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
}
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
@ -1331,9 +1332,9 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// 5. delete info should not overlap with current block data
// 6. current block should not contain the duplicated ts
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool loadDataBlock =
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
@ -1353,9 +1354,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
}
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock);
return isCleanFileBlock;
@ -1412,14 +1413,15 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code;
}
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) {
static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey += step;
return false;
}
@ -1428,10 +1430,11 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key;
pScanInfo->sttKeyInfo.nextProcKey = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
}
@ -1453,7 +1456,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
// avoid the fetch next row replace the referenced stt block in buffer
doPinSttBlock(pLastBlockReader);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pLastBlockReader);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
@ -1684,7 +1687,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
if (copied) {
pBlockScanInfo->lastKey = tsLastBlock;
pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2040,9 +2043,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* d = NULL;
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer};
} else {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer};
}
int32_t code =
@ -2107,9 +2110,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKeyInStt;
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
} else {
w.ekey = pScanInfo->lastKeyInStt;
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
}
int64_t st = taosGetTimestampUs();
@ -2142,7 +2145,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0);
@ -2181,7 +2184,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
if (copied) {
pBlockScanInfo->lastKey = key;
pBlockScanInfo->lastProcKey = key;
return TSDB_CODE_SUCCESS;
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2324,16 +2327,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
(pRecord->numRow <= pReader->resBlockInfo.capacity)) {
if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
int64_t cap = pReader->resBlockInfo.capacity;
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
// record the last key value
pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey;
pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
goto _end;
}
}
@ -2348,6 +2351,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SBlockData* pBlockData = &pReader->status.fileBlockData;
initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader);
while (1) {
bool hasBlockData = false;
@ -2497,7 +2501,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0;
pBlockNum->numOfSttFiles = 0;
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk));
@ -2534,7 +2538,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return code;
}
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
break;
}
}
@ -2654,11 +2658,13 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
}
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) {
if(!hasDataInLastBlock(pLastBlockReader)) {
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
return true;
} else {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt);
}
}
@ -2687,10 +2693,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return terrno;
}
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
}
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2698,13 +2706,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block
code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) {
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pScanInfo, pReader->info.order)) {
// data in memory that are earlier than current file block and stt blocks
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order);
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) {
if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
@ -2715,7 +2723,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey;
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
@ -2730,8 +2738,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs();
// let's load data from stt files
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
// no data in last block, no need to proceed.
while (hasDataInLastBlock(pLastBlockReader)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2958,7 +2971,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
lastKey = pScanInfo->lastProcKey;
}
pDumpInfo->totalRows = pBlockInfo->record.numRow;
@ -2983,7 +2996,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
}
// all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastFiles == 0) {
if (num.numOfBlocks + num.numOfSttFiles == 0) {
pReader->status.loadFromFile = false;
taosArrayDestroy(pTableList);
return code;
@ -3428,15 +3441,15 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr);
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
pScanInfo->sttKeyInfo.nextProcKey, idStr);
break;
}
}
@ -3692,7 +3705,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
pScanInfo->lastKey = pTSRow->ts;
pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
}
@ -3826,14 +3839,15 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
// todo extract method
if (ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t skey = pReader->info.window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey;
pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pReader->info.window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey;
pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->sttKeyInfo.nextProcKey = ekey;
}
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
@ -4194,7 +4208,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;

View File

@ -157,17 +157,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->sttKeyInfo.nextProcKey = ekey;
}
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
pScanInfo->lastProcKey, pTsdbReader->idStr);
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -200,8 +201,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastKey = ts;
pInfo->lastKeyInStt = ts + step;
pInfo->lastProcKey = ts;
pInfo->sttKeyInfo.nextProcKey = ts + step;
}
}
@ -241,6 +242,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pBlockIdxList);
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
}
void cleanupInfoForNextFileset(SSHashObj* pTableMap) {

View File

@ -63,20 +63,31 @@ typedef struct STableDataBlockIdx {
int32_t globalIndex;
} STableDataBlockIdx;
typedef enum ESttKeyStatus {
STT_FILE_READER_UNINIT = 0x0,
STT_FILE_NO_DATA = 0x1,
STT_FILE_HAS_DATA = 0x2,
} ESttKeyStatus;
typedef struct SSttKeyInfo {
ESttKeyStatus status; // this value should be updated when switch to the next fileset
int64_t nextProcKey;
} SSttKeyInfo;
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
uint64_t uid;
TSKEY lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo;
typedef struct SResultBlockInfo {
@ -108,7 +119,7 @@ typedef struct STableUidList {
typedef struct {
int32_t numOfBlocks;
int32_t numOfLastFiles;
int32_t numOfSttFiles;
} SBlockNumber;
typedef struct SBlockIndex {

View File

@ -871,32 +871,6 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
}
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo->pRoot == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput);
// *len = 0;
// }
return 0;
}
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
return TSDB_CODE_INVALID_PARA;
}
return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
}
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;

View File

@ -647,23 +647,25 @@ int32_t streamExecTask(SStreamTask* pTask) {
int32_t streamTaskReleaseState(SStreamTask* pTask) {
stDebug("s-task:%s release exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReleaseState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
code = qStreamOperatorReleaseState(pExecutor);
}
return code;
}
int32_t streamTaskReloadState(SStreamTask* pTask) {
stDebug("s-task:%s reload exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReloadState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
code = qStreamOperatorReloadState(pExecutor);
}
return code;
}
int32_t streamAlignTransferState(SStreamTask* pTask) {