From 91de00597d64d89b41acb254eb09f30fdf67c874 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 22:12:14 +0800 Subject: [PATCH 01/10] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 36 ++++++++++++++++++----------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 716b939e5f..c3dd848bc7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,7 +17,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 8 +#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo (SStreamTask* pTask); @@ -44,6 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* if (numOfBlocks > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { + qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno)); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; } @@ -314,8 +315,13 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +/** + * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the + * appropriate batch of blocks should be handled in 5 to 10 sec. + */ int32_t streamExecForAll(SStreamTask* pTask) { - int32_t code = 0; + const char* id = pTask->id.idStr; + while (1) { int32_t batchSize = 1; int16_t times = 0; @@ -323,7 +329,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. - qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); + qDebug("s-task:%s start to extract data block from inputQ", id); while (1) { if (streamTaskShouldPause(&pTask->status)) { @@ -338,7 +344,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { times++; - taosMsleep(1); + taosMsleep(10); qDebug("===stream===try again batchSize:%d", batchSize); continue; } @@ -363,8 +369,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr); + qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id, + MAX_STREAM_EXEC_BATCH_NUM); break; } } @@ -375,7 +383,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput) { streamFreeQitem(pInput); } - return 0; } @@ -385,7 +392,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); - qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize); + qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } @@ -394,16 +401,16 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(2); + taosMsleep(100); } else { break; } } int64_t st = taosGetTimestampMs(); - qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); { // set input @@ -417,21 +424,21 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", id, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; @@ -446,7 +453,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); + qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", + id, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); } From 27b7d1ec8818672d93e6e0d5c47c084b0b140ae1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 11:17:44 +0800 Subject: [PATCH 02/10] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/src/scanoperator.c | 4 ++-- source/libs/stream/src/stream.c | 10 +++++----- source/libs/stream/src/streamDispatch.c | 6 +++--- source/libs/stream/src/streamExec.c | 12 ++++++------ 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cc791d1a6..5e7e21e7bd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1086,7 +1086,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion); + qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion); pTask->chkInfo.currentVer = sversion; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 88f5642ef9..80a7505c77 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2005,7 +2005,7 @@ FETCH_NEXT_BLOCK: // printDataBlock(pBlock, "stream scan recv"); return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { - qDebug("scan mode %d", pInfo->scanMode); + qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -2122,7 +2122,7 @@ FETCH_NEXT_BLOCK: pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); + qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { return pBlock; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index acc69c5a2b..7457b2197e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -296,11 +296,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, total, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); @@ -314,9 +311,12 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosFreeQitem(pItem); return code; } + + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1e939cb071..bd6a013de2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -283,7 +283,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr, + qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr, pReq->taskId, vgId); return 0; @@ -414,7 +414,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, + qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr, pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); @@ -514,7 +514,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus); + qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); if (pDispatchedBlock == NULL) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c3dd848bc7..813bc50b87 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -236,11 +236,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { taosFreeQitem(qRes); return code; } - - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatchStreamBlock(pTask); - } +// +// if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { +// qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); +// streamDispatchStreamBlock(pTask); +// } if (finished) { break; @@ -438,7 +438,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", id, pTask, numOfBlocks); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; From f1bd829977b43a0bbb12b4f9acdfb13d57f4a87d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 13:10:54 +0800 Subject: [PATCH 03/10] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqMeta.c | 3 ++- source/dnode/vnode/src/tq/tqRead.c | 19 +++++++++++-------- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/scanoperator.c | 5 +++-- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 4 ++-- 8 files changed, 23 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fbe2f4809c..8003419ec9 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -200,7 +200,7 @@ STqReader *tqReaderOpen(SVnode *pVnode); void tqReaderClose(STqReader *); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); +int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char* id); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5e7e21e7bd..a6bed26884 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -708,8 +708,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } + pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); + tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index be5b5706e2..a720e556e6 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -344,8 +344,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } + handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); + tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index dbb7d564f8..f15a9898b5 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -394,8 +394,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { - SSDataBlock* pRes = NULL; - int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); + SSDataBlock* pRes = NULL; + int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { return true; } @@ -457,7 +457,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, + tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); @@ -467,10 +467,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { - tqDebug("tq reader block found, ver:%" PRId64 ", uid:%" PRId64, pReader->msg.ver, pSubmitTbData->uid); + tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); return true; } else { - tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); + tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, + taosHashGetSize(pReader->tbIdHash), idstr); } pReader->nextBlk++; @@ -604,7 +605,6 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); - SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); SSDataBlock* pBlock = pReader->pResBlock; @@ -612,6 +612,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* blockDataCleanup(pBlock); + int32_t vgId = pReader->pWalReader->pWal->cfg.vgId; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -628,7 +629,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer); + vgId, suid, uid, pReader->cachedSchemaVer); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -642,6 +643,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* if (blockDataGetNumOfCols(pBlock) == 0) { int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList); if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code)); return code; } } @@ -998,7 +1000,7 @@ FAIL: void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } -int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { +int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) { if (pReader->tbIdHash) { taosHashClear(pReader->tbIdHash); } else { @@ -1015,6 +1017,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0); } + tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList)); return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ed8a4f3a3b..4d02362904 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -401,7 +401,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI int32_t code = 0; if (isAdd) { - qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id); + qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id); } // traverse to the stream scanner node to add this table id diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 80a7505c77..5f09b0c67a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2308,7 +2308,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SArray* pColIds = NULL; SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + const char* idstr = pTaskInfo->id.str; if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2419,7 +2420,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds); SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo); - code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList); + code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bd6a013de2..9501718416 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -318,7 +318,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); code = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 813bc50b87..c28cd6f334 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -133,7 +133,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); - qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, + qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, numOfBlocks, size / 1048576.0); // current output should be dispatched to down stream nodes @@ -371,7 +371,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamQueueProcessSuccess(pTask->inputQueue); if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id, + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); break; } From 76106165daf887be46463b78f49cc21a1d3c59f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 15:13:25 +0800 Subject: [PATCH 04/10] other: add some logs. --- source/libs/executor/src/scanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cea1e6361d..83a4f91034 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2090,7 +2090,9 @@ FETCH_NEXT_BLOCK: while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { SSDataBlock* pRes = NULL; int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); + if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { + qDebug("failed to retrieve data from block, code:%s, rows:%"PRId64 " try next block in submit block", tstrerror(code), pRes->info.rows); continue; } From bcd8867fb2010892bdae2b3fa92edef8b4e60fd0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 15:27:14 +0800 Subject: [PATCH 05/10] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 83a4f91034..8151c459e5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1897,6 +1897,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); return pInfo->pCreateTbRes; } + qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows); printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; @@ -2089,10 +2090,13 @@ FETCH_NEXT_BLOCK: while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { SSDataBlock* pRes = NULL; + int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); + qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", + tstrerror(code), pRes->info.rows, id); if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { - qDebug("failed to retrieve data from block, code:%s, rows:%"PRId64 " try next block in submit block", tstrerror(code), pRes->info.rows); + qDebug("retrieve data failed, try next block in submit block, %s", id); continue; } @@ -2100,6 +2104,7 @@ FETCH_NEXT_BLOCK: if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; + qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); return pInfo->pCreateTbRes; } From 2edba85efe930fe324c1c008799c1a7bfa8cadef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 15:27:44 +0800 Subject: [PATCH 06/10] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8151c459e5..0ed41aeea1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2092,8 +2092,8 @@ FETCH_NEXT_BLOCK: SSDataBlock* pRes = NULL; int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); - qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", - tstrerror(code), pRes->info.rows, id); + qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, + id); if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { qDebug("retrieve data failed, try next block in submit block, %s", id); From 3d7defa881d8f8a146ffa6cba663d2ffe9a3677f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 15:43:38 +0800 Subject: [PATCH 07/10] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0ed41aeea1..df2062afe6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2113,6 +2113,8 @@ FETCH_NEXT_BLOCK: pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows, + pInfo->pUpdateDataRes->info.rows, id); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } From b5b4cd2a05b686ab0de277d3354fad3f62c04f86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 23:15:27 +0800 Subject: [PATCH 08/10] fix(query): load del info with upper version limitation. --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/tsdbDataIter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 +++++++++++++---- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 12 ++++++++---- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 ++-- source/libs/wal/src/walRead.c | 2 +- 8 files changed, 28 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index faf550ab75..828e0e7e3d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -297,7 +297,7 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter); // SDelFReader int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFReaderClose(SDelFReader **ppReader); -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 6a97ea89b3..d6c19ee18e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1454,7 +1454,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI int32_t code = 0; if (pDelIdx) { - code = tsdbReadDelData(pDelReader, pDelIdx, aDelData); + code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, INT64_MAX); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index d15f848cfd..ba74613bd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -266,7 +266,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel suid = pDelIdx->suid; uid = pDelIdx->uid; - code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData); + code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); } else { taosArrayClear(pCommitter->aDelData); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c index e27aec5b1b..701ad56181 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -412,7 +412,7 @@ static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* } } - code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData); + code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); pIter->delInfo.suid = pDelIdx->suid; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9cae4ad5aa..1286c2f7c4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2967,7 +2967,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ); if (pIdx != NULL) { - code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData); + code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer); } if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -2978,7 +2978,10 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pMemTbData != NULL) { p = pMemTbData->pHead; while (p) { - taosArrayPush(pDelData, p); + if (p->version <= pReader->verRange.maxVer) { + taosArrayPush(pDelData, p); + } + p = p->pNext; } } @@ -2986,7 +2989,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (piMemTbData != NULL) { p = piMemTbData->pHead; while (p) { - taosArrayPush(pDelData, p); + if (p->version <= pReader->verRange.maxVer) { + taosArrayPush(pDelData, p); + } p = p->pNext; } } @@ -4558,7 +4563,11 @@ int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableLis pReader->pIgnoreTables = pIgnoreTables; - tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); + tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 + " in this query %s", + pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->verRange.minVer, + pReader->verRange.maxVer, pReader->idStr); + return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index b25ab393da..e4b8a57291 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1488,7 +1488,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { return code; } -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) { +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) { int32_t code = 0; int64_t offset = pDelIdx->offset; int64_t size = pDelIdx->size; @@ -1510,11 +1510,15 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData SDelData delData; n += tGetDelData(pReader->aBuf[0] + n, &delData); - if (taosArrayPush(aDelData, &delData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + if (delData.version > maxVer) { + continue; } + if (taosArrayPush(aDelData, &delData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } + ASSERT(n == size); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index dfea125cc1..f7362b366f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1166,7 +1166,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid); if (c < 0) { - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData); + code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1); @@ -1183,7 +1183,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pWriter->pTIter->tIter.iDelIdx++; } else if (c == 0) { - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); + code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); pWriter->pTIter->tIter.iDelIdx++; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 0eb3d6ef09..1223e3756c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -333,7 +333,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { return -1; } - wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); + wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType); pReader->curVersion = ver + 1; return 0; } From becacbc84b63a181758d16a0b033a89003d240b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Jun 2023 23:36:47 +0800 Subject: [PATCH 09/10] fix(query): fix syntax error. --- source/dnode/vnode/src/inc/tsdb.h | 3 ++- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/tsdbDataIter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 ++-- 7 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 828e0e7e3d..9df95a379a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -297,7 +297,8 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter); // SDelFReader int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFReaderClose(SDelFReader **ppReader); -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer); +int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer); +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d6c19ee18e..c659c8f4a2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1454,7 +1454,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI int32_t code = 0; if (pDelIdx) { - code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, INT64_MAX); + code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ba74613bd9..b440d51883 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -266,7 +266,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel suid = pDelIdx->suid; uid = pDelIdx->uid; - code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX); + code = tsdbReadDelDatav1(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); } else { taosArrayClear(pCommitter->aDelData); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c index 701ad56181..8215c1ac29 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -412,7 +412,7 @@ static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* } } - code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData, INT64_MAX); + code = tsdbReadDelDatav1(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); pIter->delInfo.suid = pDelIdx->suid; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1286c2f7c4..2500015ec1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2967,7 +2967,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ); if (pIdx != NULL) { - code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer); + code = tsdbReadDelDatav1(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer); } if (code != TSDB_CODE_SUCCESS) { goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index e4b8a57291..fb899fa1ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1488,7 +1488,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { return code; } -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) { +int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) { int32_t code = 0; int64_t offset = pDelIdx->offset; int64_t size = pDelIdx->size; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index f7362b366f..b5ca716701 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1166,7 +1166,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid); if (c < 0) { - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX); + code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1); @@ -1183,7 +1183,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pWriter->pTIter->tIter.iDelIdx++; } else if (c == 0) { - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX); + code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX); TSDB_CHECK_CODE(code, lino, _exit); pWriter->pTIter->tIter.iDelIdx++; From 4bb64292498390fd17abb5fc5e141a1c2b34920c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 3 Jun 2023 00:20:59 +0800 Subject: [PATCH 10/10] fix(query): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index fb899fa1ce..4b677533e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1488,6 +1488,10 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { return code; } +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) { + return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX); +} + int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) { int32_t code = 0; int64_t offset = pDelIdx->offset;