fix: no block after new duration

This commit is contained in:
slzhou 2023-12-05 09:51:14 +08:00
parent 8c290587b7
commit 538f6fc722
2 changed files with 24 additions and 27 deletions

View File

@ -298,7 +298,7 @@ typedef struct STableMergeScanInfo {
int64_t mergeLimit; int64_t mergeLimit;
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
bool bNewDuration; bool bNewDuration;
SSDataBlock* pNewDurationBlock; bool bOnlyRetrieveBlock;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanFilterContext { typedef struct STagScanFilterContext {

View File

@ -3242,29 +3242,28 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
while (true) { while (true) {
if (pInfo->pNewDurationBlock) { if (!pInfo->bOnlyRetrieveBlock) {
SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
pInfo->pNewDurationBlock = NULL; if (code != 0) {
return pNewDurationBlock; pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
} qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (!hasNext || isTaskKilled(pTaskInfo)) {
if (code != 0) { pInfo->bNewDuration = false;
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); if (isTaskKilled(pTaskInfo)) {
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code); pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
} }
break;
}
if (!hasNext) { if (pInfo->bNewDuration) {
break; pInfo->bOnlyRetrieveBlock = true;
return NULL;
}
} }
if (isTaskKilled(pTaskInfo)) {
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
break;
}
// process this data block based on the probabilities // process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pInfo->sample); bool processThisBlock = processBlockWithProbability(&pInfo->sample);
if (!processThisBlock) { if (!processThisBlock) {
@ -3273,6 +3272,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
uint32_t status = 0; uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
if (pInfo->bOnlyRetrieveBlock) {
pInfo->bOnlyRetrieveBlock = false;
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -3295,11 +3297,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
if (pInfo->bNewDuration) {
pInfo->pNewDurationBlock = pBlock;
return NULL;
}
return pBlock; return pBlock;
} }
@ -3336,7 +3334,6 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
} }
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId);
STableMergeScanInfo* pTmsInfo = param; STableMergeScanInfo* pTmsInfo = param;
pTmsInfo->bNewDuration = true; pTmsInfo->bNewDuration = true;
return; return;