diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ad525a2aa7..a19b249bc7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -46,6 +46,7 @@ extern "C" { #define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 +#define SYNC_SNAP_TIMEOUT_MS 1000 * 600 #define SYNC_VND_COMMIT_MIN_MS 3000 diff --git a/include/util/tdef.h b/include/util/tdef.h index bb5d4cfa96..ffe69dd118 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -303,6 +303,8 @@ typedef enum ELogicConditionType { #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 +#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048 + #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 87b407efcb..91244e321f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -584,6 +584,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * // commit json if (!rollback) { + ASSERT(pVnode->config.vgId == pWriter->info.config.vgId); pWriter->info.state.committed = pWriter->ever; pVnode->config = pWriter->info.config; pVnode->state = (SVState){.committed = pWriter->info.state.committed, diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6c03ed68e9..c9e805d80b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -516,7 +516,10 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); - vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); + if (code != 0) { + vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(), + code); + } return code; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index cec1a12024..637c18e97d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -249,8 +249,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, SRpcMsg* pMsg); +int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 95382132b5..f8ee99e8a0 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -22,21 +22,41 @@ extern "C" { #include "syncInt.h" -#define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 -#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1 +#define SYNC_SNAPSHOT_SEQ_INVALID -2 +#define SYNC_SNAPSHOT_SEQ_PREP -1 #define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_RETRY_MS 5000 +typedef struct SSyncSnapBuffer { + void *entries[TSDB_SYNC_SNAP_BUFFER_SIZE]; + int64_t start; + int64_t cursor; + int64_t end; + int64_t size; + TdThreadMutex mutex; + void (*entryDeleteCb)(void *ptr); +} SSyncSnapBuffer; + +typedef struct SyncSnapBlock { + int32_t seq; + int8_t acked; + int64_t sendTimeMs; + + int16_t blockType; + void *pBlock; + int32_t blockLen; +} SyncSnapBlock; + +void syncSnapBlockDestroy(void *ptr); + typedef struct SSyncSnapshotSender { int8_t start; int32_t seq; int32_t ack; void *pReader; - void *pCurrentBlock; - int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; @@ -47,6 +67,9 @@ typedef struct SSyncSnapshotSender { int64_t lastSendTime; bool finish; + // ring buffer for ack + SSyncSnapBuffer *pSndBuf; + // init when create SSyncNode *pSyncNode; int32_t replicaIndex; @@ -72,6 +95,9 @@ typedef struct SSyncSnapshotReceiver { SSnapshotParam snapshotParam; SSnapshot snapshot; + // buffer + SSyncSnapBuffer *pRcvBuf; + // init when create SSyncNode *pSyncNode; } SSyncSnapshotReceiver; @@ -83,8 +109,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); // on message -int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); -int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); +// int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); +// int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f9dc10da02..199c7a1445 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -818,7 +818,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (!taosCheckExistFile(pSyncNode->configPath)) { // create a new raft config file - sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId); + sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId); + pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy; pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy; pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a7ee37cc3b..f2386797c1 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -797,7 +797,7 @@ _out: pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr); SSyncLogBuffer* pBuf = pNode->pLogBuf; sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64 - ", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 + ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -815,9 +815,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 - ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 + "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -832,9 +832,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->matchIndex = pMsg->matchIndex; pMgr->restored = true; - sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 - ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 + "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -958,10 +958,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde pMgr->endIndex = index + 1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". mgr (rs:%d): [%" PRId64 - " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". repl-mgr:[%" PRId64 + " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, + pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1002,9 +1002,9 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { pMgr->endIndex = index + 1; if (barrier) { - sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 - ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, DID(pDestId), index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); + sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64 + " %" PRId64 ", %" PRId64 ")", + pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); break; } } @@ -1013,10 +1013,10 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SSyncLogBuffer* pBuf = pNode->pLogBuf; sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 - ", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, - pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 924813eb98..0b94d377f1 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,6 +23,44 @@ #include "syncReplication.h" #include "syncUtil.h" +static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { + taosThreadMutexLock(&pBuf->mutex); + for (int64_t i = pBuf->start; i < pBuf->end; ++i) { + if (pBuf->entryDeleteCb) { + pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]); + } + pBuf->entries[i % pBuf->size] = NULL; + } + pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1; + pBuf->end = pBuf->start; + pBuf->cursor = pBuf->start - 1; + taosThreadMutexUnlock(&pBuf->mutex); +} + +static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) { + if (ppBuf == NULL || ppBuf[0] == NULL) return; + SSyncSnapBuffer *pBuf = ppBuf[0]; + + syncSnapBufferReset(pBuf); + + taosThreadMutexDestroy(&pBuf->mutex); + taosMemoryFree(ppBuf[0]); + ppBuf[0] = NULL; + return; +} + +static SSyncSnapBuffer *syncSnapBufferCreate() { + SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer)); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pBuf->size = sizeof(pBuf->entries) / sizeof(void *); + ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE); + taosThreadMutexInit(&pBuf->mutex, NULL); + return pBuf; +} + SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); @@ -38,8 +76,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->pReader = NULL; - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; @@ -49,24 +85,42 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; + SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate(); + if (pSndBuf == NULL) { + taosMemoryFree(pSender); + pSender = NULL; + return NULL; + } + pSndBuf->entryDeleteCb = syncSnapBlockDestroy; + pSender->pSndBuf = pSndBuf; + + syncSnapBufferReset(pSender->pSndBuf); return pSender; } +void syncSnapBlockDestroy(void *ptr) { + SyncSnapBlock *pBlk = ptr; + if (pBlk->pBlock != NULL) { + taosMemoryFree(pBlk->pBlock); + pBlk->pBlock = NULL; + pBlk->blockLen = 0; + } + taosMemoryFree(pBlk); +} + void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender == NULL) return; - // free current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - } - // close reader if (pSender->pReader != NULL) { pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); pSender->pReader = NULL; } + // free snap buffer + if (pSender->pSndBuf) { + syncSnapBufferDestroy(&pSender->pSndBuf); + } // free sender taosMemoryFree(pSender); } @@ -79,11 +133,9 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true); if (started) return 0; - pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + pSender->seq = SYNC_SNAPSHOT_SEQ_PREP; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->pReader = NULL; - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; pSender->snapshotParam.start = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshot.data = NULL; @@ -127,29 +179,28 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = raftStoreGetTerm(pSender->pSyncNode); + pMsg->term = pSender->term; pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->startTime = pSender->startTime; - pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; + pMsg->seq = pSender->seq; if (dataLen > 0) { pMsg->payloadType = snapInfo.type; memcpy(pMsg->data, snapInfo.data, dataLen); } - // event log - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); - // send msg if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); goto _out; } + sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId)); + code = 0; _out: if (snapInfo.data) { @@ -175,48 +226,59 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { pSender->pReader = NULL; } - // free current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; - } + syncSnapBufferReset(pSender->pSndBuf); + + SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated static int32_t snapshotSend(SSyncSnapshotSender *pSender) { - // free memory last time (current seq - 1) - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; + int32_t code = -1; + SyncSnapBlock *pBlk = NULL; + + if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) { + pSender->seq++; + + if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) { + pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock)); + if (pBlk == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OUT; + } + + pBlk->seq = pSender->seq; + + // read data + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, + &pBlk->pBlock, &pBlk->blockLen); + if (ret != 0) { + sSError(pSender, "snapshot sender read failed since %s", terrstr()); + goto _OUT; + } + + if (pBlk->blockLen > 0) { + // has read data + sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq); + } else { + // read finish, update seq to end + pSender->seq = SYNC_SNAPSHOT_SEQ_END; + sSInfo(pSender, "snapshot sender read to the end"); + code = 0; + goto _OUT; + } + } } - // read data - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, - &pSender->pCurrentBlock, &pSender->blockLen); - if (ret != 0) { - sSError(pSender, "snapshot sender read failed since %s", terrstr()); - return -1; - } - - if (pSender->blockLen > 0) { - // has read data - sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId, - pSender->blockLen, pSender->seq); - } else { - // read finish, update seq to end - pSender->seq = SYNC_SNAPSHOT_SEQ_END; - sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId, - pSender->blockLen, pSender->seq); - } + ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END); + int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) { sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); - return -1; + goto _OUT; } SyncSnapshotSend *pMsg = rpcMsg.pCont; @@ -231,77 +293,83 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->startTime = pSender->startTime; pMsg->seq = pSender->seq; - if (pSender->pCurrentBlock != NULL) { - memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); - } - - // event log - if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish"); - } else { - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending"); + if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) { + memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); } // send msg if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); - return -1; + goto _OUT; } - pSender->lastSendTime = taosGetTimestampMs(); - return 0; + // put in buffer + int64_t nowMs = taosGetTimestampMs(); + if (pBlk) { + ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END); + pBlk->sendTimeMs = nowMs; + pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk; + pBlk = NULL; + pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end); + } + pSender->lastSendTime = nowMs; + code = 0; + +_OUT:; + if (pBlk != NULL) { + syncSnapBlockDestroy(pBlk); + pBlk = NULL; + } + return code; } // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { - // build msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); - return -1; + SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; + int32_t code = -1; + taosThreadMutexLock(&pSndBuf->mutex); + + for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { + SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; + ASSERT(pBlk && !pBlk->acked); + int64_t nowMs = taosGetTimestampMs(); + if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { + continue; + } + // build msg + SRpcMsg rpcMsg = {0}; + if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); + goto _out; + } + + SyncSnapshotSend *pMsg = rpcMsg.pCont; + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + pMsg->term = pSender->term; + pMsg->beginIndex = pSender->snapshotParam.start; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; + pMsg->startTime = pSender->startTime; + pMsg->seq = pBlk->seq; + + if (pBlk->pBlock != NULL && pBlk->blockLen > 0) { + memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); + } + + // send msg + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); + goto _out; + } + pBlk->sendTimeMs = nowMs; } - - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = raftStoreGetTerm(pSender->pSyncNode); - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; - pMsg->seq = pSender->seq; - - if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { - memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); - } - - // event log - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend"); - - // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); - return -1; - } - - pSender->lastSendTime = taosGetTimestampMs(); - return 0; -} - -static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - if (pMsg->ack != pSender->seq) { - sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - return -1; - } - - pSender->ack = pMsg->ack; - pSender->seq++; - - sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq); - return 0; + code = 0; +_out:; + taosThreadMutexUnlock(&pSndBuf->mutex); + return code; } // return 0, start ok @@ -328,8 +396,6 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } - sSInfo(pSender, "snapshot sender start"); - int32_t code = snapshotSenderStart(pSender); if (code != 0) { sSError(pSender, "snapshot sender start error since %s", terrstr()); @@ -362,6 +428,16 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; + SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate(); + if (pRcvBuf == NULL) { + taosMemoryFree(pReceiver); + pReceiver = NULL; + return NULL; + } + pRcvBuf->entryDeleteCb = rpcFreeCont; + pReceiver->pRcvBuf = pRcvBuf; + + syncSnapBufferReset(pReceiver->pRcvBuf); return pReceiver; } @@ -389,6 +465,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { pReceiver->snapshot.data = NULL; } + // free snap buf + if (pReceiver->pRcvBuf) { + syncSnapBufferDestroy(&pReceiver->pRcvBuf); + } + // free receiver taosMemoryFree(pReceiver); } @@ -444,20 +525,19 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true); if (started) return; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP; pReceiver->term = pPreMsg->term; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; ASSERT(pReceiver->startTime); - // event log - sRInfo(pReceiver, "snapshot receiver is start"); + sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } // just set start = false // FpSnapshotStopWrite should not be called void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { - sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); + sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); if (stopped) return; @@ -472,6 +552,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } else { sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } + + syncSnapBufferReset(pReceiver->pRcvBuf); } // when recv last snapshot block, apply data into snapshot @@ -479,7 +561,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap int32_t code = 0; if (pReceiver->pWriter != NULL) { // write data - sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); + sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); if (pMsg->dataLen > 0) { code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); @@ -489,15 +571,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } } - // reset wal - sRInfo(pReceiver, "snapshot receiver log restore"); - code = - pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); - if (code != 0) { - sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr()); - return -1; - } - // update commit index if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; @@ -509,7 +582,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } // stop writer, apply data - sRInfo(pReceiver, "snapshot receiver apply write"); code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot); if (code != 0) { @@ -517,21 +589,29 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap return -1; } pReceiver->pWriter = NULL; + sRInfo(pReceiver, "snapshot receiver write stopped"); // update progress pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + // get fsmState SSnapshot snapshot = {0}; pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); pReceiver->pSyncNode->fsmState = snapshot.state; + // reset wal + code = + pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); + if (code != 0) { + sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr()); + return -1; + } + sRInfo(pReceiver, "wal log restored from snapshot"); } else { sRError(pReceiver, "snapshot receiver finish error since writer is null"); return -1; } - // event log - sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished"); return 0; } @@ -599,22 +679,22 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM int32_t order = 0; if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) { sRInfo(pReceiver, - "received a new snapshot preparation. restart receiver" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a new snapshot preparation. restart receiver." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); goto _START_RECEIVER; } else if (order == 0) { sRInfo(pReceiver, - "received a duplicate snapshot preparation. send reply" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a duplicate snapshot preparation. send reply." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); goto _SEND_REPLY; } else { // ignore sRError(pReceiver, - "received a stale snapshot preparation. ignore" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a stale snapshot preparation. ignore." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; code = terrno; goto _SEND_REPLY; @@ -765,29 +845,8 @@ _SEND_REPLY: return code; } -static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { - // condition 4 - // transfering - SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - int64_t timeNow = taosGetTimestampMs(); - int32_t code = 0; - - if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); - code = terrno; - goto _SEND_REPLY; - } - - if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { - code = terrno; - if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { - code = TSDB_CODE_SYN_INTERNAL_ERROR; - } - } - -_SEND_REPLY:; - +static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) { + SSyncNode *pSyncNode = pReceiver->pSyncNode; // build msg SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { @@ -811,10 +870,79 @@ _SEND_REPLY:; sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr()); return -1; } + return 0; +} +static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) { + int32_t code = 0; + SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf; + SyncSnapshotSend *pMsg = ppMsg[0]; + terrno = TSDB_CODE_SUCCESS; + + taosThreadMutexLock(&pRcvBuf->mutex); + + if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) { + terrno = TSDB_CODE_SYN_BUFFER_FULL; + code = terrno; + goto _out; + } + + ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); + + if (pMsg->seq > pRcvBuf->cursor) { + pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; + ppMsg[0] = NULL; + pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); + } else { + syncSnapSendRsp(pReceiver, pMsg, code); + goto _out; + } + + for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) { + if (pRcvBuf->entries[seq]) { + pRcvBuf->cursor = seq; + } else { + break; + } + } + + for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) { + if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) { + code = terrno; + if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + } + } + pRcvBuf->start = seq + 1; + syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code); + pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]); + pRcvBuf->entries[seq % pRcvBuf->size] = NULL; + if (code) goto _out; + } + +_out: + taosThreadMutexUnlock(&pRcvBuf->mutex); return code; } +static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) { + // condition 4 + // transfering + SyncSnapshotSend *pMsg = ppMsg[0]; + ASSERT(pMsg); + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + int64_t timeNow = taosGetTimestampMs(); + int32_t code = 0; + + if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); + return syncSnapSendRsp(pReceiver, pMsg, terrno); + } + + return syncSnapBufferRecv(pReceiver, ppMsg); +} + static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // condition 2 // end, finish FSM @@ -867,7 +995,7 @@ _SEND_REPLY:; // receiver on message // -// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT +// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP // if receiver already start // if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) // if sender.start-time = receiver.start-time, maybe duplicate msg @@ -885,9 +1013,11 @@ _SEND_REPLY:; // // condition 5, got data, update ack // -int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { - SyncSnapshotSend *pMsg = pRpcMsg->pCont; +int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { + SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; + SyncSnapshotSend *pMsg = ppMsg[0]; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + int32_t code = 0; // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { @@ -911,49 +1041,56 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term); } - // state, term, seq/ack - int32_t code = 0; - if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { - if (pMsg->term == raftStoreGetTerm(pSyncNode)) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { - sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); - if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { - sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); - code = -1; - } - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - // force close, no response - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); - snapshotReceiverStop(pReceiver); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); - code = syncNodeOnSnapshotReceive(pSyncNode, pMsg); - } else { - // error log - sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); - code = -1; - } - } else { - // error log - sRError(pReceiver, "snapshot receiver term not equal"); - code = -1; - } - } else { - // error log - sRError(pReceiver, "snapshot receiver not follower"); - code = -1; + if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) { + sRError(pReceiver, "snapshot receiver not a follower or learner"); + return -1; } + if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) { + sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq); + return -1; + } + + // prepare + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) { + sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); + goto _out; + } + + // begin + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); + goto _out; + } + + // data + if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { + code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); + goto _out; + } + + // end + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { + sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); + if (code != 0) { + sRError(pReceiver, "failed to end snapshot."); + goto _out; + } + + code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); + if (code != 0) { + sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); + } + goto _out; + } + +_out:; syncNodeResetElectTimer(pSyncNode); return code; } @@ -993,41 +1130,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // update next index syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); - // update seq - pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - - // build begin msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "prepare snapshot failed since build msg error"); - return -1; - } - - SyncSnapshotSend *pSendMsg = rpcMsg.pCont; - pSendMsg->srcId = pSender->pSyncNode->myRaftId; - pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode); - pSendMsg->beginIndex = pSender->snapshotParam.start; - pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pSendMsg->lastConfig = pSender->lastConfig; - pSendMsg->startTime = pSender->startTime; - pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - - ASSERT(pSendMsg->startTime); - - sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId), - pSendMsg->startTime); - - // send msg - syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre"); - if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "prepare snapshot failed since send msg error"); - return -1; - } - - return 0; + return snapshotSend(pSender); } static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { @@ -1038,14 +1141,77 @@ static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnap return 0; } +static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) { + int32_t code = 0; + SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; + SyncSnapshotRsp *pMsg = ppMsg[0]; + + taosThreadMutexLock(&pSndBuf->mutex); + if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) { + code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + goto _out; + } + + if (pSender->pReader == NULL || pSender->finish) { + code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + + if (pMsg->ack - pSndBuf->start >= pSndBuf->size) { + code = terrno = TSDB_CODE_SYN_BUFFER_FULL; + goto _out; + } + + ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end); + + if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) { + SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size]; + ASSERT(pBlk); + pBlk->acked = 1; + } + + for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) { + SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size]; + if (pBlk->acked) { + pSndBuf->cursor = ack; + } else { + break; + } + } + + for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) { + pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]); + pSndBuf->entries[ack % pSndBuf->size] = NULL; + pSndBuf->start = ack + 1; + } + + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + if (snapshotSend(pSender) != 0) { + code = terrno; + goto _out; + } + } + + if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { + if (snapshotSend(pSender) != 0) { + code = terrno; + goto _out; + } + } +_out: + taosThreadMutexUnlock(&pSndBuf->mutex); + return code; +} + // sender on message // // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender // condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 3 sender receives error msg, just print error log // -int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { - SyncSnapshotRsp *pMsg = pRpcMsg->pCont; +int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { + SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont; + SyncSnapshotRsp *pMsg = ppMsg[0]; // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { @@ -1071,10 +1237,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // check signature int32_t order = 0; if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) { - sSError(pSender, - "received a stale snapshot rsp. ignore it" - "sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pSender->term, pSender->startTime, pMsg->term, pMsg->startTime); + sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } else if (order < 0) { @@ -1083,9 +1246,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { goto _ERROR; } - // state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader"); terrno = TSDB_CODE_SYN_NOT_LEADER; goto _ERROR; @@ -1093,83 +1254,46 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); if (pMsg->term != currentTerm) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); - sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, + sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, currentTerm); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _ERROR; } if (pMsg->code != 0) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); terrno = pMsg->code; goto _ERROR; } - // prepare , send begin msg - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); - return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); - } - - if (pSender->pReader == NULL || pSender->finish) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid"); - sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code, - pSender->pReader, pSender->finish); - terrno = pMsg->code; - goto _ERROR; - } - - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; + // send begin + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) { + sSInfo(pSender, "process prepare rsp"); + if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) { + goto _ERROR; } - - if (snapshotSend(pSender) != 0) { - return -1; - } - return 0; } - // receive ack is finish, close sender + // send msg of data or end + if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) { + if (syncSnapBufferSend(pSender, ppMsg) != 0) { + sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq, + pSender->pReader, pSender->finish); + goto _ERROR; + } + } + + // end if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); + sSInfo(pSender, "process end rsp"); snapshotSenderStop(pSender, true); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); - return 0; - } - - // send next msg - if (pMsg->ack == pSender->seq) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data"); - // update sender ack - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; - } - if (snapshotSend(pSender) != 0) { - return -1; - } - } else if (pMsg->ack == pSender->seq - 1) { - // maybe resend - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); - if (snapshotReSend(pSender) != 0) { - return -1; - } - } else { - // error log - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack"); - sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq); - snapshotSenderStop(pSender, true); - syncNodeReplicateReset(pSyncNode, &pMsg->srcId); - return -1; } return 0; _ERROR: - snapshotSenderStop(pSender, true); + snapshotSenderStop(pSender, false); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return -1; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 37166805ce..a57dfbee53 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -77,12 +77,19 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { for (int i = 0; i < ths->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i])); if (pSender != NULL) { - if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start && - timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) { - snapshotReSend(pSender); - } else { - sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId, - ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime); + if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start) { + int64_t elapsedMs = timeNow - pSender->lastSendTime; + if (elapsedMs < SYNC_SNAP_RESEND_MS) { + continue; + } + + if (elapsedMs > SYNC_SNAP_TIMEOUT_MS) { + sSError(pSender, "snap replication timeout, terminate."); + snapshotSenderStop(pSender, false); + } else { + sSWarn(pSender, "snap replication resend."); + snapshotReSend(pSender); + } } } } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9acc17e130..9e6ea94e78 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -267,21 +267,23 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla va_end(argpointer); taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 - " last-term:%" PRIu64 " last-cfg:%" PRId64 - ", seq:%d ack:%d finish:%d, as:%d dnode:%d}" + "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 + " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + ", seq:%d, ack:%d, " + " buf:[%" PRId64 " %" PRId64 ", %" PRId64 + "), finish:%d, as:%d, to-dnode:%d}" ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", - pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, - pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, - DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex, - logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, - pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, - pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime, + pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, + pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish, + pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), + pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, + pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); } void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, @@ -316,19 +318,21 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df taosPrintLog( flags, level, dflag, "vgId:%d, %s, sync:%s," - " snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64 - " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64 + ")" + " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 "}" ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", - pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, - pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, + ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start, + pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end, + DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, - pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, - syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, + cfgStr); } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 18893245fa..a92da7f472 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -21,6 +21,7 @@ sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 in sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s); sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); +sleep 1000 sql insert into t1 values(1648791210000,1,2,3,1.0); sql insert into t1 values(1648791216000,2,2,3,1.1); @@ -311,6 +312,7 @@ sql create table t2 using st tags(2,2,2); sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); +sleep 1000 sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791223001,2,2,3,1.1); @@ -444,6 +446,7 @@ sql create table t2 using st tags(2,2,2); sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); +sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); @@ -582,6 +585,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); +sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); @@ -706,6 +710,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s); +sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791243000,2,1,1,1.0); @@ -818,4 +823,4 @@ print ============loop_all=$loop_all #=goto looptest -system sh/stop_dnodes.sh \ No newline at end of file +system sh/stop_dnodes.sh