diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6fb9c75e6b..3b38362647 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -221,7 +221,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) - TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 10ece0b219..9d1385bff2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -83,8 +83,10 @@ typedef struct SReConfigCbMeta { SyncTerm term; SyncTerm currentTerm; SSyncCfg oldCfg; + SSyncCfg newCfg; bool isDrop; uint64_t flag; + uint64_t seqNum; } SReConfigCbMeta; typedef struct SSnapshot { @@ -106,7 +108,7 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); + void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); @@ -184,7 +186,6 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); int32_t syncSetStandby(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); @@ -194,8 +195,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); +int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); -int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); +int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); // to be moved to static void syncStartNormal(int64_t rid); diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 05aea3f68c..fa5e20ab6f 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen, double filesFactor); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 27d13d66b6..d8a61461dd 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -403,6 +403,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + static int64_t mndTick = 0; + if (++mndTick % 10 == 1) { + mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); + } syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d07355dfa9..deb7382183 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -127,6 +127,61 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet return 0; } +int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { + pTask->dispatchType = TASK_DISPATCH__NONE; + // sink + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = pStream->smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; + memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + } + return 0; +} + +int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { + pTask->sinkType = TASK_SINK__NONE; + if (pStream->fixedSinkVgId == 0) { + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); + ASSERT(pDb); + + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + sdbRelease(pMnode->pSdb, pDb); + + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + break; + } + } + } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; + } + } + return 0; +} + int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; pTask->nodeId = pVgroup->vgId; @@ -139,6 +194,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -182,7 +238,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); @@ -234,7 +290,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p return 0; } -int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid); @@ -279,6 +335,146 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr return 0; } +int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { + SSdb* pSdb = pMnode->pSdb; + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); + if (pPlan == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + ASSERT(pStream->vgNum == 0); + + int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); + ASSERT(totLevel <= 2); + pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); + + bool hasExtraSink = false; + bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; + if (totLevel == 2 || externalTargetDB) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); + // add extra sink + hasExtraSink = true; + if (pStream->fixedSinkVgId == 0) { + mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); + } else { + mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); + } + } + if (totLevel > 1) { + SStreamTask* pFinalTask; + // inner plan + { + SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskInnerLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); + + pFinalTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskInnerLevel, pFinalTask); + // input + pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + + // dispatch + mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); + + // exec + pFinalTask->execType = TASK_EXEC__PIPE; + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); + if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + + // source plan + SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskSourceLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 1); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskSourceLevel, pTask); + + // input + pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + + // add fixed vg dispatch + pTask->sinkType = TASK_SINK__NONE; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + pTask->dispatchType = TASK_DISPATCH__FIXED; + + pTask->fixedEpDispatcher.taskId = pFinalTask->taskId; + pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId; + pTask->fixedEpDispatcher.epSet = pFinalTask->epSet; + + // exec + pTask->execType = TASK_EXEC__PIPE; + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + } + + if (totLevel == 1) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); + ASSERT(LIST_LENGTH(inner->pNodeList) == 1); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskOneLevel, pTask); + + // input + pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + + // sink or dispatch + if (hasExtraSink) { + mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); + } else { + mndAddSinkToTask(pMnode, pTrans, pStream, pTask); + } + + // exec + pTask->execType = TASK_EXEC__PIPE; + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + } + return 0; +} + int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); @@ -300,14 +496,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { - mndAddShuffledSinkToStream(pMnode, pTrans, pStream); + mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); } else { - mndAddFixedSinkToStream(pMnode, pTrans, pStream); + mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); } } for (int32_t level = 0; level < totLevel; level++) { - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); ASSERT(LIST_LENGTH(inner->pNodeList) == 1); @@ -357,18 +554,17 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } // dispatch part - if (level == 0) { + if (level == 0 && !hasExtraSink) { pTask->dispatchType = TASK_DISPATCH__NONE; } else { // add fixed ep dispatcher int32_t lastLevel = level - 1; - ASSERT(lastLevel == 0); if (hasExtraSink) lastLevel++; + ASSERT(lastLevel == 0); SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel); // one merge only ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; @@ -465,8 +661,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } sdbRelease(pSdb, pVgroup); } - - taosArrayPush(pStream->tasks, &taskOneLevel); } #if 0 diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7abe9e3c0d..4a2c0a59a1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -269,7 +269,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + if (mndScheduleStream1(pMnode, pTrans, pStream) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a0daa72d9a..a0f722b3d2 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -96,10 +96,18 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { } } -void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { +void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; +#if 0 +// send response + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); +#endif + pMgmt->errCode = cbMeta.code; mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e7a744748b..5a8564bfd1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -58,7 +58,7 @@ struct STqReadHandle { SArray* pColIdList; // SArray int32_t cachedSchemaVer; - int64_t cachedSchemaUid; + int64_t cachedSchemaSuid; SSchemaWrapper* pSchemaWrapper; STSchema* pSchema; }; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2ecaeff747..0c38d6442b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -67,7 +67,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; pReadHandle->cachedSchemaVer = -1; - pReadHandle->cachedSchemaUid = -1; + pReadHandle->cachedSchemaSuid = -1; pReadHandle->pSchema = NULL; pReadHandle->pSchemaWrapper = NULL; pReadHandle->tbIdHash = NULL; @@ -130,7 +130,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p // TODO set to real sversion /*int32_t sversion = 1;*/ int32_t sversion = htonl(pHandle->pBlock->sversion); - if (pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { + if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || + pHandle->cachedSchemaSuid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table", @@ -150,7 +151,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p return -1; } pHandle->cachedSchemaVer = sversion; - pHandle->cachedSchemaUid = pHandle->msgIter.suid; + pHandle->cachedSchemaSuid = pHandle->msgIter.suid; } STSchema* pTschema = pHandle->pSchema; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index da3c8e7c93..51ed739693 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -305,6 +305,10 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pVnode->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); + } syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); @@ -902,4 +906,4 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo // 2. adjust hash range / compact / remove wals / rename vgroups // 3. reload sync return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 087dcac7b7..b4021ec73d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -180,10 +180,18 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } -static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { +static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { SVnode *pVnode = pFsm->data; vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); +#if 0 +// send response + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); +#endif + // todo rpc response here // build rpc msg // put into apply queue @@ -213,6 +221,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + } else { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a8a95b513a..8110df3090 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -809,6 +809,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream); + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + SExecTaskInfo* pTaskInfo); + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -907,6 +912,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor); bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); +int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f50c7cdae5..726be6d0a2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1956,6 +1956,57 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_ } } +int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, + const int32_t* rowCellOffset, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo) { + SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); + + doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); + if (pRow->numOfRows == 0) { + releaseBufPage(pBuf, page); + return 0; + } + + while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25); + if (TAOS_FAILED(code)) { + releaseBufPage(pBuf, page); + qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); + } + } + + for (int32_t j = 0; j < numOfExprs; ++j) { + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); + if (pCtx[j].fpSet.finalize) { + int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); + } + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor + } else { + // expand the result into multiple rows. E.g., _wstartts, top(k, 20) + // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } + } + } + + releaseBufPage(pBuf, page); + pBlock->info.rows += pRow->numOfRows; + + return 0; +} + int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { @@ -4689,6 +4740,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { + SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + + int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; + pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { qDebug("[******]create Semi"); int32_t children = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d981b5034c..c1c504a400 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1881,8 +1881,8 @@ _error: return NULL; } -static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, - int32_t tableGroupId, SArray* pUpdated) { +static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId, + SArray* pUpdated) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -1897,7 +1897,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; } else { - return ; + return; } int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); @@ -1914,13 +1914,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc pos->groupId = tableGroupId; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; *(int64_t*)pos->key = pResult->win.skey; - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, - nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResult(pResult, tableGroupId, pUpdated); + saveResult(pResult, tableGroupId, pUpdated); } // window start(end) key interpolation - // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows); + // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, + // forwardRows); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); @@ -2049,10 +2050,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; - doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, - pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL); - rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, - pOperator->numOfExprs, pOperator->pTaskInfo); + doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex, + pChildOp->numOfExprs, pBlock, NULL); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, + pOperator->pTaskInfo); taosArrayDestroy(pUpWins); continue; } @@ -2062,7 +2063,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } if (isFinalInterval(pInfo)) { - int32_t chIndex = getChildIndex(pBlock); + int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); // if chIndex + 1 - size > 0, add new child for (int32_t i = 0; i < chIndex + 1 - size; i++) { @@ -2072,7 +2073,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } taosArrayPush(pInfo->pChildren, &pChildOp); } - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); @@ -2080,12 +2081,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } - + if (isFinalInterval(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, - &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, - pInfo->binfo.rowCellInfoOffset); + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { taosArrayAddAll(pUpdated, pClosed); } @@ -2109,31 +2108,32 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, - SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = - ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; - pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, - .winMap = NULL, }; + .winMap = NULL, + }; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); @@ -2160,7 +2160,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (!isFinalInterval(pInfo)) { pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; } - pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\ + pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes->info.type = STREAM_REPROCESS; blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pInfo->pPhyNode = nodesCloneNode(pPhyNode); @@ -2174,9 +2174,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, - destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, - NULL); + pOperator->fpSet = + createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2216,8 +2216,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { } } -int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResultBlock) { +int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; for (int32_t i = 0; i < numOfCols; ++i) { @@ -3195,3 +3194,281 @@ _error: pTaskInfo->code = code; return NULL; } + +typedef struct SMergeIntervalAggOperatorInfo { + SIntervalAggOperatorInfo intervalAggOperatorInfo; + + SHashObj* groupIntervalHash; + bool hasGroupId; + uint64_t groupId; + SSDataBlock* prefetchedBlock; + bool inputBlocksFinished; +} SMergeIntervalAggOperatorInfo; + +void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; + taosHashCleanup(miaInfo->groupIntervalHash); + destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); +} + +static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, + STimeWindow* newWin) { + SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + + STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + if (prevWin == NULL) { + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + return 0; + } + + if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) { + SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(p1 != NULL); + + finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, + pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, + pTaskInfo); + taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + if (newWin == NULL) { + taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + } else { + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + } + } + + return 0; +} + +static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, + int32_t scanFlag, SSDataBlock* pResultBlock) { + SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + + int32_t startPos = 0; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int64_t* tsCols = extractTsCol(pBlock, iaInfo); + uint64_t tableGroupId = pBlock->info.groupId; + bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); + SResultRow* pResult = NULL; + + STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, + iaInfo->interval.precision, &iaInfo->win); + + int32_t ret = + setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, + numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + TSKEY ekey = ascScan ? win.ekey : win.skey; + int32_t forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + ASSERT(forwardRows > 0); + + // prev time window not interpolation yet. + if (iaInfo->timeWindowInterpo) { + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); + + // restore current time window + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, + pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + // window start key interpolation + doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows); + } + + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, iaInfo->order); + doCloseWindow(pResultRowInfo, iaInfo, pResult); + + // output previous interval results after this interval (&win) is closed + outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win); + + STimeWindow nextWin = win; + while (1) { + int32_t prevEndPos = forwardRows - 1 + startPos; + startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); + if (startPos < 0) { + break; + } + + // null data, failed to allocate more memory buffer + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, + &iaInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + + // window start(end) key interpolation + doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos, + forwardRows); + + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + doCloseWindow(pResultRowInfo, iaInfo, pResult); + + // output previous interval results after this interval (&nextWin) is closed + outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin); + } + + if (iaInfo->timeWindowInterpo) { + saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols); + } +} + +static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SSDataBlock* pRes = iaInfo->binfo.pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); + + if (!miaInfo->inputBlocksFinished) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t scanFlag = MAIN_SCAN; + while (1) { + SSDataBlock* pBlock = NULL; + if (miaInfo->prefetchedBlock == NULL) { + pBlock = downstream->fpSet.getNextFn(downstream); + } else { + pBlock = miaInfo->prefetchedBlock; + miaInfo->groupId = pBlock->info.groupId; + } + + if (pBlock == NULL) { + miaInfo->inputBlocksFinished = true; + break; + } + + if (!miaInfo->hasGroupId) { + miaInfo->hasGroupId = true; + miaInfo->groupId = pBlock->info.groupId; + } else if (miaInfo->groupId != pBlock->info.groupId) { + miaInfo->prefetchedBlock = pBlock; + break; + } + + getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); + setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); + STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; + + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); + doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + pRes->info.groupId = miaInfo->groupId; + } else { + void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL); + if (p != NULL) { + size_t len = 0; + uint64_t* pKey = taosHashGetKey(p, &len); + outputPrevIntervalResult(pOperator, *pKey, pRes, NULL); + } + } + + if (pRes->info.rows == 0) { + doSetOperatorCompleted(pOperator); + } + + size_t rows = pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + return (rows == 0) ? NULL : pRes; +} + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + SExecTaskInfo* pTaskInfo) { + SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (miaInfo == NULL || pOperator == NULL) { + goto _error; + } + + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + + iaInfo->win = pTaskInfo->window; + iaInfo->order = TSDB_ORDER_ASC; + iaInfo->interval = *pInterval; + + iaInfo->execModel = pTaskInfo->execModel; + + iaInfo->primaryTsIndex = primaryTsSlotId; + miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(pOperator, 4096); + + int32_t code = + initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + + initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); + + iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo); + if (iaInfo->timeWindowInterpo) { + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + } + + // iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) { + goto _error; + } + + initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "TimeMergeIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = miaInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, + destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + taosMemoryFreeClear(miaInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1eafd3c649..ff838eb9c9 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2103,8 +2103,49 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* pOutput) { + pOutput->percent = pInput->percent; + pOutput->algo = pInput->algo; + if (pOutput->algo == APERCT_ALGO_TDIGEST) { + buildTDigestInfo(pInput); + tdigestAutoFill(pInput->pTDigest, COMPRESSION); + + if(pInput->pTDigest->num_centroids == 0 && pInput->pTDigest->num_buffered_pts == 0) { + return; + } + + buildTDigestInfo(pOutput); + TDigest *pTDigest = pOutput->pTDigest; + + if(pTDigest->num_centroids <= 0) { + memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); + tdigestAutoFill(pTDigest, COMPRESSION); + } else { + tdigestMerge(pTDigest, pInput->pTDigest); + } + } else { + buildHistogramInfo(pInput); + if (pInput->pHisto->numOfElems <= 0) { + return; + } + + buildHistogramInfo(pOutput); + SHistogramInfo *pHisto = pOutput->pHisto; + + if (pHisto->numOfElems <= 0) { + memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); + pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); + } else { + pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); + SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN); + memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); + pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); + tHistogramDestroy(&pRes); + } + } +} + int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) { - int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -2113,60 +2154,14 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) { ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - SAPercentileInfo* pInputInfo; int32_t start = pInput->startRowIndex; - for (int32_t i = start; i < pInput->numOfRows + start; ++i) { - //if (colDataIsNull_s(pCol, i)) { - // continue; - //} - numOfElems += 1; - char* data = colDataGetData(pCol, i); + char* data = colDataGetData(pCol, start); + SAPercentileInfo* pInputInfo = (SAPercentileInfo *)varDataVal(data); - pInputInfo = (SAPercentileInfo *)varDataVal(data); - } + apercentileTransferInfo(pInputInfo, pInfo); - pInfo->percent = pInputInfo->percent; - pInfo->algo = pInputInfo->algo; - if (pInfo->algo == APERCT_ALGO_TDIGEST) { - buildTDigestInfo(pInputInfo); - tdigestAutoFill(pInputInfo->pTDigest, COMPRESSION); - - if(pInputInfo->pTDigest->num_centroids == 0 && pInputInfo->pTDigest->num_buffered_pts == 0) { - return TSDB_CODE_SUCCESS; - } - - buildTDigestInfo(pInfo); - TDigest *pTDigest = pInfo->pTDigest; - - if(pTDigest->num_centroids <= 0) { - memcpy(pTDigest, pInputInfo->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); - tdigestAutoFill(pTDigest, COMPRESSION); - } else { - tdigestMerge(pTDigest, pInputInfo->pTDigest); - } - } else { - buildHistogramInfo(pInputInfo); - if (pInputInfo->pHisto->numOfElems <= 0) { - return TSDB_CODE_SUCCESS; - } - - buildHistogramInfo(pInfo); - SHistogramInfo *pHisto = pInfo->pHisto; - - if (pHisto->numOfElems <= 0) { - memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); - pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); - } else { - pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); - SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN); - memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); - pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); - tHistogramDestroy(&pRes); - } - } - - SET_VAL(pResInfo, numOfElems, 1); + SET_VAL(pResInfo, 1, 1); return TSDB_CODE_SUCCESS; } @@ -3049,6 +3044,17 @@ _spread_over: return TSDB_CODE_SUCCESS; } +static void spreadTransferInfo(SSpreadInfo* pInput, SSpreadInfo* pOutput) { + pOutput->hasResult = pInput->hasResult; + if (pInput->max > pOutput->max) { + pOutput->max = pInput->max; + } + + if (pInput->min < pOutput->min) { + pOutput->min = pInput->min; + } +} + int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; @@ -3061,14 +3067,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, start); pInputInfo = (SSpreadInfo *)varDataVal(data); - pInfo->hasResult = pInputInfo->hasResult; - if (pInputInfo->max > pInfo->max) { - pInfo->max = pInputInfo->max; - } - - if (pInputInfo->min < pInfo->min) { - pInfo->min = pInputInfo->min; - } + spreadTransferInfo(pInputInfo, pInfo); SET_VAL(GET_RES_INFO(pCtx), 1, 1); @@ -3206,6 +3205,17 @@ _elapsed_over: return TSDB_CODE_SUCCESS; } +static void elapsedTransferInfo(SElapsedInfo* pInput, SElapsedInfo* pOutput) { + pOutput->timeUnit = pInput->timeUnit; + if (pOutput->min > pInput->min) { + pOutput->min = pInput->min; + } + + if (pOutput->max < pInput->max) { + pOutput->max = pInput->max; + } +} + int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; @@ -3217,14 +3227,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, start); SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data); - pInfo->timeUnit = pInputInfo->timeUnit; - if (pInfo->min > pInputInfo->min) { - pInfo->min = pInputInfo->min; - } - - if (pInfo->max < pInputInfo->max) { - pInfo->max = pInputInfo->max; - } + elapsedTransferInfo(pInputInfo, pInfo); SET_VAL(GET_RES_INFO(pCtx), 1, 1); return TSDB_CODE_SUCCESS; @@ -3470,6 +3473,17 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } +static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) { + pOutput->normalized = pInput->normalized; + pOutput->numOfBins = pInput->numOfBins; + pOutput->totalCount += pInput->totalCount; + for (int32_t k = 0; k < pOutput->numOfBins; ++k) { + pOutput->bins[k].lower = pInput->bins[k].lower; + pOutput->bins[k].upper = pInput->bins[k].upper; + pOutput->bins[k].count += pInput->bins[k].count; + } +} + int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; @@ -3481,14 +3495,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, start); SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data); - pInfo->normalized = pInputInfo->normalized; - pInfo->numOfBins = pInputInfo->numOfBins; - pInfo->totalCount += pInputInfo->totalCount; - for (int32_t k = 0; k < pInfo->numOfBins; ++k) { - pInfo->bins[k].lower = pInputInfo->bins[k].lower; - pInfo->bins[k].upper = pInputInfo->bins[k].upper; - pInfo->bins[k].count += pInputInfo->bins[k].count; - } + histogramTransferInfo(pInputInfo, pInfo); SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins); return TSDB_CODE_SUCCESS; @@ -3676,6 +3683,14 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } +static void hllTransferInfo(SHLLInfo* pInput, SHLLInfo* pOutput) { + for (int32_t k = 0; k < HLL_BUCKETS; ++k) { + if (pOutput->buckets[k] < pInput->buckets[k]) { + pOutput->buckets[k] = pInput->buckets[k]; + } + } +} + int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; @@ -3687,11 +3702,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, start); SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data); - for (int32_t k = 0; k < HLL_BUCKETS; ++k) { - if (pInfo->buckets[k] < pInputInfo->buckets[k]) { - pInfo->buckets[k] = pInputInfo->buckets[k]; - } - } + hllTransferInfo(pInputInfo, pInfo); SET_VAL(GET_RES_INFO(pCtx), 1, 1); return TSDB_CODE_SUCCESS; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 10218f69e6..83f0bd7dd8 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -201,8 +201,8 @@ void syncNodeRelease(SSyncNode* pNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); -void syncNodeBecomeFollower(SSyncNode* pSyncNode); -void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); +void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); void syncNodeCandidate2Leader(SSyncNode* pSyncNode); void syncNodeFollower2Candidate(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 9f03ac3e55..e0cbcf0744 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore); void raftStoreNextTerm(SRaftStore *pRaftStore); void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson); -cJSON *raftStore2Json(SRaftStore *pRaftStore); -char *raftStore2Str(SRaftStore *pRaftStore); +cJSON * raftStore2Json(SRaftStore *pRaftStore); +char * raftStore2Str(SRaftStore *pRaftStore); // for debug ------------------- void raftStorePrint(SRaftStore *pObj); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index b16e47b51e..9fbcdf138b 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; @@ -58,28 +58,29 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSyncNode *pSyncNode; - int32_t replicaIndex; + SRaftId fromId; + } SSyncSnapshotReceiver; -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex); +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 3c558b60c8..01c95d8241 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -150,7 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { "ths->state:%d, logOK:%d", pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "from candidate by append entries"); // ret or reply? return ret; @@ -380,17 +380,19 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // change isStandBy to normal if (!isDrop) { if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); + syncNodeBecomeLeader(ths, "config change"); } else { - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "config change"); } } - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); + if (gRaftDetailLog) { + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + } } // always call FpReConfigCb @@ -399,10 +401,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.newCfg = newSyncCfg; cbMeta.oldCfg = oldSyncCfg; + cbMeta.seqNum = pEntry->seqNum; cbMeta.flag = 0x11; cbMeta.isDrop = isDrop; - ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); + ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta); } } @@ -469,7 +473,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sInfo("sync event log truncate, from %ld to %ld", delBegin, delEnd); + sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -571,7 +575,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (condition) { sTrace("recv SyncAppendEntries, candidate to follower"); - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "from candidate by append entries"); // do not reply? return ret; } @@ -742,6 +746,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (pMsg->commitIndex > ths->commitIndex) { // has commit entry in local if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + // advance commit index to sanpshot first + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { + SyncIndex commitBegin = ths->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + ths->commitIndex = snapshot.lastApplyIndex; + + sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, + commitEnd, syncUtilState2String(ths->state)); + } + SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex endIndex = pMsg->commitIndex; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5a543e1605..7fc35afbb1 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -121,7 +121,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); - { + if (gRaftDetailLog) { SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", @@ -147,7 +147,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); + + if (gRaftDetailLog) { + sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); + } // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); @@ -159,7 +162,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - sTrace("update next not match, begin, index:%ld, success:%d", nextIndex, pMsg->success); + if (gRaftDetailLog) { + sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success); + } // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { @@ -178,9 +183,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries pMsg->privateTerm < pSender->privateTerm) { snapshotSenderStart(pSender); - char* s = snapshotSender2Str(pSender); - sInfo("sync event snapshot send start sender first time, sender:%s", s); - taosMemoryFree(s); + char host[128]; + uint16_t port; + syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); + + if (gRaftDetailLog) { + char* s = snapshotSender2Str(pSender); + sInfo( + "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " + "sender:%s", + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s); + taosMemoryFree(s); + } else { + sInfo( + "sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "lastApplyTerm:%lu", + ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm); + } } SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; @@ -195,12 +214,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); - sTrace("update next not match, end, index:%ld, success:%d", nextIndex, pMsg->success); + if (gRaftDetailLog) { + sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success); + } } syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); - { + if (gRaftDetailLog) { SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index c092b31adf..8236301f8e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -48,13 +48,28 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); + // advance commit index to sanpshot first + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { + SyncIndex commitBegin = pSyncNode->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + pSyncNode->commitIndex = snapshot.lastApplyIndex; + + sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, + pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); + } + // update commit index SyncIndex newCommitIndex = pSyncNode->commitIndex; - for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; - --index) { + for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) { bool agree = syncAgree(pSyncNode, index); - sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index, - pSyncNode->commitIndex); + + if (gRaftDetailLog) { + sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index, + pSyncNode->commitIndex); + } + if (agree) { // term SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); @@ -64,16 +79,21 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; - sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", - newCommitIndex, pSyncNode->commitIndex); + + if (gRaftDetailLog) { + sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", + newCommitIndex, pSyncNode->commitIndex); + } syncEntryDestory(pEntry); break; } else { - sTrace( - "syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, " - "pSyncNode->pRaftStore->currentTerm:%lu", - pEntry->term, pSyncNode->pRaftStore->currentTerm); + if (gRaftDetailLog) { + sTrace( + "syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, " + "pSyncNode->pRaftStore->currentTerm:%lu", + pEntry->term, pSyncNode->pRaftStore->currentTerm); + } } syncEntryDestory(pEntry); @@ -84,7 +104,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex beginIndex = pSyncNode->commitIndex + 1; SyncIndex endIndex = newCommitIndex; - sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex); + if (gRaftDetailLog) { + sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex); + } // update commit index pSyncNode->commitIndex = newCommitIndex; diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 945d59646b..e67439f8fe 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -40,7 +40,7 @@ int32_t syncEnvStart() { // gSyncEnv = doSyncEnvStart(gSyncEnv); gSyncEnv = doSyncEnvStart(); assert(gSyncEnv != NULL); - sTrace("syncEnvStart ok!"); + sTrace("sync env start ok"); return ret; } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index ecc1c8f1e2..18cb55b417 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9516df64da..0a19e16d5c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -87,7 +87,9 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); assert(pSyncNode != NULL); - syncNodeLog2("syncNodeOpen open success", pSyncNode); + if (gRaftDetailLog) { + syncNodeLog2("syncNodeOpen open success", pSyncNode); + } pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); if (pSyncNode->rid < 0) { @@ -173,20 +175,37 @@ int32_t syncSetStandby(int64_t rid) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; - char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); - sInfo("==syncReconfig== newconfig:%s", configChange); + char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg); + + if (gRaftDetailLog) { + sInfo("==syncReconfig== newconfig:%s", newconfig); + } SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; - rpcMsg.contLen = strlen(configChange) + 1; + rpcMsg.contLen = strlen(newconfig) + 1; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange); - taosMemoryFree(configChange); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig); + taosMemoryFree(newconfig); ret = syncPropose(rid, &rpcMsg, false); return ret; } +int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { + int32_t ret = 0; + char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); + + pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE; + pRpcMsg->info.noResp = 1; + pRpcMsg->contLen = strlen(newconfig) + 1; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); + taosMemoryFree(newconfig); + + return ret; +} + int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = syncPropose(rid, pMsg, isWeak); return ret; @@ -374,13 +393,14 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - sTrace("syncPropose msgType:%d ", pMsg->msgType); + int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) return TAOS_SYNC_PROPOSE_OTHER_ERROR; - + if (pSyncNode == NULL) { + return TAOS_SYNC_PROPOSE_OTHER_ERROR; + } assert(rid == pSyncNode->rid); + sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -411,6 +431,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; + sInfo("sync event vgId:%d sync open", pSyncInfo->vgId); + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); @@ -439,9 +461,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { assert(pSyncNode->pRaftCfg != NULL); pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; - char* seralized = raftCfg2Str(pSyncNode->pRaftCfg); - sInfo("syncNodeOpen update config :%s", seralized); - taosMemoryFree(seralized); + if (gRaftDetailLog) { + char* seralized = raftCfg2Str(pSyncNode->pRaftCfg); + sInfo("syncNodeOpen update config :%s", seralized); + taosMemoryFree(seralized); + } raftCfgClose(pSyncNode->pRaftCfg); } @@ -612,7 +636,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } // snapshot receivers - pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100); + pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID); // start in syncNodeStart // start raft @@ -628,9 +652,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { raftStoreNextTerm(pSyncNode->pRaftStore); - syncNodeBecomeLeader(pSyncNode); - - syncNodeLog2("==state change become leader immediately==", pSyncNode); + syncNodeBecomeLeader(pSyncNode, "one replica start"); // Raft 3.6.2 Committing entries from previous terms @@ -638,41 +660,22 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica - /* - sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode); - tsem_wait(&pSyncNode->restoreSem); - sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode); - */ - - /* - while (pSyncNode->restoreFinish != true) { - taosMsleep(10); + if (gRaftDetailLog) { + syncNodeLog2("==state change become leader immediately==", pSyncNode); } - */ - sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId); return; } - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "first start"); - // for test - int32_t ret = 0; + // int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); - assert(ret == 0); + // assert(ret == 0); - /* - sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode); - tsem_wait(&pSyncNode->restoreSem); - sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode); - */ - - /* - while (pSyncNode->restoreFinish != true) { - taosMsleep(10); + if (gRaftDetailLog) { + syncNodeLog2("==state change become leader immediately==", pSyncNode); } - */ - sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -687,6 +690,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { + sInfo("sync event vgId:%d sync close", pSyncNode->vgId); + int32_t ret; assert(pSyncNode != NULL); @@ -1061,7 +1066,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d, replicaNum:%d", @@ -1131,7 +1136,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro } raftCfgPersist(pSyncNode->pRaftCfg); - syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); + + if (gRaftDetailLog) { + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); + } } SSyncNode* syncNodeAcquire(int64_t rid) { @@ -1149,12 +1157,14 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid) void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "update term"); raftStoreClearVote(pSyncNode->pRaftStore); } } -void syncNodeBecomeFollower(SSyncNode* pSyncNode) { +void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { + sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr); + // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; @@ -1186,7 +1196,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) { // evoterLog |-> voterLog[i]]} // /\ UNCHANGED <> // -void syncNodeBecomeLeader(SSyncNode* pSyncNode) { +void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { + sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr); + // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -1237,7 +1249,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); - syncNodeBecomeLeader(pSyncNode); + syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); @@ -1260,14 +1272,14 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { void syncNodeLeader2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode); + syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); } @@ -1467,9 +1479,11 @@ void syncNodeLog(SSyncNode* pObj) { } void syncNodeLog2(char* s, SSyncNode* pObj) { - char* serialized = syncNode2Str(pObj); - sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); - taosMemoryFree(serialized); + if (gRaftDetailLog) { + char* serialized = syncNode2Str(pObj); + sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } // ------ local funciton --------- @@ -1724,17 +1738,19 @@ const char* syncStr(ESyncState state) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sInfo("sync event commit from index:%" PRId64 " to index:%" PRId64 ", %s", beginIndex, endIndex, - syncUtilState2String(state)); + sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, + endIndex, syncUtilState2String(state)); - // maybe execute by leader, skip snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (ths->pFsm->FpGetSnapshot != NULL) { - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - } - if (beginIndex <= snapshot.lastApplyIndex) { - beginIndex = snapshot.lastApplyIndex + 1; - } + /* + // maybe execute by leader, skip snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (ths->pFsm->FpGetSnapshot != NULL) { + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + } + if (beginIndex <= snapshot.lastApplyIndex) { + beginIndex = snapshot.lastApplyIndex + 1; + } + */ // execute fsm if (ths->pFsm != NULL) { @@ -1791,17 +1807,19 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // change isStandBy to normal if (!isDrop) { if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); + syncNodeBecomeLeader(ths, "config change"); } else { - syncNodeBecomeFollower(ths); + syncNodeBecomeFollower(ths, "config change"); } } - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); + if (gRaftDetailLog) { + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + } } // always call FpReConfigCb @@ -1810,10 +1828,12 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.newCfg = newSyncCfg; cbMeta.oldCfg = oldSyncCfg; + cbMeta.seqNum = pEntry->seqNum; cbMeta.flag = 0x11; cbMeta.isDrop = isDrop; - ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); + ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta); } } @@ -1824,7 +1844,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sInfo("restore finish %p vgId:%d", ths, ths->vgId); + sInfo("sync event vgId:%d restore finish", ths->vgId); } } @@ -1853,4 +1873,4 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) } } return pSender; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 49509ae979..c53e5916ae 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,6 +14,7 @@ */ #include "syncRaftLog.h" +#include "syncRaftCfg.h" #include "wal.h" // refactor, log[0 .. n] ==> log[m .. n] @@ -161,7 +162,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sTrace("sync event write index:%" PRId64, pEntry->index); + sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, + pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); return code; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index ff39b0b13d..08564f8293 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -122,7 +122,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); - { + if (gRaftDetailLog) { SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", @@ -201,7 +201,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { } int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { - sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); int32_t ret = 0; SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a68312d07f..a23fe2c38a 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -20,7 +20,7 @@ #include "syncUtil.h" #include "wal.h" -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm); +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); //---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -105,13 +105,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); - char *msgStr = syncSnapshotSend2Str(pMsg); char host[128]; uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); - sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", host, - port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, msgStr); - taosMemoryFree(msgStr); + + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace( + "sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm); + } syncSnapshotSendDestroy(pMsg); } @@ -183,9 +193,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { pSender->start = false; - char *s = snapshotSender2Str(pSender); - sInfo("snapshotSenderStop %s", s); - taosMemoryFree(s); + if (gRaftDetailLog) { + char *s = snapshotSender2Str(pSender); + sInfo("snapshotSenderStop %s", s); + taosMemoryFree(s); + } } // when sender receiver ack, call this function to send msg from seq @@ -225,20 +237,29 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); - char *msgStr = syncSnapshotSend2Str(pMsg); char host[128]; uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); + if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", - host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - msgStr); + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace( + "sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm); + } } else { - sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", - host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - msgStr); + sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu", + pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm); } - taosMemoryFree(msgStr); syncSnapshotSendDestroy(pMsg); return 0; @@ -260,13 +281,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); - char *msgStr = syncSnapshotSend2Str(pMsg); char host[128]; uint16_t port; syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port); - sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack, - msgStr); - taosMemoryFree(msgStr); + + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, + host, port, pSender->seq, pSender->ack, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, + pSender->seq, pSender->ack); + } syncSnapshotSendDestroy(pMsg); } @@ -331,7 +358,7 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) { } // ------------------------------------- -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); @@ -345,7 +372,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->pWriter = NULL; pReceiver->pSyncNode = pSyncNode; - pReceiver->replicaIndex = replicaIndex; + pReceiver->fromId = fromId; pReceiver->term = pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = 0; @@ -365,10 +392,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } // begin receive snapshot msg (current term, seq begin) -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) { +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + pReceiver->fromId = fromId; ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); @@ -377,14 +405,15 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) { +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { if (!snapshotReceiverIsStart(pReceiver)) { // start - snapshotReceiverDoStart(pReceiver, privateTerm); + snapshotReceiverDoStart(pReceiver, privateTerm, fromId); pReceiver->start = true; } else { // already start + sInfo("snapshot recv, receiver already start"); // force close, abandon incomplete data int32_t ret = @@ -393,15 +422,15 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer pReceiver->pWriter = NULL; // start again - snapshotReceiverDoStart(pReceiver, privateTerm); + snapshotReceiverDoStart(pReceiver, privateTerm, fromId); pReceiver->start = true; - - ASSERT(0); } - char *s = snapshotReceiver2Str(pReceiver); - sInfo("snapshotReceiverStart %s", s); - taosMemoryFree(s); + if (gRaftDetailLog) { + char *s = snapshotReceiver2Str(pReceiver); + sInfo("snapshotReceiverStart %s", s); + taosMemoryFree(s); + } } void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { @@ -418,9 +447,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { ++(pReceiver->privateTerm); } - char *s = snapshotReceiver2Str(pReceiver); - sInfo("snapshotReceiverStop %s", s); - taosMemoryFree(s); + if (gRaftDetailLog) { + char *s = snapshotReceiver2Str(pReceiver); + sInfo("snapshotReceiverStop %s", s); + taosMemoryFree(s); + } } cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { @@ -436,7 +467,22 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex); + + cJSON *pFromId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->fromId.addr); + cJSON_AddStringToObject(pFromId, "addr", u64buf); + { + uint64_t u64 = pReceiver->fromId.addr; + cJSON *pTmp = pFromId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId); + cJSON_AddItemToObject(pRoot, "fromId", pFromId); + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term); cJSON_AddStringToObject(pRoot, "term", u64buf); @@ -468,17 +514,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { // begin - snapshotReceiverStart(pReceiver, pMsg->privateTerm); + snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId); pReceiver->ack = pMsg->seq; needRsp = true; - char *msgStr = syncSnapshotSend2Str(pMsg); char host[128]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); - taosMemoryFree(msgStr); + + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // end, finish FSM @@ -486,29 +538,46 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ASSERT(writeCode == 0); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); - pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); - char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); + SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + char host[128]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sInfo( - "sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, " - "snapshot.lastApplyTerm:%lu, raft log:%s", - host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr); - taosMemoryFree(logSimpleStr); + + if (gRaftDetailLog) { + char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); + sInfo( + "sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " + "snapshot.lastApplyIndex:%ld, " + "snapshot.lastApplyTerm:%lu, raft log:%s", + pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + logSimpleStr); + taosMemoryFree(logSimpleStr); + } else { + sInfo( + "sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, " + "snapshot.lastApplyIndex:%ld, " + "snapshot.lastApplyTerm:%lu", + pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + } pReceiver->pWriter = NULL; snapshotReceiverStop(pReceiver, true); pReceiver->ack = pMsg->seq; needRsp = true; - char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); - taosMemoryFree(msgStr); + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); @@ -519,11 +588,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, - port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); - - taosMemoryFree(msgStr); + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace( + "sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv " + "msg:%s", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu", + pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // transfering @@ -535,13 +610,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } needRsp = true; - char *msgStr = syncSnapshotSend2Str(pMsg); char host[128]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, - port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); - taosMemoryFree(msgStr); + + if (gRaftDetailLog) { + char *msgStr = syncSnapshotSend2Str(pMsg); + sTrace( + "sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + taosMemoryFree(msgStr); + } else { + sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu", + pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm); + } } else { ASSERT(0); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 781c168da9..10b54d0aa4 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -146,7 +146,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index c9d9ca48aa..1e64a8a6f7 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index 69670f09a6..208a96daa4 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -41,7 +41,11 @@ SSyncSnapshotReceiver* createReceiver() { pSyncNode->pFsm->FpSnapshotStopWrite = SnapshotStopWrite; pSyncNode->pFsm->FpSnapshotDoWrite = SnapshotDoWrite; - SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, 2); + SRaftId id; + id.addr = syncUtilAddr2U64("1.2.3.4", 99); + id.vgId = 100; + + SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, id); pReceiver->start = true; pReceiver->ack = 20; pReceiver->pWriter = (void*)0x11; diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 782baf3c97..0c8b26e9d9 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -146,8 +146,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); } -void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - char* s = syncCfg2Str(&newCfg); +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { + char* s = syncCfg2Str(&(cbMeta.newCfg)); sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); taosMemoryFree(s); @@ -235,7 +235,6 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* } } - int64_t rid = syncOpen(&syncInfo); assert(rid > 0); diff --git a/tools/taos-tools b/tools/taos-tools index 717f5aaa5f..932da0f4ca 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda +Subproject commit 932da0f4cac013c2eded824d1d4d01cfa6168fa3