fix: add new duration block event

This commit is contained in:
shenglian zhou 2023-12-15 15:04:47 +08:00
parent d3146a5bec
commit cc4a6c6d55
5 changed files with 105 additions and 60 deletions

View File

@ -153,7 +153,8 @@ typedef struct {
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
// Notification events the tsdb reader raises through the registered notifyFn
// callback (see tableMergeScanTsdbNotifyCb). The diff left the pre-change
// enumerator line interleaved with its replacement; this is the post-commit enum.
typedef enum {
  TSD_READER_NOTIFY_DURATION_START,       // a new file-set duration is about to be read
  TSD_READER_NOTIFY_NEXT_DURATION_BLOCK,  // current block belongs to the next duration
} ETsdReaderNotifyType;
typedef union {

View File

@ -4332,12 +4332,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr);
}
if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) {
tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr);
copyDataBlock(pBlock, pStatus->pNextFilesetBlock);
blockDataDestroy(pStatus->pNextFilesetBlock);
tsdbDebug("new duration %d start notification when buffer pre-fileset, %s", fid, pReader->idStr);
}
}
return code;
@ -4364,12 +4359,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s",
pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr);
if (pStatus->bProcMemPreFileset) {
if ( pBlock->info.rows > 0) {
pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true);
blockDataCleanup(pBlock);
code = buildFromPreFilesetBuffer(pReader);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
if (pBlock->info.rows > 0) {
if (pReader->notifyFn) {
int32_t fid = pReader->status.pCurrentFileset->fid;
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
}
} else {
pStatus->bProcMemPreFileset = false;

View File

@ -220,7 +220,6 @@ typedef struct SReaderStatus {
bool bProcMemFirstFileset;
STableUidList procMemUidList;
STableBlockScanInfo** pProcMemTableIter;
SSDataBlock* pNextFilesetBlock;
} SReaderStatus;
struct STsdbReader {

View File

@ -298,9 +298,14 @@ typedef struct STableMergeScanInfo {
SHashObj* mSkipTables;
int64_t mergeLimit;
SSortExecInfo sortExecInfo;
bool bNewFileset;
bool bOnlyRetrieveBlock;
bool filesetDelimited;
bool bNewFilesetEvent;
bool bNextDurationBlockEvent;
int32_t numNextDurationBlocks;
SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {

View File

@ -3240,6 +3240,53 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock
return TSDB_CODE_SUCCESS;
}
// Fetch and filter the next data block from the tsdb reader for table merge scan.
// On return:
//   *pFinished - true when the reader is exhausted, the task was killed, or the
//                whole block was filtered out with no more data required.
//   *pSkipped  - true when this block was filtered out and the caller should
//                immediately try the next block.
// The loaded rows (if any) are left in pInfo->pReaderBlock. On a fatal error the
// function long-jumps out via T_LONG_JMP and does not return normally.
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*         pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*         pBlock = pInfo->pReaderBlock;
  STsdbReader*         reader = pInfo->base.dataReader;
  bool                 hasNext = false;

  int32_t code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
  if (code != 0) {
    pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
    qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // evaluate the kill flag once instead of twice (original called isTaskKilled
  // in both the outer condition and the inner branch)
  bool killed = isTaskKilled(pTaskInfo);
  if (!hasNext || killed) {
    if (killed) {
      qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
    }
    *pFinished = true;
    return;
  }

  uint32_t status = 0;
  code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
  if (code != TSDB_CODE_SUCCESS) {
    qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
    // nothing more is required from this scan at all
    *pFinished = true;
    return;
  }

  // current block is filtered out according to filter condition; caller should
  // continue with the next block
  if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
    *pSkipped = true;
  }
}
static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
@ -3255,53 +3302,42 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
STsdbReader* reader = pInfo->base.dataReader;
while (true) {
if (!pInfo->bOnlyRetrieveBlock) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
pInfo->bNewFileset = false;
if (isTaskKilled(pTaskInfo)) {
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
if (pInfo->rtnNextDurationBlocks) {
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
copyDataBlock(pBlock, pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]);
blockDataDestroy(pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]);
++pInfo->nextDurationBlocksIdx;
if (pInfo->nextDurationBlocksIdx >= pInfo->numNextDurationBlocks) {
pInfo->rtnNextDurationBlocks = false;
pInfo->nextDurationBlocksIdx = 0;
}
}
} else {
bool bFinished = false;
bool bSkipped = false;
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
if (bFinished) {
pInfo->bNewFilesetEvent = false;
break;
}
if (pInfo->bNewFileset) {
pInfo->bOnlyRetrieveBlock = true;
return NULL;
if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
if (!bSkipped) {
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
++pInfo->numNextDurationBlocks;
ASSERT(pInfo->numNextDurationBlocks <= 2);
}
if (pInfo->bNewFilesetEvent) {
pInfo->rtnNextDurationBlocks = true;
return NULL;
}
if (pInfo->bNextDurationBlockEvent) {
pInfo->bNextDurationBlockEvent = false;
continue;
}
}
if (bSkipped) continue;
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pInfo->sample);
if (!processThisBlock) {
continue;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
if (pInfo->bOnlyRetrieveBlock) {
pInfo->bOnlyRetrieveBlock = false;
}
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
break;
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) {
@ -3317,6 +3353,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
return NULL;
}
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
int32_t tsTargetSlotId = 0;
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
@ -3348,7 +3385,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
// Callback registered with the tsdb reader; translates reader notifications
// into the table-merge-scan operator's event flags. `param` is the operator's
// STableMergeScanInfo. The stale `pTmsInfo->bNewFileset = true;` line was the
// pre-change diff line (the field was removed in this commit) and is dropped.
// `info` carries the fileset id but is not needed here.
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
  STableMergeScanInfo* pTmsInfo = param;
  if (type == TSD_READER_NOTIFY_DURATION_START) {
    pTmsInfo->bNewFilesetEvent = true;
  } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
    pTmsInfo->bNextDurationBlockEvent = true;
  }
}
@ -3358,7 +3399,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pInfo->bNewFileset = false;
pInfo->bNewFilesetEvent = false;
pInfo->bNextDurationBlockEvent = false;
pInfo->numNextDurationBlocks = 0;
pInfo->nextDurationBlocksIdx = 0;
pInfo->rtnNextDurationBlocks = false;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
@ -3535,7 +3580,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} else {
if (pInfo->bNewFileset) {
if (pInfo->bNewFilesetEvent) {
stopDurationForGroupTableMergeScan(pOperator);
startDurationForGroupTableMergeScan(pOperator);
} else {