enh(stream): prepare update trans.

This commit is contained in:
Haojun Liao 2023-08-01 16:12:35 +08:00
parent 75b1520be0
commit 164bfd5408
4 changed files with 122 additions and 34 deletions

View File

@ -303,6 +303,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)

View File

@ -554,7 +554,7 @@ typedef struct {
SEpSet epset; SEpSet epset;
} SStreamTaskUpdateInfo; } SStreamTaskUpdateInfo;
int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg); int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg);
int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg); int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg);
typedef struct { typedef struct {

View File

@ -1736,13 +1736,121 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
/**
 * Serialize a stream-task update message (node id + new epset) into a freshly
 * allocated buffer laid out as [SMsgHead][encoded SStreamTaskUpdateInfo].
 *
 * @param pBuf    out: allocated message buffer; the caller owns it and must
 *                release it with taosMemoryFree(). Set to NULL on failure.
 * @param pLen    out: total buffer length in bytes (header + body). Set to 0
 *                on failure.
 * @param nodeId  id stored in the body and (network byte order) in the header's vgId.
 * @param pEpset  endpoint set copied into the message body.
 * @return TSDB_CODE_SUCCESS on success; -1 with terrno set on failure.
 */
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) {
  *pBuf = NULL;
  *pLen = 0;

  SStreamTaskUpdateInfo req = {0};
  req.nodeId = nodeId;
  req.epset = *pEpset;

  // First pass: compute the encoded body size only.
  int32_t code = 0;
  int32_t blen = 0;
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
  if (code < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t tlen = (int32_t)sizeof(SMsgHead) + blen;
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // The body starts right after the header, so only blen bytes are available
  // to the encoder (not tlen, which would overstate the capacity by the
  // header size).
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  SEncoder encoder;
  tEncoderInit(&encoder, abuf, blen);
  if (tEncodeStreamTaskUpdateMsg(&encoder, &req) < 0) {
    // Do not hand out a partially encoded message.
    tEncoderClear(&encoder);
    taosMemoryFree(buf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // same code the size-computation failure path reports
    return -1;
  }
  tEncoderClear(&encoder);

  // Header fields are sent in network byte order.
  SMsgHead *pMsgHead = (SMsgHead *)buf;
  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(nodeId);

  *pBuf = buf;
  *pLen = tlen;
  return TSDB_CODE_SUCCESS;
}
/**
 * Build a transaction that notifies every task of a stream about the new
 * epset of node `nodeId`: one TDMT_VND_STREAM_TASK_UPDATE redo action per
 * task, plus a commit log carrying the encoded stream object.
 *
 * @param pMnode   mnode the transaction is created on.
 * @param pStream  stream whose tasks (all levels) receive the update.
 * @param nodeId   id of the node whose epset changed.
 * @param pEpset   the new endpoint set for that node.
 * @return TSDB_CODE_SUCCESS on success, -1 on failure. On every failure path
 *         the transaction created here is dropped.
 *
 * NOTE(review): work in progress. ASSERT(0) below still guards the function,
 * and on success the transaction is neither prepared nor dropped nor returned
 * to the caller, so nothing submits or releases it yet.
 * NOTE(review): this function takes pStream->lock for writing while the call
 * site also releases pStream->lock right after the call; confirm the caller
 * does not already hold the latch when calling in.
 */
static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int32_t nodeId, SEpSet* pEpset) {
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update");
  if (pTrans == NULL) {
    mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return -1;
  }

  mDebug("start to build stream:0x%"PRIx64" task DAG update", pStream->uid);
  ASSERT(0);

  // TODO: set the db name of the trans before the conflict check:
  // mndTransSetDbName(pTrans, "stream-task-update", "checkpoint");
  if (mndTransCheckConflict(pMnode, pTrans) != 0) {
    mError("failed to build stream:0x%"PRIx64" task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
    mndTransDrop(pTrans);
    return -1;
  }

  // Append one redo action per task, level by level, while the task list is locked.
  taosWLockLatch(&pStream->lock);

  int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t j = 0; j < numOfLevels; ++j) {
    SArray *pLevel = taosArrayGetP(pStream->tasks, j);
    int32_t numOfTasks = taosArrayGetSize(pLevel);

    for (int32_t k = 0; k < numOfTasks; ++k) {
      SStreamTask *pTask = taosArrayGetP(pLevel, k);

      void   *pBuf = NULL;
      int32_t len = 0;
      // Never append an action with a missing payload.
      if (doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset) != TSDB_CODE_SUCCESS) {
        mError("failed to build stream:0x%"PRIx64" task update msg since %s", pStream->uid, terrstr());
        taosWUnLockLatch(&pStream->lock);
        mndTransDrop(pTrans);
        return -1;
      }

      STransAction action = {0};
      action.epSet = pTask->info.epSet;
      action.pCont = pBuf;
      action.contLen = len;
      action.msgType = TDMT_VND_STREAM_TASK_UPDATE;

      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(pBuf);
        taosWUnLockLatch(&pStream->lock);
        mndTransDrop(pTrans);
        return -1;
      }
    }
  }

  taosWUnLockLatch(&pStream->lock);

  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    mError("failed to prepare trans rebalance since %s", terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  // Mark the raw as ready BEFORE handing it to the transaction, so that on
  // every failure path the raw has exactly one owner: it is freed here only
  // while it has not been appended to the trans.
  // NOTE(review): assumes the status is stored on the raw object itself and
  // the append does not depend on it being unset - confirm.
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
    mError("failed to prepare trans rebalance since %s", terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("failed to prepare trans rebalance since %s", terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}
// todo: handle the database drop/stream drop case // todo: handle the database drop/stream drop case
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
@ -1813,44 +1921,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} }
{ // build trans to update the epset code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet);
if (code != TSDB_CODE_SUCCESS) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
// mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
mndTransSetDbName(pTrans, "checkpoint", "checkpoint");
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
void* pBuf = NULL;
int32_t len = 0;
// doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, newEpSet);
STransAction action = {0};
action.epSet = /*mndGetVgroupEpset(pMnode, pVgObj)*/;
action.pCont = pBuf;
action.contLen = len;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
} }
} }
// if (code == 0) {
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// mError("failed to prepre trans rebalance since %s", terrstr());
// }
// }
// mndTransDrop(pTrans);
return code;
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -959,7 +959,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) { int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1;