enhance: add log and clean up
This commit is contained in:
parent
ce48598f0d
commit
b2ee43540b
|
@ -3302,12 +3302,15 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
STsdbReader* reader = pInfo->base.dataReader;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (pInfo->rtnNextDurationBlocks) {
|
if (pInfo->rtnNextDurationBlocks) {
|
||||||
|
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
|
||||||
|
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
|
||||||
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
|
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
|
||||||
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
|
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
|
||||||
++pInfo->nextDurationBlocksIdx;
|
++pInfo->nextDurationBlocksIdx;
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
|
for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
|
||||||
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
||||||
|
pInfo->nextDurationBlocks[i] = NULL;
|
||||||
}
|
}
|
||||||
pInfo->rtnNextDurationBlocks = false;
|
pInfo->rtnNextDurationBlocks = false;
|
||||||
pInfo->nextDurationBlocksIdx = 0;
|
pInfo->nextDurationBlocksIdx = 0;
|
||||||
|
@ -3320,7 +3323,8 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
bool bSkipped = false;
|
bool bSkipped = false;
|
||||||
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
||||||
pBlock = pInfo->pReaderBlock;
|
pBlock = pInfo->pReaderBlock;
|
||||||
|
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
|
||||||
|
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
|
||||||
if (bFinished) {
|
if (bFinished) {
|
||||||
pInfo->bNewFilesetEvent = false;
|
pInfo->bNewFilesetEvent = false;
|
||||||
break;
|
break;
|
||||||
|
@ -3330,7 +3334,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
if (!bSkipped) {
|
if (!bSkipped) {
|
||||||
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
|
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
|
||||||
++pInfo->numNextDurationBlocks;
|
++pInfo->numNextDurationBlocks;
|
||||||
ASSERT(pInfo->numNextDurationBlocks <= 2);
|
if (pInfo->numNextDurationBlocks > 2) {
|
||||||
|
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks);
|
||||||
|
pInfo->bNewFilesetEvent = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pInfo->bNewFilesetEvent) {
|
if (pInfo->bNewFilesetEvent) {
|
||||||
pInfo->rtnNextDurationBlocks = true;
|
pInfo->rtnNextDurationBlocks = true;
|
||||||
|
@ -3394,6 +3402,8 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
|
||||||
} else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
|
} else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) {
|
||||||
pTmsInfo->bNextDurationBlockEvent = true;
|
pTmsInfo->bNextDurationBlockEvent = true;
|
||||||
}
|
}
|
||||||
|
qDebug("table merge scan receive notification. type %d, fileset %d", type, info->duration.filesetId);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3403,6 +3413,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||||
|
|
||||||
|
qDebug("%s table merge scan start duration ", GET_TASKID(pTaskInfo));
|
||||||
pInfo->bNewFilesetEvent = false;
|
pInfo->bNewFilesetEvent = false;
|
||||||
pInfo->bNextDurationBlockEvent = false;
|
pInfo->bNextDurationBlockEvent = false;
|
||||||
|
|
||||||
|
@ -3434,6 +3445,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
qDebug("%s table merge scan stop duration ", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||||
|
@ -3451,6 +3464,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
qDebug("%s table merge scan start group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
||||||
|
|
||||||
{
|
{
|
||||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
@ -3498,10 +3512,19 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) {
|
||||||
|
if (pInfo->nextDurationBlocks[i]) {
|
||||||
|
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
||||||
|
pInfo->nextDurationBlocks[i] = NULL;
|
||||||
|
}
|
||||||
|
pInfo->numNextDurationBlocks = 0;
|
||||||
|
pInfo->nextDurationBlocksIdx = 0;
|
||||||
|
}
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
taosHashCleanup(pInfo->mSkipTables);
|
taosHashCleanup(pInfo->mSkipTables);
|
||||||
pInfo->mSkipTables = NULL;
|
pInfo->mSkipTables = NULL;
|
||||||
|
qDebug("%s table merge scan stop group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3612,6 +3635,13 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
|
||||||
|
if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
|
||||||
|
blockDataDestroy(pTableScanInfo->nextDurationBlocks[i]);
|
||||||
|
pTableScanInfo->nextDurationBlocks[i] = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
|
|
Loading…
Reference in New Issue