fix:snode error
This commit is contained in:
parent
e7bf159174
commit
8a05bad899
|
@ -828,7 +828,6 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
|
|||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||
void streamMetaInitForSnode(SStreamMeta* pMeta);
|
||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
|
||||
void streamMetaRLock(SStreamMeta* pMeta);
|
||||
|
|
|
@ -88,6 +88,8 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -117,7 +117,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
pSnode->msgCb = pOption->msgCb;
|
||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1);
|
||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
|
||||
if (pSnode->pMeta == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
|
@ -126,8 +126,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
stopRsync();
|
||||
startRsync();
|
||||
|
||||
// todo fix it: send msg to mnode to rollback to an existed checkpoint
|
||||
streamMetaInitForSnode(pSnode->pMeta);
|
||||
return pSnode;
|
||||
|
||||
FAIL:
|
||||
|
@ -270,6 +268,9 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
|
||||
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
|
||||
pRsp->streamId = htobe64(pRsp->streamId);
|
||||
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
|
||||
pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
|
||||
pRsp->stage = htobe64(pRsp->stage);
|
||||
pRsp->msgId = htonl(pRsp->msgId);
|
||||
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId);
|
||||
|
@ -335,6 +336,38 @@ int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
|
||||
int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg* pMsg) {
|
||||
SStreamMeta* pMeta = pSnode->pMeta;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
|
||||
SStreamCheckpointReadyMsg req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
return code;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
qError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId);
|
||||
return code;
|
||||
}
|
||||
|
||||
qDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
|
||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
|
||||
|
||||
streamProcessCheckpointReadyMsg(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
|
@ -450,6 +483,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return sndProcessTaskCheckpointReadyMsg(pSnode, pMsg);
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -371,7 +371,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
pTask->msgInfo.pData = pReqs;
|
||||
}
|
||||
|
||||
stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->execInfo.dispatch);
|
||||
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -926,8 +926,8 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
|||
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
|
||||
info.msg.info.noResp = 1; // refactor later.
|
||||
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index);
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId);
|
||||
|
||||
if (pTask->pReadyMsgList == NULL) {
|
||||
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
|
||||
|
|
|
@ -175,6 +175,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->ahandle = ahandle;
|
||||
pMeta->expandFunc = expandFunc;
|
||||
pMeta->stage = stage;
|
||||
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
|
||||
|
||||
// send heartbeat every 5sec.
|
||||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||
|
@ -205,7 +206,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
}
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
|
||||
pMeta->role = NODE_ROLE_UNINIT;
|
||||
code = streamBackendLoadCheckpointInfo(pMeta);
|
||||
|
||||
taosInitRWLatch(&pMeta->lock);
|
||||
|
@ -1093,11 +1093,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
|
|||
metaHbToMnode(pRid, NULL);
|
||||
}
|
||||
|
||||
void streamMetaInitForSnode(SStreamMeta* pMeta) {
|
||||
pMeta->stage = 0;
|
||||
pMeta->role = NODE_ROLE_LEADER;
|
||||
}
|
||||
|
||||
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
||||
taosHashClear(pStartInfo->pReadyTaskSet);
|
||||
taosHashClear(pStartInfo->pFailedTaskSet);
|
||||
|
|
Loading…
Reference in New Issue