enh(stream): add timer to check the node changing.

This commit is contained in:
Haojun Liao 2023-08-09 10:25:16 +08:00
parent a0dcec5890
commit f3c5f20ee2
9 changed files with 203 additions and 43 deletions

View File

@ -183,6 +183,7 @@ extern int64_t tsWalFsyncDataSizeLimit;
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval;
extern int32_t tsStreamNodeCheckInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;

View File

@ -156,6 +156,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_NODECHECK_TIMER, "node-check-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)

View File

@ -537,7 +537,6 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct {
int32_t vgId;
SEpSet epset;
int32_t numOfTasks;
} SStreamHbMsg;
@ -555,15 +554,20 @@ typedef struct {
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
SEpSet epset;
} SStreamTaskUpdateMsg;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
} SStreamTaskNodeUpdateMsg;
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg);
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct {
int64_t streamId;

View File

@ -223,6 +223,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 20;
int32_t tsStreamNodeCheckInterval = 10;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 3600;
int32_t tsGrantHBInterval = 60;

View File

@ -120,7 +120,6 @@ static void mndPullupTtl(SMnode *pMnode) {
}
static void mndCalMqRebalance(SMnode *pMnode) {
mTrace("calc mq rebalance");
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
@ -133,11 +132,16 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER,
.pCont = pReq,
.contLen = contLen,
};
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void mndStreamCheckNode(SMnode* pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
@ -268,6 +272,10 @@ static void *mndThreadFp(void *param) {
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % tsStreamNodeCheckInterval == 0) {
mndStreamCheckNode(pMnode);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}

View File

@ -34,6 +34,7 @@
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_HB_INTERVAL 100 // 100 sec
typedef struct SNodeEntry {
int32_t nodeId;
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
@ -41,10 +42,11 @@ typedef struct SNodeEntry {
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
// SHashObj* pVnodeMap;
SArray* pDBList;
SArray* pNodeEntryList;
} SStreamVnodeRevertIndex;
static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
@ -66,6 +68,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans);
static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset);
@ -83,6 +86,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq);
/*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
@ -1729,10 +1734,18 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) {
SStreamTaskUpdateMsg req = {0};
req.nodeId = nodeId;
req.epset = *pEpset;
typedef struct SVgroupChangeInfo {
SHashObj* pDBMap;
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) {
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, SVgroupChangeInfo* pInfo) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo);
int32_t code = 0;
int32_t blen;
@ -1800,7 +1813,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_
}
// build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int32_t nodeId, SEpSet *pEpset) {
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update");
if (pTrans == NULL) {
mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
@ -1830,11 +1843,10 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3
void *pBuf = NULL;
int32_t len = 0;
doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pTask->info.nodeId, pInfo);
STransAction action = {0};
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
@ -1847,6 +1859,122 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3
return mndPersistTransLog(pStream, pTrans);
}
// todo. 1. multiple change, 2. replica change problem
static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *pPrevVgroupList,
const SArray *pVgroupList) {
SVgroupChangeInfo info = {
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
};
int32_t numOfVgroups = taosArrayGetSize(pPrevVgroupList);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevVgroupList, i);
int32_t num = taosArrayGetSize(pVgroupList);
for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pVgroupList, j);
if (pCurrent->nodeId == pPrevEntry->nodeId) {
// todo handle the replica change problem.
if (!isEpsetEqual(&pCurrent->epset, &pPrevEntry->epset)) {
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
taosArrayPush(info.pUpdateNodeList, &updateInfo);
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
mndReleaseVgroup(pMnode, pVgroup);
}
break;
}
}
}
return info;
}
static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
SArray* pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {0};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
entry.nodeId = pVgroup->vgId;
entry.hbTimestamp = -1;
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pVgroup);
}
return pVgroupListSnapshot;
}
int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
SSdb *pSdb = pMnode->pSdb;
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks, todo remove this, no need this function
// taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
// taosWUnLockLatch(&pStream->lock);
void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
if (p == NULL && p1 == NULL) {
mndReleaseStream(pMnode, pStream);
continue;
}
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
}
return 0;
}
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) {
mDebug("still in checking node change");
return 0;
}
mDebug("start to do node change checking");
SMnode *pMnode = pMsg->info.node;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
mndProcessVgroupChange(pMnode, &changeInfo);
}
mDebug("end to do node change checking");
return 0;
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -1889,10 +2017,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// check epset to identify whether the node has been transferred to other dnodes.
// node the epset is changed, which means the node transfer has occurred for this node.
if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
nodeChanged = true;
break;
}
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
// nodeChanged = true;
// break;
// }
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
@ -1904,7 +2032,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
int32_t nodeId = req.vgId;
SEpSet newEpSet = req.epset;
{// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
@ -1917,14 +2044,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock);
streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
// if (code != TSDB_CODE_SUCCESS) {
// todo
// }
}
}

View File

@ -1822,7 +1822,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskUpdateMsg req = {0};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);

View File

@ -973,22 +973,40 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
return 0;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg) {
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1;
int32_t size = taosArrayGetSize(pMsg->pNodeList);
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg) {
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo info = {0};
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
taosArrayPush(pMsg->pNodeList, &info);
}
tEndDecode(pDecoder);
return 0;
}

View File

@ -93,7 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
// send heartbeat every 20sec.
// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, ahandle, streamEnv.timer);
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -534,7 +534,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pReq->epset) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -543,13 +542,12 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->epset) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void metaHbToMnode(void* param, void* tmrId) {
STQ* pMeta = param;
SStreamMeta* pMeta = param;
SStreamHbMsg hbMsg = {0};
taosRLockLatch(&pMeta->lock);
@ -558,7 +556,6 @@ void metaHbToMnode(void* param, void* tmrId) {
hbMsg.numOfTasks = numOfTasks;
hbMsg.vgId = pMeta->vgId;
hbMsg.epset = ;
int32_t code = 0;
int32_t tlen = 0;
@ -592,4 +589,7 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId);
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
// next hb will be issued in 20sec.
taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr);
}