enh(stream): check stream tasks dag.

This commit is contained in:
Haojun Liao 2023-08-02 09:44:39 +08:00
parent c016a58623
commit e1e5e9bb0f
4 changed files with 141 additions and 72 deletions

View File

@ -2965,6 +2965,20 @@ typedef struct {
int8_t reserved; int8_t reserved;
} SVPauseStreamTaskRsp; } SVPauseStreamTaskRsp;
// Request message for updating a stream task (see tSerializeVTaskUpdateReq /
// tDeserializeVTaskUpdateReq).
// NOTE(review): presumably sent when the epset of a node hosting stream tasks
// changes -- confirm against the code that builds this message.
typedef struct {
SMsgHead head;   // message header
int32_t taskId;  // id of the stream task the request refers to
int32_t nodeId;  // presumably the id of the node whose epset changed -- TODO confirm
SEpSet epset;    // endpoint set carried by the request; presumably the new one -- TODO confirm
} SVStreamTaskUpdateReq;
// Response counterpart (by name) of SVStreamTaskUpdateReq.
// It currently holds a single reserved byte and no payload fields.
typedef struct {
int8_t reserved;  // placeholder, no meaning assigned in this declaration
} SVStreamTaskUpdateRsp;
// Encode pReq into buf (capacity bufLen bytes).
// Returns the number of encoded bytes on success, -1 on failure.
int32_t tSerializeVTaskUpdateReq(void* buf, int32_t bufLen, const SVStreamTaskUpdateReq* pReq);
// Decode a buffer of bufLen bytes into pReq. Returns 0 on success.
int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq);
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;

View File

@ -7919,6 +7919,24 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR
return 0; return 0;
} }
int32_t tSerializeVTaskUpdateReq(void *buf, int32_t bufLen, const SVStreamTaskUpdateReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq) {
return 0;
}
int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic); tlen += taosEncodeString(buf, pTopicEp->topic);

View File

@ -66,6 +66,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId); int64_t streamId, int32_t taskId);
static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans);
static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset);
int32_t mndInitStream(SMnode *pMnode) { int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
.sdbType = SDB_STREAM, .sdbType = SDB_STREAM,
@ -471,11 +474,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
STransAction action = {0}; STransAction action = {0};
action.mTraceId = pTrans->mTraceId; action.mTraceId = pTrans->mTraceId;
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_STREAM_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; return -1;
@ -653,8 +652,6 @@ _OVER:
} }
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
// vnode
/*if (pTask->info.nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -663,16 +660,13 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet);
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
} }
/*}*/
return 0; return 0;
} }
@ -1043,11 +1037,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
} }
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset);
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
@ -1483,16 +1474,17 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet);
action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_PAUSE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -1630,11 +1622,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated; pReq->igUntreated = igUntreated;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet);
action.pCont = pReq;
action.contLen = sizeof(SVResumeStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -1776,8 +1766,39 @@ static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t no
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
 * Encode pStream and attach it to pTrans as a commit log in READY status.
 *
 * Ownership: on every failure path pTrans is dropped via mndTransDrop(), so
 * the caller must not touch pTrans after a -1 return. On success pTrans is
 * left intact for the caller.
 *
 * @param pStream  stream object to encode
 * @param pTrans   transaction that receives the commit log
 * @return 0 on success, -1 on failure (pTrans already dropped)
 */
int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) {
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    mError("failed to encode stream since %s", terrstr());
    // drop the trans here as well, so all failure paths leave the same state
    mndTransDrop(pTrans);
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return -1;
  }

  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
    mError("stream trans:%d failed to set raw status since %s", pTrans->id, terrstr());
    // NOTE(review): if the successful append above handed pCommitRaw over to
    // pTrans, freeing it here and then dropping the trans would release it
    // twice -- confirm the ownership rule of mndTransAppendCommitlog().
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return -1;
  }

  return 0;
}
/**
 * Fill in the message-related fields of a transaction action.
 *
 * Only msgType, pCont, contLen and epSet are written; every other field of
 * pAction keeps the value the caller gave it.
 *
 * @param pAction  action to fill
 * @param pCont    message content buffer stored in the action
 * @param contLen  length of pCont in bytes
 * @param msgType  message type of the action
 * @param pEpset   endpoint set copied by value into the action
 */
void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset) {
  // message description first
  pAction->msgType = msgType;
  pAction->pCont = pCont;
  pAction->contLen = contLen;

  // then the destination endpoints
  pAction->epSet = *pEpset;
}
// build trans to update the epset // build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int32_t nodeId, SEpSet* pEpset) { static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int32_t nodeId, SEpSet *pEpset) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
@ -1805,15 +1826,12 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
for (int32_t k = 0; k < numOfTasks; ++k) { for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k); SStreamTask *pTask = taosArrayGetP(pLevel, k);
void* pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset); doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset);
STransAction action = {0}; STransAction action = {0};
action.epSet = pTask->info.epSet; initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);
action.pCont = pBuf;
action.contLen = len;
action.msgType = TDMT_VND_STREAM_TASK_UPDATE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
@ -1824,28 +1842,12 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return mndPersistTransLog(pStream, pTrans);
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
return TSDB_CODE_SUCCESS;
} }
static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) { static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) {
int32_t numOfLevels = 0; int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) { for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j); SArray *pLevel = taosArrayGetP(pStream->tasks, j);
@ -1870,10 +1872,9 @@ static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEp
} }
} }
return 0; return 0;
} }
// todo: handle the database drop/stream drop case // todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -1891,6 +1892,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
int64_t now = taosGetTimestampSec(); int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
// timeout list // timeout list
bool nodeChanged = false; bool nodeChanged = false;
@ -1899,8 +1901,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// record the timeout node // record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next int64_t duration = now - pEntry->hbTimestamp;
// taosArrayPush(pList, &pEntry); if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
taosArrayPush(pList, &pEntry);
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
continue;
} }
if (pEntry->nodeId != req.vgId) { if (pEntry->nodeId != req.vgId) {
@ -1910,14 +1915,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pEntry->hbTimestamp = now; pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes. // check epset to identify whether the node has been transferred to other dnodes.
// 1. node the epset is changed, which means the node transfer has occurred for this node. // node the epset is changed, which means the node transfer has occurred for this node.
if (!isEpsetEqual(&pEntry->epset, &req.epset)) { if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
nodeChanged = true; nodeChanged = true;
break; break;
} }
} }
// todo handle the node timeout case. // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truly offline or not.
// handle the node changed case // handle the node changed case
if (!nodeChanged) { if (!nodeChanged) {
@ -1927,7 +1933,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
int32_t nodeId = req.vgId; int32_t nodeId = req.vgId;
SEpSet newEpSet = req.epset; SEpSet newEpSet = req.epset;
{ // check all streams that involved this vnode should update the epset info {// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
@ -1936,27 +1942,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
break; break;
} }
// update the related upstream and downstream tasks // update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
updateTaskEpInfo(pStream, req.vgId, &req.epset); updateTaskEpInfo(pStream, req.vgId, &req.epset);
// write down taosWUnLockLatch(&pStream->lock);
code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo
}
} }
taosWUnLockLatch(&pStream->lock);
} }
}
// if (code == 0) {
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// mError("failed to prepre trans rebalance since %s", terrstr());
// }
// }
// mndTransDrop(pTrans);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -811,7 +811,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, msg, msgLen); tDecoderInit(&dc, (uint8_t*)msg, msgLen);
// decode req // decode req
if (tDecodeSMqRebVgReq(&dc, &req) < 0) { if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
@ -1810,3 +1810,44 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
/**
 * Handle a stream task update request.
 *
 * Placeholder implementation: the message is not parsed, no task is looked up
 * or modified, and success is always returned.
 *
 * @param pTq       tq instance (currently unused)
 * @param sversion  version passed with the message (currently unused)
 * @param msg       raw request message (currently unused)
 * @param msgLen    length of msg in bytes (currently unused)
 * @return TSDB_CODE_SUCCESS
 */
int32_t tqProcessTaskUpdateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  // TODO: decode the update request from msg, acquire the task it refers to
  // and apply the update; release the task afterwards.
  (void)pTq;
  (void)sversion;
  (void)msg;
  (void)msgLen;

  return TSDB_CODE_SUCCESS;
}