From a951af2492b99df52bddb2724c8caac86d363316 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 16:21:41 +0800 Subject: [PATCH 01/10] fix(query): check the rows before apply the agg in session window. --- source/libs/executor/src/timewindowoperator.c | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 57c038e75a..9474db8553 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1332,23 +1332,25 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + // start a new session window + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); From c39fcc5194eefc13c79ccc458a39e7e703c20543 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 11 Apr 2024 16:55:01 +0800 Subject: [PATCH 02/10] fix: arb assigned step down need to reset token --- source/libs/sync/src/syncAppendEntriesReply.c | 16 +---- source/libs/sync/src/syncMain.c | 71 ++----------------- 2 files changed, 8 insertions(+), 79 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ede4dc07e1..4b7ed59039 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - terrno = TSDB_CODE_SUCCESS; - raftStoreNextTerm(ths); - if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); - return -1; - } - if (syncNodeAssignedLeader2Leader(ths) != 0) { - sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId); - return -1; - } - - taosThreadMutexLock(&ths->arbTokenMutex); - syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken); - sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken); - taosThreadMutexUnlock(&ths->arbTokenMutex); + syncNodeStepDown(ths, pMsg->term); } } else { (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 85aa3a2796..fbdb5f4201 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sError("sync step down error"); - return -1; - } - - syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); - return 0; -} -#endif - bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // start in syncNodeStart // start raft - // syncNodeBecomeFollower(pSyncNode); int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; @@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // persist cfg syncWriteCfgFile(pSyncNode); - -#if 0 - // change isStandBy to normal (election timeout) - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, ""); - - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); - // syncMaybeAdvanceCommitIndex(pSyncNode); - - } else { - syncNodeBecomeFollower(pSyncNode, ""); - } -#endif } else { // persist cfg syncWriteCfgFile(pSyncNode); @@ -1874,18 +1845,6 @@ _END: } // raft state change -------------- -#ifdef BUILD_NO_CALL -void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > raftStoreGetTerm(pSyncNode)) { - raftStoreSetTerm(pSyncNode, term); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode); - } -} -#endif - void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); @@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); + if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { + taosThreadMutexLock(&pSyncNode->arbTokenMutex); + syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); + sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); + taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); + } + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode); - } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { syncNodeBecomeFollower(pSyncNode, "step down"); @@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "follower to candidate"); } -#ifdef BUILD_NO_CALL -void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode, "leader to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "leader to follower"); -} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "candidate to follower"); -} -#endif - int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); From 14242331b9b29692d5809710fac908e5394d1471 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 17:25:58 +0800 Subject: [PATCH 03/10] fix(tsdb): fix error in decode key. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 5 ++--- source/libs/executor/src/scanoperator.c | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 54d5c54788..6267eb1263 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -179,9 +179,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { } if (IS_VAR_DATA_TYPE(indices[i].type)) { - tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); - pKey->pks[i].pData += pKey->pks[i].nData; + tdata += tGetU32v(tdata, &pKey->pks[i].nData); + memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); } else { memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bec5a73198..7274811812 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4133,6 +4133,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->rtnNextDurationBlocks) { qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; @@ -4141,6 +4142,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { blockDataDestroy(pInfo->nextDurationBlocks[i]); pInfo->nextDurationBlocks[i] = NULL; } + pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; pInfo->numNextDurationBlocks = 0; From 5ee40fb5c9671aea63e8f2c050cd14b1ecac0218 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 18:38:17 +0800 Subject: [PATCH 04/10] fix(tsdb): fix invalid free --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 18 +++++++++++------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 10 +++++----- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 ++- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6267eb1263..047ee66b8f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -395,14 +395,18 @@ _err: return code; } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) { +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { + return pSupp->numOfPks > 0 && IS_VAR_DATA_TYPE(pSupp->pk.type); +} + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - clearDataBlockIterator(pIter, hasPk); + clearDataBlockIterator(pIter, needFree); } } @@ -3202,7 +3206,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); } @@ -3312,7 +3316,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -4157,7 +4161,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4361,7 +4365,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0); + cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo)); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -5037,7 +5041,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index c82363c921..d049aed496 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -467,21 +467,21 @@ static void freePkItem(void* pItem) { taosMemoryFreeClear(p->lastPk.pData); } -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayClearEx(pIter->blockList, freePkItem); } else { taosArrayClear(pIter->blockList); } } -void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayDestroyEx(pIter->blockList, freePkItem); } else { taosArrayDestroy(pIter->blockList); @@ -492,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; - clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0); + clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo)); pBlockIter->numOfBlocks = numOfBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 94909aabf4..49bb92c7ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -349,8 +349,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { From 8f92dc614d7409054e49cbb2b2672216133d9590 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 19:01:23 +0800 Subject: [PATCH 05/10] fix(tsdb):add some logs. --- source/common/src/tdatablock.c | 12 ++++++++---- source/libs/executor/src/tsort.c | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 20c4fa64c4..fdb72a9d5f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1332,6 +1332,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock); + uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock); taosMemoryFreeClear(pBlock->info.pks[0].pData); taosMemoryFreeClear(pBlock->info.pks[1].pData); } @@ -1503,11 +1505,13 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); pVal->nData = pDataBlock->info.pks[0].nData; + memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData); - pVal = &pBlock->info.pks[1]; - pVal->type = pDataBlock->info.pks[1].type; - pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); - pVal->nData = pDataBlock->info.pks[1].nData; + SValue* p = &pBlock->info.pks[1]; + p->type = pDataBlock->info.pks[1].type; + p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + p->nData = pDataBlock->info.pks[1].nData; + memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); } if (copyData) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 44404c345e..3dbf29e3a8 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); From b60cc321f3acbc4385049d875803f0c6d4a54099 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 19:35:14 +0800 Subject: [PATCH 06/10] fix(tsdb):add some logs. --- source/common/src/tdatablock.c | 5 +++ source/libs/executor/inc/executorInt.h | 29 +++++++------- source/libs/executor/src/scanoperator.c | 50 ++++++++++++------------- source/libs/executor/src/tsort.c | 6 +-- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fdb72a9d5f..fd56b5f5ae 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1485,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; @@ -1512,6 +1513,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->nData = pDataBlock->info.pks[1].nData; memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); + uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); + uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); + } else { + uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c2032554b6..8e6f637d81 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; int32_t readIdx; SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSHashObj* mTableNumRows; // uid->num of table rows - SHashObj* mSkipTables; - int64_t mergeLimit; + SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool needCountEmptyTable; - bool bGroupProcessed; // the group return data means processed - bool filesetDelimited; - bool bNewFilesetEvent; - bool bNextDurationBlockEvent; - int32_t numNextDurationBlocks; - SSDataBlock* nextDurationBlocks[2]; - bool rtnNextDurationBlocks; - int32_t nextDurationBlocksIdx; - - bool bSortRowId; + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; + bool bSortRowId; STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7274811812..1a47895c05 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4069,14 +4069,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pInfo->pReaderBlock; - int32_t code = 0; - bool hasNext = false; - STsdbReader* reader = pInfo->base.dataReader; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { @@ -4112,27 +4111,23 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe *pSkipped = true; return; } + return; } static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = NULL; - int32_t code = 0; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + int64_t st = taosGetTimestampUs(); - int64_t st = taosGetTimestampUs(); - bool hasNext = false; - - STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { - qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", - GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; @@ -4149,13 +4144,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { continue; } } else { - bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", - GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -4166,15 +4160,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { - qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), + pInfo->numNextDurationBlocks); pInfo->bNewFilesetEvent = false; break; } } + if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; return NULL; } + if (pInfo->bNextDurationBlockEvent) { pInfo->bNextDurationBlockEvent = false; continue; @@ -4182,19 +4179,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (bSkipped) continue; } + pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + return pBlock; } return NULL; } - SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo biTs = {0}; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3dbf29e3a8..cd1a858175 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + int64_t firstRowTs = *(int64_t*)tsCol->pData; + if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { blockDataDestroy(pBlk); - } + } continue; } } From efdd0c8a2a1b46fb98b49bec8da91226fe274545 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 22:34:47 +0800 Subject: [PATCH 07/10] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 19 +++++++------------ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 20 ++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 -- source/libs/executor/src/executil.c | 1 + 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 047ee66b8f..b7f97771da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -119,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { return ret > 0 ? 1 : -1; } } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + return p1->pks[0].val - p2->pks[0].val; } } } @@ -396,7 +396,7 @@ _err: } bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { - return pSupp->numOfPks > 0 && IS_VAR_DATA_TYPE(pSupp->pk.type); + return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); } void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { @@ -824,18 +824,13 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S return TSDB_CODE_SUCCESS; } -// todo keep the the last returned key static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { -// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; pDumpInfo->allDumped = true; -// ASSERT(0); -// pDumpInfo->lastKey.key.ts = maxKey + step; } static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, bool asc) { pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = numOfPks; if (pKey->numOfPKs <= 0) { return; @@ -845,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; } else { uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; - pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; + pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); } } @@ -2841,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; } else { - memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); - memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); + memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); - pInfo->pks[0].nData = pBlockInfo->firstPKLen; - pInfo->pks[1].nData = pBlockInfo->lastPKLen; + pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); + pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index d049aed496..ae8a6466ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -446,17 +446,17 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; + } else { + char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData); + varDataSetLen(p, pFirstKey->pks[0].nData); + pBlockInfo->firstPk.pData = (uint8_t*)p; - pBlockInfo->firstPKLen = 0; - pBlockInfo->lastPKLen = 0; - } else { // todo handle memory alloc error, opt memory alloc perf - pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; - pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); - memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); - - pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; - pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); - memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + int32_t keyLen = record->lastKey.key.pks[0].nData; + p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen); + varDataSetLen(p, keyLen); + pBlockInfo->lastPk.pData = (uint8_t*)p; } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 49bb92c7ce..581696c94a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo { uint8_t* pData; } lastPk; - int32_t firstPKLen; - int32_t lastPKLen; int64_t minVer; int64_t maxVer; int64_t blockOffset; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d152baf502..be6fb2983c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -261,6 +261,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) pBlockInfo->pks[0].type = pInfoData->info.type; pBlockInfo->pks[1].type = pInfoData->info.type; + // allocate enough buffer size, which is pInfoData->info.bytes if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); if (pBlockInfo->pks[0].pData == NULL) { From f2ccb8aa7ede38e6835fbab8b7b00fc42f861359 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 22:54:35 +0800 Subject: [PATCH 08/10] fix(stream): add lock when retrieving info from the tableGroup struct --- source/libs/executor/src/scanoperator.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64806a1c72..9ccabd2cd6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); - pInfo->pResBlock->info.blankFill = false; if (!pInfo->needCountEmptyTable) { @@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t num = 0; - STableKeyInfo* pList = NULL; + int32_t num = 0; + STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { @@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { return NULL; } + taosRLockLatch(&pTaskInfo->lock); initNextGroupScan(pInfo, &pList, &num); + taosRUnLockLatch(&pTaskInfo->lock); + ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, From 80707cd59205fa18965d5485d6fef39248a9020a Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 12 Apr 2024 10:42:41 +0800 Subject: [PATCH 09/10] fix: primary key column check for stb insert --- source/libs/parser/src/parInsertSql.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index f3192b4956..700cfff0b7 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1965,7 +1965,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt clearStbRowsDataContext(pStbRowsCxt); - return TSDB_CODE_SUCCESS; + return code; } static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, From 4da12faf2969c2ca8494cfb4d2ae6896a749f5b9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 12 Apr 2024 10:53:43 +0800 Subject: [PATCH 10/10] enh: error process for stb insert --- source/libs/parser/src/parInsertSql.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 700cfff0b7..9829528d92 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1945,10 +1945,14 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt if (code == TSDB_CODE_SUCCESS && bFirstTable) { code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); } - - code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), - pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true); - initTableColSubmitData(*ppTableDataCxt); + if (code == TSDB_CODE_SUCCESS) { + code = + insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), + pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true); + } + if (code == TSDB_CODE_SUCCESS) { + code = initTableColSubmitData(*ppTableDataCxt); + } if (code == TSDB_CODE_SUCCESS) { SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1); code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);