Merge pull request #27110 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-08-09 16:13:54 +08:00 committed by GitHub
commit f48d6f11ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 27 additions and 15 deletions

View File

@ -443,7 +443,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
(void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) { if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) {
mError("failed to create timer thread since %s", strerror(errno)); mError("failed to create timer thread since %s", tstrerror(code));
TAOS_RETURN(code); TAOS_RETURN(code);
} }

View File

@ -2420,7 +2420,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
if (pStream != NULL) { // TODO:handle error if (pStream != NULL) { // TODO:handle error
code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
if (code) { if (code) {
mError("failed to create checkpoint trans, code:%s", strerror(code)); mError("failed to create checkpoint trans, code:%s", tstrerror(code));
} }
} else { } else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans // todo: wait for the create stream trans completed, and launch the checkpoint trans

View File

@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SChkptReportInfo* px = (SChkptReportInfo *)pIter; SChkptReportInfo* px = (SChkptReportInfo *)pIter;
if (taosArrayGetSize(px->pTaskList) == 0) {
continue;
}
STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
if (pInfo == NULL) { if (pInfo == NULL) {

View File

@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return code; return code;
} else { } else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
} }

View File

@ -558,10 +558,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs); pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status } else { // not in restore status, must be in checkpoint status
if (pStatus.state == TASK_STATUS__CK) {
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs); pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
} else {
stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pReq->checkpointVer);
}
} }
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
@ -573,12 +580,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs; pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, true);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
} }
streamTaskClearCheckInfo(pTask, true);
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, pReq->hTaskId); pReq->taskId, vgId, pReq->hTaskId);

View File

@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = 0;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
// serialize // serialize
@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
goto FAIL; goto FAIL;
} }
code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen); buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
code = terrno;
goto FAIL; goto FAIL;
} }
@ -1119,6 +1119,10 @@ FAIL:
rpcFreeCont(buf); rpcFreeCont(buf);
} }
if (code == -1) {
code = TSDB_CODE_INVALID_MSG;
}
return code; return code;
} }

View File

@ -295,12 +295,12 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
if (code) { if (code) {
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
} }
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr"); "meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid); code = taosReleaseRef(streamMetaId, rid);
if (code) { if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
} }

View File

@ -107,7 +107,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
if (code) { if (code) {
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, strerror(code), ref); stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref);
} else { } else {
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
ref); ref);