fix(tsdb): open reader during resume task for main reader after suspending reader.

This commit is contained in:
Haojun Liao 2024-01-09 21:40:27 +08:00
parent 1b6d55616f
commit b0202bb853
1 changed files with 59 additions and 41 deletions

View File

@ -676,6 +676,10 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
if (pBlockIter->blockList == NULL) {
return NULL;
}
size_t num = TARRAY_SIZE(pBlockIter->blockList); size_t num = TARRAY_SIZE(pBlockIter->blockList);
if (num == 0) { if (num == 0) {
ASSERT(pBlockIter->numOfBlocks == num); ASSERT(pBlockIter->numOfBlocks == num);
@ -1969,30 +1973,34 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STableBlockScanInfo* pBlockScanInfo, bool asc,
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { STsdbReaderInfo* pInfo) {
// it is an multi-table data block // it is an multi-table data block
if (pBlockData->aUid != NULL) { if (pBlockData->aUid != NULL) {
uint64_t uid = pBlockData->aUid[pDumpInfo->rowIndex]; uint64_t uid = pBlockData->aUid[rowIndex];
if (uid != pBlockScanInfo->uid) { // move to next row if (uid != pBlockScanInfo->uid) { // move to next row
return false; return false;
} }
} }
// check for version and time range // check for version and time range
int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; int64_t ver = pBlockData->aVersion[rowIndex];
if (ver > pReader->info.verRange.maxVer || ver < pReader->info.verRange.minVer) { if (ver > pInfo->verRange.maxVer || ver < pInfo->verRange.minVer) {
return false; return false;
} }
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t ts = pBlockData->aTSKEY[rowIndex];
if (ts > pReader->info.window.ekey || ts < pReader->info.window.skey) { if (ts > pInfo->window.ekey || ts < pInfo->window.skey) {
return false;
}
if ((asc && (ts <= pBlockScanInfo->lastProcKey)) || ((!asc) && (ts >= pBlockScanInfo->lastProcKey))) {
return false; return false;
} }
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver,
pReader->info.order, &pReader->info.verRange); pInfo->order, &pInfo->verRange);
if (dropped) { if (dropped) {
return false; return false;
} }
@ -2344,7 +2352,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
{ {
while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) { while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
// find the first qualified row in data block // find the first qualified row in data block
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { if (isValidFileBlockRow(pBlockData, pDumpInfo->rowIndex, pBlockScanInfo, asc, &pReader->info)) {
hasBlockData = true; hasBlockData = true;
break; break;
} }
@ -4130,37 +4138,21 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFreeClear(pReader); taosMemoryFreeClear(pReader);
} }
int32_t tsdbReaderSuspend2(STsdbReader* pReader) { static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) {
// save reader's base state & reset top state to be reconstructed from base state SReaderStatus* pStatus = &pCurrentReader->status;
int32_t code = 0;
SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL;
pReader->status.suspendInvoked = true; // record the suspend status
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); tsdbDataFileReaderClose(&pCurrentReader->pFileReader);
if (pBlockInfo != NULL) {
pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
goto _err;
}
} else {
pBlockScanInfo = *pStatus->pTableIter;
}
tsdbDataFileReaderClose(&pReader->pFileReader); SReadCostSummary* pCost = &pCurrentReader->cost;
pStatus->pLDataIterArray = destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost);
SReadCostSummary* pCost = &pReader->cost; pStatus->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
} }
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pCurrentReader->info.order) ? 1 : -1;
int32_t iter = 0; int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
@ -4171,6 +4163,26 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
pStatus->uidList.currentIndex = 0; pStatus->uidList.currentIndex = 0;
initReaderStatus(pStatus); initReaderStatus(pStatus);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// save reader's base state & reset top state to be reconstructed from base state
int32_t code = 0;
pReader->status.suspendInvoked = true; // record the suspend status
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_PREV) {
doSuspendCurrentReader(pReader->innerReader[0]);
} else if (pReader->step == EXTERNAL_ROWS_MAIN) {
doSuspendCurrentReader(pReader);
} else {
doSuspendCurrentReader(pReader->innerReader[1]);
}
} else {
doSuspendCurrentReader(pReader);
}
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
if (pReader->bFilesetDelimited) { if (pReader->bFilesetDelimited) {
@ -4179,16 +4191,16 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
clearSharedPtr(pReader->innerReader[0]);
clearSharedPtr(pReader->innerReader[1]);
}
#if SUSPEND_RESUME_TEST #if SUSPEND_RESUME_TEST
tsem_post(&pReader->resumeAfterSuspend); tsem_post(&pReader->resumeAfterSuspend);
#endif #endif
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, tsdbDebug("reader: %p suspended in this query %s, step:%d", pReader, pReader->idStr, pReader->step);
pReader->idStr);
return code;
_err:
tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
return code; return code;
} }
@ -4219,8 +4231,7 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
int32_t code = 0; int32_t code = 0;
STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter; STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;
// restore reader's state // restore reader's state, task snapshot
// task snapshot
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) { if (numOfTables > 0) {
qTrace("tsdb/reader: %p, take snapshot", pReader); qTrace("tsdb/reader: %p, take snapshot", pReader);
@ -4245,7 +4256,14 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
pNextReader->resBlockInfo.capacity = 1; pNextReader->resBlockInfo.capacity = 1;
setSharedPtr(pNextReader, pReader); setSharedPtr(pNextReader, pReader);
if (pReader->step == 0 || pReader->step == EXTERNAL_ROWS_PREV) {
code = doOpenReaderImpl(pPrevReader); code = doOpenReaderImpl(pPrevReader);
} else if (pReader->step == EXTERNAL_ROWS_MAIN) {
code = doOpenReaderImpl(pReader);
} else {
code = doOpenReaderImpl(pNextReader);
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }