Merge branch '3.0' into feat/TS-4243-3.0

This commit is contained in:
Haojun Liao 2024-04-12 09:17:06 +08:00 committed by GitHub
commit 27dd0349bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 15 additions and 83 deletions

View File

@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
pInfo->pResBlock->info.blankFill = false; pInfo->pResBlock->info.blankFill = false;
if (!pInfo->needCountEmptyTable) { if (!pInfo->needCountEmptyTable) {
@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
taosRLockLatch(&pTaskInfo->lock);
initNextGroupScan(pInfo, &pList, &num); initNextGroupScan(pInfo, &pList, &num);
taosRUnLockLatch(&pTaskInfo->lock);
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,

View File

@ -1339,11 +1339,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// The gap is less than the threshold, so it belongs to current session window that has been opened already. // The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
} else { // start a new session window } else { // start a new session window
// start a new session window
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
// keep the time window for the closed time window. // keep the time window for the closed time window.
STimeWindow window = pRowSup->win; STimeWindow window = pRowSup->win;
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);

View File

@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
if (commitIndex >= ths->assignedCommitIndex) { if (commitIndex >= ths->assignedCommitIndex) {
terrno = TSDB_CODE_SUCCESS; syncNodeStepDown(ths, pMsg->term);
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
return -1;
}
if (syncNodeAssignedLeader2Leader(ths) != 0) {
sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId);
return -1;
}
taosThreadMutexLock(&ths->arbTokenMutex);
syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken);
sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken);
taosThreadMutexUnlock(&ths->arbTokenMutex);
} }
} else { } else {
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);

View File

@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) {
return code; return code;
} }
#ifdef BUILD_NO_CALL
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sError("sync step down error");
return -1;
}
syncNodeStepDown(pSyncNode, newTerm);
syncNodeRelease(pSyncNode);
return 0;
}
#endif
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
// syncNodeBecomeFollower(pSyncNode);
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow; pSyncNode->startTime = timeNow;
@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
#if 0
// change isStandBy to normal (election timeout)
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, "");
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
// syncMaybeAdvanceCommitIndex(pSyncNode);
} else {
syncNodeBecomeFollower(pSyncNode, "");
}
#endif
} else { } else {
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
@ -1874,18 +1845,6 @@ _END:
} }
// raft state change -------------- // raft state change --------------
#ifdef BUILD_NO_CALL
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode);
}
}
#endif
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) { if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term); raftStoreSetTerm(pSyncNode, term);
@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
} while (0); } while (0);
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
}
if (currentTerm < newTerm) { if (currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode, newTerm); raftStoreSetTerm(pSyncNode, newTerm);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf); syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode); raftStoreClearVote(pSyncNode);
} else { } else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(pSyncNode, "step down"); syncNodeBecomeFollower(pSyncNode, "step down");
@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "follower to candidate"); sNTrace(pSyncNode, "follower to candidate");
} }
#ifdef BUILD_NO_CALL
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "leader to follower");
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "candidate to follower");
}
#endif
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");