fix(tsdb):add some logs.

This commit is contained in:
Haojun Liao 2024-04-11 19:35:14 +08:00
parent 8f92dc614d
commit b60cc321f3
4 changed files with 45 additions and 45 deletions

View File

@ -1485,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = createDataBlock();
pBlock->info = pDataBlock->info; pBlock->info = pDataBlock->info;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.capacity = 0; pBlock->info.capacity = 0;
pBlock->info.rowSize = 0; pBlock->info.rowSize = 0;
@ -1512,6 +1513,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
p->nData = pDataBlock->info.pks[1].nData; p->nData = pDataBlock->info.pks[1].nData;
memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData);
uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData);
uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData);
} else {
uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock);
} }
if (copyData) { if (copyData) {

View File

@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo {
int32_t scanTimes; int32_t scanTimes;
int32_t readIdx; int32_t readIdx;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
SSHashObj* mTableNumRows; // uid->num of table rows SSHashObj* mTableNumRows; // uid->num of table rows
SHashObj* mSkipTables; SHashObj* mSkipTables;
int64_t mergeLimit; int64_t mergeLimit;
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
bool needCountEmptyTable; bool needCountEmptyTable;
bool bGroupProcessed; // the group return data means processed bool bGroupProcessed; // the group return data means processed
bool filesetDelimited; bool filesetDelimited;
bool bNewFilesetEvent; bool bNewFilesetEvent;
bool bNextDurationBlockEvent; bool bNextDurationBlockEvent;
int32_t numNextDurationBlocks; int32_t numNextDurationBlocks;
SSDataBlock* nextDurationBlocks[2]; SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks; bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx; int32_t nextDurationBlocksIdx;
bool bSortRowId;
bool bSortRowId;
STmsSubTablesMergeInfo* pSubTablesMergeInfo; STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo; } STableMergeScanInfo;

View File

@ -4069,14 +4069,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
} }
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pInfo->pReaderBlock;
SSDataBlock* pBlock = pInfo->pReaderBlock; int32_t code = 0;
int32_t code = 0; bool hasNext = false;
bool hasNext = false; STsdbReader* reader = pInfo->base.dataReader;
STsdbReader* reader = pInfo->base.dataReader;
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) { if (code != 0) {
@ -4112,27 +4111,23 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
*pSkipped = true; *pSkipped = true;
return; return;
} }
return; return;
} }
static SSDataBlock* getBlockForTableMergeScan(void* param) { static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = NULL; SOperatorInfo* pOperator = source->pOperator;
int32_t code = 0; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = NULL;
int64_t st = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
bool hasNext = false;
STsdbReader* reader = pInfo->base.dataReader;
while (true) { while (true) {
if (pInfo->rtnNextDurationBlocks) { if (pInfo->rtnNextDurationBlocks) {
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
@ -4149,13 +4144,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
continue; continue;
} }
} else { } else {
bool bFinished = false; bool bFinished = false;
bool bSkipped = false; bool bSkipped = false;
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
pBlock = pInfo->pReaderBlock; pBlock = pInfo->pReaderBlock;
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
if (bFinished) { if (bFinished) {
pInfo->bNewFilesetEvent = false; pInfo->bNewFilesetEvent = false;
break; break;
@ -4166,15 +4160,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
++pInfo->numNextDurationBlocks; ++pInfo->numNextDurationBlocks;
if (pInfo->numNextDurationBlocks > 2) { if (pInfo->numNextDurationBlocks > 2) {
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
pInfo->numNextDurationBlocks);
pInfo->bNewFilesetEvent = false; pInfo->bNewFilesetEvent = false;
break; break;
} }
} }
if (pInfo->bNewFilesetEvent) { if (pInfo->bNewFilesetEvent) {
pInfo->rtnNextDurationBlocks = true; pInfo->rtnNextDurationBlocks = true;
return NULL; return NULL;
} }
if (pInfo->bNextDurationBlockEvent) { if (pInfo->bNextDurationBlockEvent) {
pInfo->bNextDurationBlockEvent = false; pInfo->bNextDurationBlockEvent = false;
continue; continue;
@ -4182,10 +4179,10 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
} }
if (bSkipped) continue; if (bSkipped) continue;
} }
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock; return pBlock;
@ -4194,7 +4191,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
return NULL; return NULL;
} }
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo biTs = {0}; SBlockOrderInfo biTs = {0};

View File

@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (pBlk != NULL) { if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData; int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
if (bExtractedBlock) { if (bExtractedBlock) {
blockDataDestroy(pBlk); blockDataDestroy(pBlk);
} }
continue; continue;
} }
} }