From a951af2492b99df52bddb2724c8caac86d363316 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 16:21:41 +0800 Subject: [PATCH 1/3] fix(query): check the rows before applying the agg in session window. --- source/libs/executor/src/timewindowoperator.c | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 57c038e75a..9474db8553 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1332,23 +1332,25 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + // start a new session window + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. 
+ STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); From c39fcc5194eefc13c79ccc458a39e7e703c20543 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 11 Apr 2024 16:55:01 +0800 Subject: [PATCH 2/3] fix: arb assigned step down needs to reset token --- source/libs/sync/src/syncAppendEntriesReply.c | 16 +---- source/libs/sync/src/syncMain.c | 71 ++----------------- 2 files changed, 8 insertions(+), 79 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ede4dc07e1..4b7ed59039 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ 
b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - terrno = TSDB_CODE_SUCCESS; - raftStoreNextTerm(ths); - if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); - return -1; - } - if (syncNodeAssignedLeader2Leader(ths) != 0) { - sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId); - return -1; - } - - taosThreadMutexLock(&ths->arbTokenMutex); - syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken); - sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken); - taosThreadMutexUnlock(&ths->arbTokenMutex); + syncNodeStepDown(ths, pMsg->term); } } else { (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 85aa3a2796..fbdb5f4201 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sError("sync step down error"); - return -1; - } - - syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); - return 0; -} -#endif - bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // start in syncNodeStart // start raft - // syncNodeBecomeFollower(pSyncNode); int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; @@ -1848,20 +1833,6 @@ void 
syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // persist cfg syncWriteCfgFile(pSyncNode); - -#if 0 - // change isStandBy to normal (election timeout) - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, ""); - - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); - // syncMaybeAdvanceCommitIndex(pSyncNode); - - } else { - syncNodeBecomeFollower(pSyncNode, ""); - } -#endif } else { // persist cfg syncWriteCfgFile(pSyncNode); @@ -1874,18 +1845,6 @@ _END: } // raft state change -------------- -#ifdef BUILD_NO_CALL -void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > raftStoreGetTerm(pSyncNode)) { - raftStoreSetTerm(pSyncNode, term); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode); - } -} -#endif - void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); @@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); + if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { + taosThreadMutexLock(&pSyncNode->arbTokenMutex); + syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); + sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); + taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); + } + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode); - } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { 
syncNodeBecomeFollower(pSyncNode, "step down"); @@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "follower to candidate"); } -#ifdef BUILD_NO_CALL -void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode, "leader to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "leader to follower"); -} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. 
term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "candidate to follower"); -} -#endif - int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); From f2ccb8aa7ede38e6835fbab8b7b00fc42f861359 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 22:54:35 +0800 Subject: [PATCH 3/3] fix(stream): add lock when retrieving info from the tableGroup struct --- source/libs/executor/src/scanoperator.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64806a1c72..9ccabd2cd6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); - pInfo->pResBlock->info.blankFill = false; if (!pInfo->needCountEmptyTable) { @@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t num = 0; - STableKeyInfo* pList = NULL; + int32_t num = 0; + STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { @@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { return NULL; } + taosRLockLatch(&pTaskInfo->lock); initNextGroupScan(pInfo, &pList, &num); + 
taosRUnLockLatch(&pTaskInfo->lock); + ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,