refactor: remove void
This commit is contained in:
parent
f9c8b6cd7b
commit
28df9fe285
|
@ -27,7 +27,7 @@ static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
|
|||
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
||||
static void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
|
||||
static void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
|
||||
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
|
||||
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
|
||||
static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
|
||||
static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
|
||||
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
|
||||
|
@ -83,6 +83,7 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
|||
SDataRange* pRange = &pTask->dataRange;
|
||||
STimeWindow* pWindow = &pRange->window;
|
||||
const char* idstr = pTask->id.idStr;
|
||||
int32_t code = 0;
|
||||
|
||||
SStreamTaskCheckReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
|
@ -102,11 +103,11 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
|||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||
" window:%" PRId64 "-%" PRId64 "QID:0x%" PRIx64,
|
||||
" window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
|
||||
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
|
||||
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||
|
||||
(void)streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
|
||||
code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
|
||||
&pTask->outputInfo.fixedDispatcher.epSet);
|
||||
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
@ -128,15 +129,19 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
|||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d,QID:0x%" PRIx64,
|
||||
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
|
||||
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
||||
(void)streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
}
|
||||
} else { // for sink task, set it ready directly.
|
||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
|
||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
|
||||
processDownstreamReadyRsp(pTask);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
|
||||
|
@ -243,13 +248,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
|||
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
|
||||
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
|
||||
SEncoder encoder;
|
||||
int32_t code;
|
||||
int32_t code = 0;
|
||||
int32_t len;
|
||||
|
||||
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
|
||||
if (code < 0) {
|
||||
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
|
@ -257,13 +262,13 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
||||
(void)tEncodeStreamTaskCheckRsp(&encoder, pRsp);
|
||||
code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
|
||||
|
||||
tmsgSendRsp(&rspMsg);
|
||||
return 0;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||
|
@ -316,8 +321,11 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
|||
pInfo->pList = NULL;
|
||||
|
||||
if (pInfo->checkRspTmr != NULL) {
|
||||
(void)taosTmrStop(pInfo->checkRspTmr);
|
||||
bool succ = taosTmrStop(pInfo->checkRspTmr);
|
||||
pInfo->checkRspTmr = NULL;
|
||||
if (!succ) {
|
||||
stError("failed to stop checkrsp tmr"); // todo: add id
|
||||
}
|
||||
}
|
||||
|
||||
streamMutexDestroy(&pInfo->checkInfoLock);
|
||||
|
@ -326,11 +334,17 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
void processDownstreamReadyRsp(SStreamTask* pTask) {
|
||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||
(void)streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||
int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
int64_t checkTs = pTask->execInfo.checkTs;
|
||||
int64_t readyTs = pTask->execInfo.readyTs;
|
||||
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
||||
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||
if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
|
||||
|
@ -341,9 +355,9 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
|
|||
// halt it self for count window stream task until the related fill history task completed.
|
||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||
if (code != 0) {
|
||||
// todo: handle error
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||
if (code != 0) { // todo: handle error
|
||||
stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,7 +366,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
|
|||
// todo: let's retry
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
|
||||
(void)streamLaunchFillHistoryTask(pTask);
|
||||
code = streamLaunchFillHistoryTask(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -517,8 +534,9 @@ void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId,
|
|||
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||
}
|
||||
|
||||
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = 0;
|
||||
|
||||
SStreamTaskCheckReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
|
@ -536,10 +554,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
|
||||
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d)QID:0x%" PRIx64, id,
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
|
||||
pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||
|
||||
(void)streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||
code = streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||
|
@ -554,13 +572,19 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%dQID:0x%" PRIx64,
|
||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
|
||||
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
|
||||
(void)streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (code) {
|
||||
stError("s-task:%s failed to send check msg to downstream, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||
|
@ -626,7 +650,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
|||
continue;
|
||||
}
|
||||
|
||||
doSendCheckMsg(pTask, p);
|
||||
int32_t code = doSendCheckMsg(pTask, p);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -676,7 +700,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
if (p != NULL) {
|
||||
p->rspTs = 0;
|
||||
p->status = -1;
|
||||
doSendCheckMsg(pTask, p);
|
||||
int32_t code = doSendCheckMsg(pTask, p);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -723,7 +747,10 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
// not record the failed of the current task if try to close current vnode
|
||||
// otherwise, the put of message operation may incur invalid read of message queue.
|
||||
if (!pMeta->closeFlag) {
|
||||
(void)addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
@ -805,7 +832,11 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
(void)addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
taosArrayDestroy(pNotReadyList);
|
||||
|
|
|
@ -193,13 +193,13 @@ int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock)
|
|||
// no need to do anything if failed
|
||||
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
int32_t code = 0;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
|
@ -217,8 +217,8 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
|
|||
tEncoderClear(&encoder);
|
||||
return code;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
||||
stDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
||||
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||
|
|
Loading…
Reference in New Issue