refactor: do some internal refactor.
This commit is contained in:
parent
10acd19e71
commit
71b8f67ea6
|
@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
|
|
||||||
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
|
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
|
||||||
SChkptReportInfo* px = (SChkptReportInfo *)pIter;
|
SChkptReportInfo* px = (SChkptReportInfo *)pIter;
|
||||||
|
if (taosArrayGetSize(px->pTaskList) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
|
STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
|
|
|
@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
|
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
|
||||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
||||||
|
|
||||||
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
|
@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = -1;
|
|
||||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
|
code = terrno;
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1119,6 +1119,10 @@ FAIL:
|
||||||
rpcFreeCont(buf);
|
rpcFreeCont(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code == -1) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -295,10 +295,14 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
|
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_APP_IS_STOPPING) {
|
||||||
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
||||||
"meta-hb-tmr");
|
"meta-hb-tmr");
|
||||||
|
} else {
|
||||||
|
stDebug("vgId:%d is stopping, not start hb again", pMeta->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
code = taosReleaseRef(streamMetaId, rid);
|
code = taosReleaseRef(streamMetaId, rid);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
Loading…
Reference in New Issue