Merge branch '3.0' into feature/TD-14481-3.0
This commit is contained in:
commit
6ea490a8fb
|
@ -236,6 +236,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE_FINISH, "sync-config-change-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
||||
|
|
|
@ -152,7 +152,6 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
|||
typedef struct {
|
||||
char* qmsg;
|
||||
// followings are not applicable to encoder and decoder
|
||||
// void* inputHandle;
|
||||
void* executor;
|
||||
} STaskExec;
|
||||
|
||||
|
@ -400,15 +399,13 @@ typedef struct {
|
|||
|
||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskRun(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -24,6 +24,8 @@ extern "C" {
|
|||
#include "tdef.h"
|
||||
#include "tmsgcb.h"
|
||||
|
||||
extern bool gRaftDetailLog;
|
||||
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
#define SYNC_INDEX_INVALID -1
|
||||
|
||||
|
@ -61,14 +63,14 @@ typedef struct SSyncCfg {
|
|||
} SSyncCfg;
|
||||
|
||||
typedef struct SFsmCbMeta {
|
||||
SyncIndex index;
|
||||
SyncIndex lastConfigIndex;
|
||||
bool isWeak;
|
||||
int32_t code;
|
||||
ESyncState state;
|
||||
uint64_t seqNum;
|
||||
SyncIndex index;
|
||||
SyncTerm term;
|
||||
uint64_t seqNum;
|
||||
SyncIndex lastConfigIndex;
|
||||
ESyncState state;
|
||||
SyncTerm currentTerm;
|
||||
bool isWeak;
|
||||
uint64_t flag;
|
||||
} SFsmCbMeta;
|
||||
|
||||
|
@ -76,13 +78,20 @@ typedef struct SReConfigCbMeta {
|
|||
int32_t code;
|
||||
SyncIndex index;
|
||||
SyncTerm term;
|
||||
uint64_t seqNum;
|
||||
SyncIndex lastConfigIndex;
|
||||
ESyncState state;
|
||||
SyncTerm currentTerm;
|
||||
bool isWeak;
|
||||
uint64_t flag;
|
||||
|
||||
// config info
|
||||
SSyncCfg oldCfg;
|
||||
SSyncCfg newCfg;
|
||||
bool isDrop;
|
||||
uint64_t flag;
|
||||
uint64_t seqNum;
|
||||
SyncIndex newCfgIndex;
|
||||
SyncTerm newCfgTerm;
|
||||
uint64_t newCfgSeqNum;
|
||||
|
||||
} SReConfigCbMeta;
|
||||
|
||||
typedef struct SSnapshot {
|
||||
|
@ -107,8 +116,7 @@ typedef struct SSyncFSM {
|
|||
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
||||
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
|
||||
|
||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader);
|
||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
||||
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||
|
||||
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
|
||||
|
@ -189,14 +197,13 @@ ESyncState syncGetMyRole(int64_t rid);
|
|||
bool syncIsReady(int64_t rid);
|
||||
const char* syncGetMyRoleStr(int64_t rid);
|
||||
SyncTerm syncGetMyTerm(int64_t rid);
|
||||
SyncGroupId syncGetVgId(int64_t rid);
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
int32_t syncGetVgId(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
||||
bool syncEnvIsStart();
|
||||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
|
||||
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||
|
||||
// build SRpcMsg, need to call syncPropose with SRpcMsg
|
||||
|
|
|
@ -489,6 +489,40 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
|
|||
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
|
||||
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
|
||||
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncReconfigFinish {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SSyncCfg oldCfg;
|
||||
SSyncCfg newCfg;
|
||||
SyncIndex newCfgIndex;
|
||||
SyncTerm newCfgTerm;
|
||||
uint64_t newCfgSeqNum;
|
||||
|
||||
} SyncReconfigFinish;
|
||||
|
||||
SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId);
|
||||
void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg);
|
||||
void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg);
|
||||
char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len);
|
||||
SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len);
|
||||
void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg);
|
||||
SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg);
|
||||
char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg);
|
||||
void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
|
||||
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
|
||||
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
||||
|
||||
|
||||
|
||||
// on message ----------------------
|
||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
|
|
|
@ -416,6 +416,9 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
|
||||
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
|
||||
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
|
||||
#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F)
|
||||
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
|
||||
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
|
||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||
|
||||
// tq
|
||||
|
|
|
@ -161,7 +161,7 @@ int32_t tsDiskCfgNum = 0;
|
|||
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
||||
|
||||
// stream scheduler
|
||||
bool tsStreamSchedV = true;
|
||||
bool tsSchedStreamToSnode = true;
|
||||
|
||||
/*
|
||||
* minimum scale for whole system, millisecond by default
|
||||
|
|
|
@ -96,11 +96,11 @@ SArray *smGetMsgHandles() {
|
|||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -58,6 +58,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
|
|||
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
smSendRsp(pMsg, 0);
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -70,6 +71,7 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
|
||||
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
||||
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
smSendRsp(pMsg, terrno);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ typedef struct SVnodeMgmt {
|
|||
SWWorkerPool syncPool;
|
||||
SWWorkerPool writePool;
|
||||
SWWorkerPool applyPool;
|
||||
SWWorkerPool mergePool;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker monitorWorker;
|
||||
SHashObj *hash;
|
||||
|
@ -63,7 +62,6 @@ typedef struct {
|
|||
STaosQueue *pApplyQ;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMergeQ;
|
||||
} SVnodeObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pMergeQ)) taosMsleep(10);
|
||||
|
||||
vmFreeQueue(pMgmt, pVnode);
|
||||
vnodeClose(pVnode->pImpl);
|
||||
|
|
|
@ -98,7 +98,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
|
||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
|
@ -119,7 +119,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
|
||||
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
|
@ -251,10 +251,9 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
|
||||
|
||||
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
|
||||
pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) {
|
||||
pVnode->pFetchQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -269,13 +268,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
|
||||
pVnode->pWriteQ = NULL;
|
||||
pVnode->pSyncQ = NULL;
|
||||
pVnode->pApplyQ = NULL;
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
pVnode->pMergeQ = NULL;
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
||||
|
@ -307,11 +304,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pSPool->max = tsNumOfVnodeSyncThreads;
|
||||
if (tWWorkerInit(pSPool) != 0) return -1;
|
||||
|
||||
SWWorkerPool *pMPool = &pMgmt->mergePool;
|
||||
pMPool->name = "vnode-merge";
|
||||
pMPool->max = tsNumOfVnodeMergeThreads;
|
||||
if (tWWorkerInit(pMPool) != 0) return -1;
|
||||
|
||||
SSingleWorkerCfg mgmtCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
|
@ -342,6 +334,5 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
|||
tWWorkerCleanup(&pMgmt->syncPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||
tWWorkerCleanup(&pMgmt->mergePool);
|
||||
dDebug("vnode workers are closed");
|
||||
}
|
||||
|
|
|
@ -58,21 +58,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
|||
|
||||
static void mndPullupTrans(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void * pReq = mndBuildTimerMsg(&contLen);
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void * pReq = mndBuildTimerMsg(&contLen);
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
static void mndPullupTelem(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void * pReq = mndBuildTimerMsg(&contLen);
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
@ -97,13 +97,13 @@ static void mndPushTtlTime(SMnode *pMnode) {
|
|||
pHead->vgId = htonl(pVgroup->vgId);
|
||||
|
||||
int32_t t = taosGetTimestampSec();
|
||||
*(int32_t*)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
|
||||
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
|
||||
|
||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if(code != 0){
|
||||
if (code != 0) {
|
||||
mError("ttl time seed err. code:%d", code);
|
||||
}
|
||||
mError("ttl time seed succ. time:%d", t);
|
||||
|
@ -416,7 +416,7 @@ void mndStop(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||
SMnode * pMnode = pMsg->info.node;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -433,15 +433,19 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
char logBuf[512] = {0};
|
||||
do {
|
||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t mndTick = 0;
|
||||
if (++mndTick % 10 == 1) {
|
||||
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
mTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
}
|
||||
if (gRaftDetailLog) {
|
||||
char logBuf[512] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
}
|
||||
taosMemoryFree(syncNodeStr);
|
||||
} while (0);
|
||||
|
||||
// ToDo: ugly! use function pointer
|
||||
if (syncNodeSnapshotEnable(pSyncNode)) {
|
||||
|
@ -580,7 +584,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||
SMnode * pMnode = pMsg->info.node;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||
if (fp == NULL) {
|
||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
|
@ -633,7 +637,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
SMonGrantInfo *pGrantInfo) {
|
||||
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
||||
|
||||
SSdb * pSdb = pMnode->pSdb;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t ms = taosGetTimestampMs();
|
||||
|
||||
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
|
||||
|
@ -709,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
|
||||
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
|
||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||
SVnodeGid * pVgid = &pVgroup->vnodeGid[i];
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
|
||||
pVnDesc->dnode_id = pVgid->dnodeId;
|
||||
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
#include "tname.h"
|
||||
#include "tuuid.h"
|
||||
|
||||
extern bool tsStreamSchedV;
|
||||
extern bool tsSchedStreamToSnode;
|
||||
|
||||
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
|
||||
int32_t childId = taosArrayGetSize(pArray);
|
||||
|
@ -204,9 +204,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
|
||||
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
|
||||
SSnodeObj* pObj = NULL;
|
||||
pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
|
||||
void* pIter = NULL;
|
||||
// TODO random fetch
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
|
@ -214,7 +216,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
|||
const SSnodeObj* pSnode) {
|
||||
int32_t msgLen;
|
||||
|
||||
pTask->nodeId = 0;
|
||||
pTask->nodeId = SNODE_HANDLE;
|
||||
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
|
||||
|
||||
plan->execNode.nodeId = 0;
|
||||
|
@ -224,7 +226,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
|||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0);
|
||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, SNODE_HANDLE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -370,8 +372,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
}
|
||||
|
||||
if (totLevel > 1) {
|
||||
SStreamTask* pFinalTask;
|
||||
// inner plan
|
||||
SStreamTask* pInnerTask;
|
||||
// inner level
|
||||
{
|
||||
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
|
||||
taosArrayPush(pStream->tasks, &taskInnerLevel);
|
||||
|
@ -380,31 +382,51 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
|
||||
|
||||
pFinalTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskInnerLevel, pFinalTask);
|
||||
pInnerTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
|
||||
// input
|
||||
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// trigger
|
||||
pFinalTask->triggerParam = pStream->triggerParam;
|
||||
pInnerTask->triggerParam = pStream->triggerParam;
|
||||
|
||||
// dispatch
|
||||
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
|
||||
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pInnerTask) < 0) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// exec
|
||||
pFinalTask->execType = TASK_EXEC__PIPE;
|
||||
pInnerTask->execType = TASK_EXEC__PIPE;
|
||||
|
||||
if (tsSchedStreamToSnode) {
|
||||
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (mndAssignTaskToSnode(pMnode, pTrans, pInnerTask, plan, pSnode) < 0) {
|
||||
ASSERT(0);
|
||||
sdbRelease(pSdb, pSnode);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// source plan
|
||||
// source level
|
||||
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
|
||||
taosArrayPush(pStream->tasks, &taskSourceLevel);
|
||||
|
||||
|
@ -434,9 +456,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||
|
||||
pTask->fixedEpDispatcher.taskId = pFinalTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId;
|
||||
pTask->fixedEpDispatcher.epSet = pFinalTask->epSet;
|
||||
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
|
||||
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
|
||||
|
||||
// exec
|
||||
pTask->execType = TASK_EXEC__PIPE;
|
||||
|
|
|
@ -896,7 +896,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
|
|||
pAction->rawWritten = 0;
|
||||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NOT_IN_NEW_CONFIG ||
|
||||
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
|
||||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
|
||||
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
|
||||
|
|
|
@ -78,8 +78,8 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
|||
|
||||
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
char *msg = pMsg->pCont;
|
||||
int32_t msgLen = pMsg->contLen;
|
||||
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
|
@ -105,23 +105,22 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
|
||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||
|
||||
SReadHandle handle = {
|
||||
.pMsgCb = &pNode->msgCb,
|
||||
};
|
||||
|
||||
/*pTask->exec.inputHandle = NULL;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->dataScan == 0);
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
|
||||
|
||||
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -130,7 +129,7 @@ static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pNode->msgCb);
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -151,7 +150,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -161,7 +160,7 @@ static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamTaskRecoverReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg);
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -171,7 +170,7 @@ static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp);
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
|
|||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
|
||||
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids);
|
||||
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||
|
@ -106,7 +106,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
|
|||
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
||||
void* metaGetIdx(SMeta* pMeta);
|
||||
void* metaGetIvtIdx(SMeta* pMeta);
|
||||
int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList);
|
||||
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||
|
||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||
|
@ -141,7 +141,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
|||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
@ -261,7 +261,7 @@ struct SSma {
|
|||
|
||||
#define SMA_CFG(s) (&(s)->pVnode->config)
|
||||
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
|
||||
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
|
||||
#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions)
|
||||
#define SMA_LOCKED(s) ((s)->locked)
|
||||
#define SMA_META(s) ((s)->pVnode->pMeta)
|
||||
#define SMA_VID(s) TD_VID((s)->pVnode)
|
||||
|
|
|
@ -333,7 +333,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
SReadHandle handle = {
|
||||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
|
@ -373,7 +372,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
|
@ -404,7 +403,6 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
};
|
||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||
|
@ -468,7 +466,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) {
|
||||
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
@ -489,7 +487,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -507,7 +505,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -515,7 +513,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -523,7 +521,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -167,7 +167,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -304,18 +304,17 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp){
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t t = ntohl(*(int32_t*)pReq);
|
||||
int32_t t = ntohl(*(int32_t *)pReq);
|
||||
vError("rec ttl time:%d", t);
|
||||
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
|
||||
if(ret != 0){
|
||||
if (ret != 0) {
|
||||
goto end;
|
||||
}
|
||||
if(taosArrayGetSize(tbUids) > 0){
|
||||
if (taosArrayGetSize(tbUids) > 0) {
|
||||
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode * pVnode = pInfo->ahandle;
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -174,7 +174,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
}
|
||||
|
||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode * pVnode = pInfo->ahandle;
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -211,21 +211,23 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
ESyncState state = syncGetMyRole(pVnode->sync);
|
||||
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
char logBuf[512] = {0};
|
||||
do {
|
||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t vndTick = 0;
|
||||
STraceId * trace = &pMsg->info.traceId;
|
||||
if (++vndTick % 10 == 1) {
|
||||
vGTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
vGTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
}
|
||||
if (gRaftDetailLog) {
|
||||
char logBuf[512] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType,
|
||||
syncNodeStr);
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
}
|
||||
taosMemoryFree(syncNodeStr);
|
||||
} while (0);
|
||||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
|
@ -348,7 +350,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
|||
}
|
||||
|
||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SVnode * pVnode = pFsm->data;
|
||||
SVnode *pVnode = pFsm->data;
|
||||
SSnapshot snapshot = {0};
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
char logBuf[256] = {0};
|
||||
|
|
|
@ -506,7 +506,8 @@ typedef struct SPartitionOperatorInfo {
|
|||
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t rowCapacity; // maximum number of rows for each buffer page
|
||||
int32_t* columnOffset; // start position for each column data
|
||||
void* pGroupIter; // group iterator
|
||||
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
|
||||
int32_t groupIndex; // group index
|
||||
int32_t pageIndex; // page index of current group
|
||||
SSDataBlock* pUpdateRes;
|
||||
SExprSupp scalarSup;
|
||||
|
|
|
@ -4052,14 +4052,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
STimeWindowAggSupp twSup = {
|
||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
tsdbReaderT pDataReader = NULL;
|
||||
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
// for stram
|
||||
pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||
} else {
|
||||
// for tq
|
||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -586,24 +586,30 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
|
|||
while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) {
|
||||
taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList);
|
||||
}
|
||||
taosHashClear(pInfo->pGroupSet);
|
||||
taosArrayClear(pInfo->sortedGroupArray);
|
||||
clearDiskbasedBuf(pInfo->pBuf);
|
||||
}
|
||||
|
||||
static int compareDataGroupInfo(const void* group1, const void* group2) {
|
||||
const SDataGroupInfo* pGroupInfo1 = group1;
|
||||
const SDataGroupInfo* pGroupInfo2 = group2;
|
||||
return pGroupInfo1->groupId - pGroupInfo2->groupId;
|
||||
}
|
||||
|
||||
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SDataGroupInfo* pGroupInfo = pInfo->pGroupIter;
|
||||
if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
||||
SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
|
||||
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
||||
// try next group data
|
||||
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
|
||||
if (pInfo->pGroupIter == NULL) {
|
||||
++pInfo->groupIndex;
|
||||
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
clearPartitionOperator(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pGroupInfo = pInfo->pGroupIter;
|
||||
pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
|
||||
pInfo->pageIndex = 0;
|
||||
}
|
||||
|
||||
|
@ -657,6 +663,20 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
doHashPartition(pOperator, pBlock);
|
||||
}
|
||||
|
||||
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
||||
void* pGroupIter = NULL;
|
||||
pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||
while (pGroupIter != NULL) {
|
||||
SDataGroupInfo* pGroupInfo = pGroupIter;
|
||||
taosArrayPush(groupArray, pGroupInfo);
|
||||
pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
|
||||
}
|
||||
|
||||
taosArraySort(groupArray, compareDataGroupInfo);
|
||||
pInfo->sortedGroupArray = groupArray;
|
||||
pInfo->groupIndex = -1;
|
||||
taosHashClear(pInfo->pGroupSet);
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
@ -676,6 +696,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
taosArrayDestroy(pInfo->pGroupColVals);
|
||||
taosMemoryFree(pInfo->keyBuf);
|
||||
taosArrayDestroy(pInfo->sortedGroupArray);
|
||||
taosHashCleanup(pInfo->pGroupSet);
|
||||
taosMemoryFree(pInfo->columnOffset);
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
|
|||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
||||
|
||||
streamTaskInput(pTask, (SStreamQueueItem*)trigger);
|
||||
streamLaunchByWrite(pTask, pTask->nodeId, pTask->pMsgCb);
|
||||
streamLaunchByWrite(pTask, pTask->nodeId);
|
||||
}
|
||||
|
||||
taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||
|
@ -81,7 +81,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
|
||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
|
@ -96,7 +96,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
|
|||
.pCont = pRunReq,
|
||||
.contLen = sizeof(SStreamTaskRunReq),
|
||||
};
|
||||
tmsgPutToQueue(pMsgCb, FETCH_QUEUE, &msg);
|
||||
tmsgPutToQueue(pTask->pMsgCb, FETCH_QUEUE, &msg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -136,7 +136,9 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
|||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId);
|
||||
|
||||
// 1. handle input
|
||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||
|
||||
|
@ -145,7 +147,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
streamExec(pTask, pTask->pMsgCb);
|
||||
} else {
|
||||
ASSERT(pTask->sinkType != TASK_SINK__NONE);
|
||||
while (1) {
|
||||
|
@ -161,34 +163,38 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||
|
||||
qInfo("task %d receive dispatch rsp", pTask->taskId);
|
||||
|
||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO: init recover timer
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
// continue dispatch
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||
streamExec(pTask, pTask->pMsgCb);
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(vgId != 0);
|
||||
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
|
||||
req.taskId = downstreamTaskId;
|
||||
|
||||
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
|
||||
|
@ -199,6 +199,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
|
||||
qInfo("stream continue dispatching: task %d", pTask->taskId);
|
||||
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
||||
|
|
|
@ -107,18 +107,19 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
pRes = streamExecForQall(pTask, pRes);
|
||||
if (pRes == NULL) goto FAIL;
|
||||
|
||||
break;
|
||||
taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
return 0;
|
||||
} else if (execStatus == TASK_STATUS__CLOSING) {
|
||||
continue;
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING) {
|
||||
break;
|
||||
ASSERT(taosArrayGetSize(pRes) == 0);
|
||||
taosArrayDestroy(pRes);
|
||||
return 0;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
if (pRes) taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
return 0;
|
||||
FAIL:
|
||||
if (pRes) taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
|
|
|
@ -159,7 +159,8 @@ typedef struct SSyncNode {
|
|||
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
|
||||
SSyncSnapshotReceiver* pNewNodeReceiver;
|
||||
|
||||
// SSnapshotMeta sMeta;
|
||||
// is config changing
|
||||
bool changing;
|
||||
|
||||
} SSyncNode;
|
||||
|
||||
|
@ -198,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode);
|
|||
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
|
||||
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
||||
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
|
||||
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
|
||||
|
||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||
void syncNodeRelease(SSyncNode* pNode);
|
||||
|
@ -244,6 +245,9 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, str
|
|||
void syncStartNormal(int64_t rid);
|
||||
void syncStartStandBy(int64_t rid);
|
||||
|
||||
bool syncNodeCanChange(SSyncNode* pSyncNode);
|
||||
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
|
||||
|
||||
// for debug --------------
|
||||
void syncNodePrint(SSyncNode* pObj);
|
||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||
|
|
|
@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
|
|||
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
|
||||
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
|
||||
|
||||
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg);
|
||||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
|
||||
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
|
||||
char * syncCfg2Str(SSyncCfg *pSyncCfg);
|
||||
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
|
||||
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
|
||||
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
|
||||
|
||||
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg);
|
||||
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
|
||||
char * raftCfg2Str(SRaftCfg *pRaftCfg);
|
||||
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
|
||||
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ typedef struct SSyncSnapshotReceiver {
|
|||
void *pWriter;
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
SSnapshot snapshot;
|
||||
|
||||
SSyncNode *pSyncNode;
|
||||
SRaftId fromId;
|
||||
|
@ -78,7 +79,7 @@ typedef struct SSyncSnapshotReceiver {
|
|||
|
||||
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
|
||||
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
|
||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
|
||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||
|
|
|
@ -178,6 +178,23 @@ int32_t syncSetStandby(int64_t rid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
|
||||
if (!IamInNew) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pNewCfg->replicaNum > pSyncNode->replicaNum + 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pNewCfg->replicaNum < pSyncNode->replicaNum - 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -185,13 +202,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
|
|||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
int32_t ret = 0;
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
|
||||
|
||||
if (!IamInNew) {
|
||||
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG;
|
||||
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
|
||||
sError("syncNodeCheckNewConfig error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -215,12 +231,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
|
|||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
|
||||
|
||||
if (!IamInNew) {
|
||||
sError("sync reconfig error, not in new config");
|
||||
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG;
|
||||
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
|
||||
sError("syncNodeCheckNewConfig error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -425,18 +439,6 @@ const char* syncGetMyRoleStr(int64_t rid) {
|
|||
return s;
|
||||
}
|
||||
|
||||
int32_t syncGetVgId(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return TAOS_SYNC_STATE_ERROR;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
int32_t vgId = pSyncNode->vgId;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return vgId;
|
||||
}
|
||||
|
||||
SyncTerm syncGetMyTerm(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -449,6 +451,18 @@ SyncTerm syncGetMyTerm(int64_t rid) {
|
|||
return term;
|
||||
}
|
||||
|
||||
SyncGroupId syncGetVgId(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return TAOS_SYNC_STATE_ERROR;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
SyncGroupId vgId = pSyncNode->vgId;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return vgId;
|
||||
}
|
||||
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -589,6 +603,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
|
|||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
// config change
|
||||
if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
if (!syncNodeCanChange(pSyncNode)) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
|
||||
sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
ASSERT(!pSyncNode->changing);
|
||||
pSyncNode->changing = true;
|
||||
}
|
||||
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = *pMsg;
|
||||
|
@ -606,12 +640,16 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
|
|||
sError("syncPropose pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
goto _END;
|
||||
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
|
||||
goto _END;
|
||||
}
|
||||
|
||||
_END:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -825,6 +863,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
|||
// snapshot receivers
|
||||
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
|
||||
|
||||
// is config changing
|
||||
pSyncNode->changing = false;
|
||||
|
||||
// start in syncNodeStart
|
||||
// start raft
|
||||
// syncNodeBecomeFollower(pSyncNode);
|
||||
|
@ -1253,20 +1294,32 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
|
|||
|
||||
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||
int32_t userStrLen = strlen(str);
|
||||
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
}
|
||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
|
||||
if (userStrLen < 256) {
|
||||
char logBuf[128 + 256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, str);
|
||||
"vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
|
||||
"lconfig:%ld, changing:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
|
||||
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing);
|
||||
sDebug("%s", logBuf);
|
||||
|
||||
} else {
|
||||
int len = 128 + userStrLen;
|
||||
char* s = (char*)taosMemoryMalloc(len);
|
||||
snprintf(s, len, "vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->commitIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
|
||||
pSyncNode->pRaftCfg->lastConfigIndex, str);
|
||||
snprintf(s, len,
|
||||
"vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
|
||||
"lconfig:%ld, changing:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
|
||||
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing);
|
||||
sDebug("%s", s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
@ -1313,11 +1366,41 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
|
|||
return b1;
|
||||
}
|
||||
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
|
||||
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
|
||||
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
||||
pSyncNode->pRaftCfg->cfg = *pNewConfig;
|
||||
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
|
||||
|
||||
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
|
||||
|
||||
bool isDrop = false;
|
||||
bool isAdd = false;
|
||||
|
||||
if (IamInOld && !IamInNew) {
|
||||
isDrop = true;
|
||||
} else {
|
||||
isDrop = false;
|
||||
}
|
||||
|
||||
if (!IamInOld && IamInNew) {
|
||||
isAdd = true;
|
||||
} else {
|
||||
isAdd = false;
|
||||
}
|
||||
|
||||
if (IamInNew) {
|
||||
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
|
||||
}
|
||||
if (isDrop) {
|
||||
pSyncNode->pRaftCfg->isStandBy = 1; // set standby
|
||||
}
|
||||
|
||||
// persist last config index
|
||||
raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);
|
||||
|
||||
if (IamInNew) {
|
||||
//-----------------------------------------
|
||||
int32_t ret = 0;
|
||||
|
||||
// save snapshot senders
|
||||
|
@ -1397,8 +1480,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
|
||||
do {
|
||||
char eventLog[256];
|
||||
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
|
||||
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
|
||||
snprintf(eventLog, sizeof(eventLog),
|
||||
"snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
|
||||
port, (pSyncNode->senders)[i], reset);
|
||||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
} while (0);
|
||||
}
|
||||
|
@ -1431,28 +1515,39 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
}
|
||||
}
|
||||
|
||||
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
|
||||
|
||||
*isDrop = true;
|
||||
if (IamInOld && !IamInNew) {
|
||||
*isDrop = true;
|
||||
} else {
|
||||
*isDrop = false;
|
||||
}
|
||||
|
||||
// may be add me to a new raft group
|
||||
if (IamInOld && IamInNew && oldConfig.replicaNum == 1) {
|
||||
}
|
||||
|
||||
if (IamInNew) {
|
||||
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
|
||||
}
|
||||
// persist cfg
|
||||
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||
|
||||
if (gRaftDetailLog) {
|
||||
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldConfig);
|
||||
char* newStr = syncCfg2SimpleStr(pNewConfig);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
|
||||
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
// change isStandBy to normal (election timeout)
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeBecomeLeader(pSyncNode, tmpbuf);
|
||||
} else {
|
||||
syncNodeBecomeFollower(pSyncNode, tmpbuf);
|
||||
}
|
||||
} else {
|
||||
// persist cfg
|
||||
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldConfig);
|
||||
char* newStr = syncCfg2SimpleStr(pNewConfig);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
|
||||
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
syncNodeEventLog(pSyncNode, tmpbuf);
|
||||
}
|
||||
|
||||
_END:
|
||||
return;
|
||||
}
|
||||
|
||||
SSyncNode* syncNodeAcquire(int64_t rid) {
|
||||
|
@ -2170,9 +2265,54 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||
static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||
SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg);
|
||||
ASSERT(pFinish);
|
||||
|
||||
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||
SReConfigCbMeta cbMeta = {0};
|
||||
cbMeta.code = 0;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.flag = 0;
|
||||
|
||||
cbMeta.oldCfg = pFinish->oldCfg;
|
||||
cbMeta.newCfg = pFinish->newCfg;
|
||||
cbMeta.newCfgIndex = pFinish->newCfgIndex;
|
||||
cbMeta.newCfgTerm = pFinish->newCfgTerm;
|
||||
cbMeta.newCfgSeqNum = pFinish->newCfgSeqNum;
|
||||
|
||||
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
|
||||
}
|
||||
|
||||
// update changing
|
||||
ths->changing = false;
|
||||
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&(pFinish->oldCfg));
|
||||
char* newStr = syncCfg2SimpleStr(&(pFinish->newCfg));
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change finish from %d to %d, index:%ld, %s --> %s",
|
||||
pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
syncNodeEventLog(ths, tmpbuf);
|
||||
|
||||
syncReconfigFinishDestroy(pFinish);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
|
||||
SyncReconfigFinish* pFinish) {
|
||||
// old config
|
||||
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
||||
|
||||
// new config
|
||||
SSyncCfg newSyncCfg;
|
||||
int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
|
||||
ASSERT(ret == 0);
|
||||
|
@ -2180,77 +2320,28 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
// update new config myIndex
|
||||
syncNodeUpdateNewConfigIndex(ths, &newSyncCfg);
|
||||
|
||||
bool IamInNew = syncNodeInConfig(ths, &newSyncCfg);
|
||||
// do config change
|
||||
syncNodeDoConfigChange(ths, &newSyncCfg, pEntry->index);
|
||||
|
||||
/*
|
||||
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
|
||||
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
|
||||
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
|
||||
newSyncCfg.myIndex = i;
|
||||
IamInNew = true;
|
||||
break;
|
||||
// set pFinish
|
||||
pFinish->oldCfg = oldSyncCfg;
|
||||
pFinish->newCfg = newSyncCfg;
|
||||
pFinish->newCfgIndex = pEntry->index;
|
||||
pFinish->newCfgTerm = pEntry->term;
|
||||
pFinish->newCfgSeqNum = pEntry->seqNum;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncReconfigFinish2RpcMsg(pFinish, &rpcMsg);
|
||||
|
||||
int32_t code = syncNodePropose(ths, &rpcMsg, false);
|
||||
if (code != 0) {
|
||||
sError("syncNodeProposeConfigChangeFinish error");
|
||||
ths->changing = false;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
bool isDrop;
|
||||
|
||||
if (IamInNew) {
|
||||
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
|
||||
|
||||
// change isStandBy to normal
|
||||
if (!isDrop) {
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
|
||||
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeBecomeLeader(ths, tmpbuf);
|
||||
} else {
|
||||
syncNodeBecomeFollower(ths, tmpbuf);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
|
||||
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
syncNodeBecomeFollower(ths, tmpbuf);
|
||||
}
|
||||
|
||||
if (gRaftDetailLog) {
|
||||
char* sOld = syncCfg2Str(&oldSyncCfg);
|
||||
char* sNew = syncCfg2Str(&newSyncCfg);
|
||||
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld IamInNew:%d \n", sOld, sNew, isDrop, pEntry->index,
|
||||
IamInNew);
|
||||
taosMemoryFree(sOld);
|
||||
taosMemoryFree(sNew);
|
||||
}
|
||||
|
||||
// always call FpReConfigCb
|
||||
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||
SReConfigCbMeta cbMeta = {0};
|
||||
cbMeta.code = 0;
|
||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.newCfg = newSyncCfg;
|
||||
cbMeta.oldCfg = oldSyncCfg;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.flag = 0x11;
|
||||
cbMeta.isDrop = isDrop;
|
||||
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2292,9 +2383,21 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index);
|
||||
raftCfgPersist(ths->pRaftCfg);
|
||||
code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
|
||||
SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId);
|
||||
ASSERT(pFinish != NULL);
|
||||
|
||||
code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish);
|
||||
ASSERT(code == 0);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeProposeConfigChangeFinish(ths, pFinish);
|
||||
}
|
||||
syncReconfigFinishDestroy(pFinish);
|
||||
}
|
||||
|
||||
// config change finish
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||
code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
|
@ -2345,3 +2448,28 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
|
|||
}
|
||||
return pSender;
|
||||
}
|
||||
|
||||
bool syncNodeCanChange(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->changing) {
|
||||
sError("sync cannot change");
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
if (pSyncNode->commitIndex != lastIndex) {
|
||||
sError("sync cannot change2");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
|
||||
if (pSender->start) {
|
||||
sError("sync cannot change3");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
|
@ -2228,3 +2228,132 @@ void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) {
|
|||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncReconfigFinish);
|
||||
SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_CONFIG_CHANGE_FINISH;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen) {
|
||||
assert(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
assert(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
assert(buf != NULL);
|
||||
syncReconfigFinishSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes);
|
||||
assert(pMsg != NULL);
|
||||
syncReconfigFinishDeserialize(buf, len, pMsg);
|
||||
assert(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncReconfigFinishSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg) {
|
||||
syncReconfigFinishDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
assert(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg) {
|
||||
char u64buf[128];
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pOldCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->oldCfg)));
|
||||
cJSON* pNewCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->newCfg)));
|
||||
cJSON_AddItemToObject(pRoot, "oldCfg", pOldCfg);
|
||||
cJSON_AddItemToObject(pRoot, "newCfg", pNewCfg);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->newCfgIndex);
|
||||
cJSON_AddStringToObject(pRoot, "newCfgIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgTerm);
|
||||
cJSON_AddStringToObject(pRoot, "newCfgTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgSeqNum);
|
||||
cJSON_AddStringToObject(pRoot, "newCfgSeqNum", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncReconfigFinish", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg) {
|
||||
cJSON* pJson = syncReconfigFinish2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg) {
|
||||
char* serialized = syncReconfigFinish2Str(pMsg);
|
||||
printf("syncReconfigFinishPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg) {
|
||||
char* serialized = syncReconfigFinish2Str(pMsg);
|
||||
printf("syncReconfigFinishPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg) {
|
||||
char* serialized = syncReconfigFinish2Str(pMsg);
|
||||
sTrace("syncReconfigFinishLog | len:%lu | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncReconfigFinish2Str(pMsg);
|
||||
sTrace("syncReconfigFinishLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
|
@ -96,14 +96,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
|
||||
int32_t len = 512;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
memset(s, 0, len);
|
||||
|
||||
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
|
||||
|
@ -196,7 +196,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -21,7 +21,8 @@
|
|||
#include "syncUtil.h"
|
||||
#include "wal.h"
|
||||
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||
SyncSnapshotSend *pBeginMsg);
|
||||
|
||||
//----------------------------------
|
||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
|
||||
|
@ -341,6 +342,10 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
|
|||
pReceiver->fromId = fromId;
|
||||
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
|
||||
pReceiver->privateTerm = 0;
|
||||
pReceiver->snapshot.data = NULL;
|
||||
pReceiver->snapshot.lastApplyIndex = -1;
|
||||
pReceiver->snapshot.lastApplyTerm = 0;
|
||||
pReceiver->snapshot.lastConfigIndex = -1;
|
||||
|
||||
} else {
|
||||
sInfo("snapshotReceiverCreate cannot create receiver");
|
||||
|
@ -358,11 +363,16 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
|||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
|
||||
|
||||
// begin receive snapshot msg (current term, seq begin)
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||
SyncSnapshotSend *pBeginMsg) {
|
||||
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
|
||||
pReceiver->privateTerm = privateTerm;
|
||||
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
|
||||
pReceiver->fromId = fromId;
|
||||
pReceiver->fromId = pBeginMsg->srcId;
|
||||
|
||||
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
|
||||
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
|
||||
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
|
||||
|
||||
ASSERT(pReceiver->pWriter == NULL);
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
|
||||
|
@ -371,10 +381,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
|
|||
|
||||
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
||||
// if already start, force close, start again
|
||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
|
||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
||||
if (!snapshotReceiverIsStart(pReceiver)) {
|
||||
// start
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||
pReceiver->start = true;
|
||||
|
||||
} else {
|
||||
|
@ -388,7 +398,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
|
|||
pReceiver->pWriter = NULL;
|
||||
|
||||
// start again
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||
pReceiver->start = true;
|
||||
}
|
||||
|
||||
|
@ -449,6 +459,15 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "fromId", pFromId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyIndex);
|
||||
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyTerm);
|
||||
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastConfigIndex);
|
||||
cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
|
@ -477,8 +496,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
|
||||
|
||||
snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu %s:%d ", event, pReceiver, pReceiver->start,
|
||||
pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port);
|
||||
snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld", event,
|
||||
pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
|
||||
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
@ -495,7 +515,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
||||
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||
// begin
|
||||
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
|
||||
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
|
||||
pReceiver->ack = pMsg->seq;
|
||||
needRsp = true;
|
||||
|
||||
|
@ -519,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
// update new config myIndex
|
||||
SSyncCfg newSyncCfg = pMsg->lastConfig;
|
||||
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);
|
||||
|
||||
bool isDrop = false;
|
||||
if (IamInNew) {
|
||||
char eventLog[128];
|
||||
snprintf(eventLog, sizeof(eventLog),
|
||||
"update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", pMsg->lastIndex,
|
||||
pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
|
||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
|
||||
|
||||
} else {
|
||||
char eventLog[128];
|
||||
snprintf(eventLog, sizeof(eventLog),
|
||||
"do not update config by snapshot, not in new, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld",
|
||||
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
}
|
||||
|
||||
// change isStandBy to normal
|
||||
if (!isDrop) {
|
||||
char tmpbuf[512];
|
||||
char *oldStr = syncCfg2SimpleStr(&oldSyncCfg);
|
||||
char *newStr = syncCfg2SimpleStr(&newSyncCfg);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
|
||||
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeBecomeLeader(pSyncNode, tmpbuf);
|
||||
} else {
|
||||
syncNodeBecomeFollower(pSyncNode, tmpbuf);
|
||||
}
|
||||
}
|
||||
// do config change
|
||||
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
|
||||
}
|
||||
|
||||
SSnapshot snapshot;
|
||||
|
|
|
@ -261,23 +261,29 @@ bool syncUtilIsData(tmsg_t msgType) {
|
|||
#endif
|
||||
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
|
||||
msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
|
||||
msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserRollback(tmsg_t msgType) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
|
||||
msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ add_executable(syncRaftLogTest "")
|
|||
add_executable(syncRaftLogTest2 "")
|
||||
add_executable(syncRaftLogTest3 "")
|
||||
add_executable(syncLeaderTransferTest "")
|
||||
add_executable(syncReconfigFinishTest "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -255,6 +256,10 @@ target_sources(syncLeaderTransferTest
|
|||
PRIVATE
|
||||
"syncLeaderTransferTest.cpp"
|
||||
)
|
||||
target_sources(syncReconfigFinishTest
|
||||
PRIVATE
|
||||
"syncReconfigFinishTest.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -512,6 +517,11 @@ target_include_directories(syncLeaderTransferTest
|
|||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncReconfigFinishTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -718,6 +728,10 @@ target_link_libraries(syncLeaderTransferTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncReconfigFinishTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -147,8 +147,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
|
|||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
|
||||
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag,
|
||||
cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
|
|
|
@ -78,8 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
|
||||
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag,
|
||||
cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SSyncCfg* createSyncOldCfg() {
|
||||
SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg));
|
||||
memset(pCfg, 0, sizeof(SSyncCfg));
|
||||
|
||||
pCfg->replicaNum = 3;
|
||||
pCfg->myIndex = 1;
|
||||
for (int i = 0; i < pCfg->replicaNum; ++i) {
|
||||
((pCfg->nodeInfo)[i]).nodePort = i * 100;
|
||||
snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
|
||||
}
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
SSyncCfg* createSyncNewCfg() {
|
||||
SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg));
|
||||
memset(pCfg, 0, sizeof(SSyncCfg));
|
||||
|
||||
pCfg->replicaNum = 3;
|
||||
pCfg->myIndex = 1;
|
||||
for (int i = 0; i < pCfg->replicaNum; ++i) {
|
||||
((pCfg->nodeInfo)[i]).nodePort = i * 100;
|
||||
snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "500.600.700.%d", i);
|
||||
}
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
SyncReconfigFinish *createMsg() {
|
||||
SyncReconfigFinish *pMsg = syncReconfigFinishBuild(1234);
|
||||
|
||||
SSyncCfg* pOld = createSyncOldCfg();
|
||||
SSyncCfg* pNew = createSyncNewCfg();
|
||||
pMsg->oldCfg = *pOld;
|
||||
pMsg->newCfg = *pNew;
|
||||
|
||||
pMsg->newCfgIndex = 11;
|
||||
pMsg->newCfgTerm = 22;
|
||||
pMsg->newCfgSeqNum = 33;
|
||||
|
||||
taosMemoryFree(pOld);
|
||||
taosMemoryFree(pNew);
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
||||
void test1() {
|
||||
SyncReconfigFinish *pMsg = createMsg();
|
||||
syncReconfigFinishLog2((char *)"test1:", pMsg);
|
||||
syncReconfigFinishDestroy(pMsg);
|
||||
}
|
||||
|
||||
|
||||
void test2() {
|
||||
SyncReconfigFinish *pMsg = createMsg();
|
||||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncReconfigFinishSerialize(pMsg, serialized, len);
|
||||
SyncReconfigFinish *pMsg2 = syncReconfigFinishBuild(1000);
|
||||
syncReconfigFinishDeserialize(serialized, len, pMsg2);
|
||||
syncReconfigFinishLog2((char *)"test2: syncReconfigFinishSerialize -> syncReconfigFinishDeserialize ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncReconfigFinishDestroy(pMsg);
|
||||
syncReconfigFinishDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test3() {
|
||||
SyncReconfigFinish *pMsg = createMsg();
|
||||
uint32_t len;
|
||||
char * serialized = syncReconfigFinishSerialize2(pMsg, &len);
|
||||
SyncReconfigFinish *pMsg2 = syncReconfigFinishDeserialize2(serialized, len);
|
||||
syncReconfigFinishLog2((char *)"test3: SyncReconfigFinishSerialize2 -> syncReconfigFinishDeserialize2 ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncReconfigFinishDestroy(pMsg);
|
||||
syncReconfigFinishDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
SyncReconfigFinish *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncReconfigFinish2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncReconfigFinish *pMsg2 = (SyncReconfigFinish *)taosMemoryMalloc(rpcMsg.contLen);
|
||||
syncReconfigFinishFromRpcMsg(&rpcMsg, pMsg2);
|
||||
syncReconfigFinishLog2((char *)"test4: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncReconfigFinishDestroy(pMsg);
|
||||
syncReconfigFinishDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SyncReconfigFinish *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncReconfigFinish2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncReconfigFinish *pMsg2 = syncReconfigFinishFromRpcMsg2(&rpcMsg);
|
||||
syncReconfigFinishLog2((char *)"test5: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg2 ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncReconfigFinishDestroy(pMsg);
|
||||
syncReconfigFinishDestroy(pMsg2);
|
||||
}
|
||||
|
||||
int main() {
|
||||
gRaftDetailLog = true;
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
logTest();
|
||||
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
test5();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -148,8 +148,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini
|
|||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
char* s = syncCfg2Str(&(cbMeta.newCfg));
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
|
||||
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
|
||||
cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
||||
|
|
|
@ -423,6 +423,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||
|
||||
// wal
|
||||
|
|
|
@ -80,6 +80,7 @@
|
|||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
# ./test.sh -f tsim/stream/triggerSession0.sim
|
||||
./test.sh -f tsim/stream/partitionby.sim
|
||||
./test.sh -f tsim/stream/schedSnode.sim
|
||||
|
||||
|
||||
# ---- transaction
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql create snode on dnode 1
|
||||
|
||||
sql create database test vgroups 1;
|
||||
sql create database target vgroups 1;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create table ts3 using st tags(3,2,2);
|
||||
sql create table ts4 using st tags(4,2,2);
|
||||
sql create stream stream_t1 trigger at_once into target.streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts2 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts3 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts4 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts1 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts1 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts2 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts1 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts1 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
sql insert into ts3 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts4 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts3 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts4 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts3 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sql select * from target.streamtST1;
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 8 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 52 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 13 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != 6 then
|
||||
print =====data12=$data12
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data13 != 92 then
|
||||
print ======$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 22 then
|
||||
print ======$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 3 then
|
||||
print ======$data15
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 4 then
|
||||
print =====data21=$data21
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data22 != 4 then
|
||||
print =====data22=$data22
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data23 != 32 then
|
||||
print ======$data23
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 12 then
|
||||
print ======$data24
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 30 then
|
||||
print =====data31=$data31
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data32 != 30 then
|
||||
print =====data32=$data32
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data33 != 180 then
|
||||
print ======$data33
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 42 then
|
||||
print ======$data34
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
|
Loading…
Reference in New Issue