refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-17 16:47:56 +08:00
parent 552e59f42b
commit 30186f466b
14 changed files with 128 additions and 126 deletions

View File

@ -628,7 +628,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo); int32_t streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo);
#if 0
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId); SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId);
#endif
void streamTaskInputFail(SStreamTask* pTask); void streamTaskInputFail(SStreamTask* pTask);
@ -638,7 +640,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask); SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status); const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask); void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask);

View File

@ -41,7 +41,7 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
SCheckpointInfo *pChkInfo = &pTask->chkInfo; SCheckpointInfo *pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
char *p = streamTaskGetStatus(pTask)->name; char *p = streamTaskGetStatus(pTask).name;
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64 " nextProcessVer:%" PRId64

View File

@ -768,7 +768,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask)->name; char* p = streamTaskGetStatus(pTask).name;
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -886,7 +886,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1 // do recovery step1
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
char* pStatus = streamTaskGetStatus(pTask)->name; char* pStatus = streamTaskGetStatus(pTask).name;
// avoid multi-thread exec // avoid multi-thread exec
while (1) { while (1) {
@ -942,15 +942,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (retInfo.ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamExecScanHistoryInFuture(pTask, retInfo.idleTime); streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
} else { } else {
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
ETaskStatus s = p->state; ETaskStatus s = p.state;
if (s == TASK_STATUS__PAUSE) { if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status); el, pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr,
p->name, pTask->execInfo.step1El); p.name, pTask->execInfo.step1El);
} }
} }
@ -1152,7 +1152,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// todo save the checkpoint failed info // todo save the checkpoint failed info
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
if (req.mndTrigger == 1) { if (req.mndTrigger == 1) {
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {

View File

@ -226,15 +226,15 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
} }
// not in ready state, do not handle the data from wal // not in ready state, do not handle the data from wal
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__READY) { if (pState.state != TASK_STATUS__READY) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState->name); tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
return false; return false;
} }
// fill-history task has entered into the last phase, no need to anything // fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(pState->state == TASK_STATUS__READY); ASSERT(pState.state == TASK_STATUS__READY);
// the maximum version of data in the WAL has reached already, the step2 is done // the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer); pTask->dataRange.range.maxVer);
@ -349,9 +349,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__READY) { if (pState.state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState->name); tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState.name);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;

View File

@ -885,8 +885,8 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
streamTaskClearCheckInfo(pTask, true); streamTaskClearCheckInfo(pTask, true);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks // clear flag set during do checkpoint, and open inputQ for all upstream tasks
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState.state == TASK_STATUS__CK) {
int32_t tranId = 0; int32_t tranId = 0;
int64_t activeChkId = 0; int64_t activeChkId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId); streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
@ -895,13 +895,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
pTask->id.idStr, activeChkId, tranId); pTask->id.idStr, activeChkId, tranId);
streamTaskSetStatusReady(pTask); streamTaskSetStatusReady(pTask);
} else if (pState->state == TASK_STATUS__UNINIT) { } else if (pState.state == TASK_STATUS__UNINIT) {
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); // tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
// ASSERT(pTask->status.downstreamReady == 0); // ASSERT(pTask->status.downstreamReady == 0);
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); // tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
} else { } else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
@ -936,8 +936,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already if (pState.state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already
int32_t transId = 0; int32_t transId = 0;
int64_t checkpointId = 0; int64_t checkpointId = 0;
@ -966,7 +966,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
TSDB_CODE_ACTION_IN_PROGRESS); TSDB_CODE_ACTION_IN_PROGRESS);
} }
} else { // upstream not recv the checkpoint-source/trigger till now } else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); ASSERT(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT);
tqWarn( tqWarn(
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger", "upstream sending checkpoint-source/trigger",
@ -1017,7 +1017,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
SStreamTask* pHistoryTask = NULL; SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
pHistoryTask = NULL; pHistoryTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask); code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already", ", it may have been dropped already",
@ -1047,7 +1047,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} }
streamTaskResume(pTask); streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
@ -1092,8 +1092,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
} }
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState->name); tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
@ -1106,8 +1106,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask); code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
if (pHTask) { if (pHTask) {
taosThreadMutexLock(&pHTask->lock); taosThreadMutexLock(&pHTask->lock);
SStreamTaskState* p = streamTaskGetStatus(pHTask); SStreamTaskState p = streamTaskGetStatus(pHTask);
tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p->name); tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
taosThreadMutexUnlock(&pHTask->lock); taosThreadMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);

View File

@ -66,7 +66,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
id, upstreamTaskId, vgId, stage, pInfo->stage); id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode // record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask); streamTaskSetFailedCheckpointId(pTask);
} }
@ -75,7 +75,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
if (pInfo->stage != stage) { if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask); streamTaskSetFailedCheckpointId(pTask);
} }
@ -175,10 +175,10 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
pRsp->status = pRsp->status =
streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d", ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status); pRsp->status);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
@ -651,12 +651,12 @@ int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64
void rspMonitorFn(void* param, void* tmrId) { void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTaskState* pStat = streamTaskGetStatus(pTask); SStreamTaskState pStat = streamTaskGetStatus(pTask);
STaskCheckInfo* pInfo = &pTask->taskCheckInfo; STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t timeoutDuration = now - pInfo->timeoutStartTs; int64_t timeoutDuration = now - pInfo->timeoutStartTs;
ETaskStatus state = pStat->state; ETaskStatus state = pStat.state;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t numOfReady = 0; int32_t numOfReady = 0;
int32_t numOfFault = 0; int32_t numOfFault = 0;
@ -669,7 +669,7 @@ void rspMonitorFn(void* param, void* tmrId) {
if (state == TASK_STATUS__STOP) { if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id); streamTaskCompleteCheckRsp(pInfo, true, id);
@ -685,7 +685,7 @@ void rspMonitorFn(void* param, void* tmrId) {
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id); streamTaskCompleteCheckRsp(pInfo, true, id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
@ -695,7 +695,7 @@ void rspMonitorFn(void* param, void* tmrId) {
taosThreadMutexLock(&pInfo->checkInfoLock); taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) { if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat.name,
vgId, ref); vgId, ref);
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
@ -707,7 +707,7 @@ void rspMonitorFn(void* param, void* tmrId) {
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) { if (pStat.state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
@ -720,7 +720,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug( stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
@ -731,7 +731,7 @@ void rspMonitorFn(void* param, void* tmrId) {
return; return;
} }
} else { // unexpected status } else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat.name);
} }
// checking of downstream tasks has been stopped by other threads // checking of downstream tasks has been stopped by other threads
@ -740,7 +740,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug( stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
"notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);

View File

@ -219,7 +219,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code; return code;
} }
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
if (pActiveInfo->activeId != checkpointId) { if (pActiveInfo->activeId != checkpointId) {
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard", " discard",
@ -265,7 +265,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId); id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);
// set task status // set task status
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
pActiveInfo->activeId = checkpointId; pActiveInfo->activeId = checkpointId;
pActiveInfo->transId = transId; pActiveInfo->transId = transId;
@ -354,9 +354,9 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
ASSERT(total > 0); ASSERT(total > 0);
// 1. not in checkpoint status now // 1. not in checkpoint status now
SStreamTaskState* pStat = streamTaskGetStatus(pTask); SStreamTaskState pStat = streamTaskGetStatus(pTask);
if (pStat->state != TASK_STATUS__CK) { if (pStat.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat->name, downstreamTaskId); stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
@ -364,7 +364,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
") from task:0x%x, expired and discard ", ") from task:0x%x, expired and discard ",
id, pStat->name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
return -1; return -1;
} }
@ -482,17 +482,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (!restored) { // during restore procedure, do update checkpoint-info if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs); pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status } else { // not in restore status, must be in checkpoint status
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs); pInfo->checkpointTime, pReq->checkpointTs);
} }
@ -505,11 +505,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
streamTaskClearCheckInfo(pTask, true); streamTaskClearCheckInfo(pTask, true);
if (pStatus->state == TASK_STATUS__CK) { if (pStatus.state == TASK_STATUS__CK) {
// todo handle error // todo handle error
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else { } else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
} }
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
@ -519,7 +519,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
} }
stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId, stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask)->name); streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);
pTask->status.taskStatus = TASK_STATUS__READY; pTask->status.taskStatus = TASK_STATUS__READY;
@ -770,8 +770,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
(void) taosThreadMutexLock(&pTask->lock); (void) taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) { if (pState.state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
@ -890,9 +890,9 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__CK) { if (pStatus.state != TASK_STATUS__CK) {
return false; return false;
} }
@ -1207,8 +1207,8 @@ int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
} }
(void) taosThreadMutexLock(&pTask->lock); (void) taosThreadMutexLock(&pTask->lock);
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__CK) { if (p.state == TASK_STATUS__CK) {
code = streamTaskSendCheckpointSourceRsp(pTask); code = streamTaskSendCheckpointSourceRsp(pTask);
} }
(void) taosThreadMutexUnlock(&pTask->lock); (void) taosThreadMutexUnlock(&pTask->lock);

View File

@ -799,11 +799,11 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) { if (pState.state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState->name, ref); pState.name, ref);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
@ -1303,7 +1303,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (delayDispatch) { if (delayDispatch) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans // we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK &&
pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) { pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",

View File

@ -27,12 +27,12 @@ static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) { bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
return (pState->state == TASK_STATUS__STOP) || (pState->state == TASK_STATUS__DROPPING); return (pState.state == TASK_STATUS__STOP) || (pState.state == TASK_STATUS__DROPPING);
} }
bool streamTaskShouldPause(const SStreamTask* pTask) { bool streamTaskShouldPause(const SStreamTask* pTask) {
return (streamTaskGetStatus(pTask)->state == TASK_STATUS__PAUSE); return (streamTaskGetStatus(pTask).state == TASK_STATUS__PAUSE);
} }
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) { static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -337,10 +337,10 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
stDebug( stDebug(
"s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s " "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
"info, prepare transfer exec state", "info, prepare transfer exec state",
id, streamTaskGetStatus(pTask)->name, el, pStreamTask->id.idStr); id, streamTaskGetStatus(pTask).name, el, pStreamTask->id.idStr);
} }
ETaskStatus status = streamTaskGetStatus(pStreamTask)->state; ETaskStatus status = streamTaskGetStatus(pStreamTask).state;
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
@ -350,7 +350,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
} else { } else {
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
status == TASK_STATUS__STOP); status == TASK_STATUS__STOP);
int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id, stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
pStreamTask->id.idStr, tstrerror(code)); pStreamTask->id.idStr, tstrerror(code));
@ -364,9 +364,9 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
// In case of sink tasks, no need to halt them. // In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure. // start the task state transfer procedure.
SStreamTaskState* pState = streamTaskGetStatus(pStreamTask); SStreamTaskState pState = streamTaskGetStatus(pStreamTask);
status = pState->state; status = pState.state;
char* p = pState->name; char* p = pState.name;
if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) { if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p); stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
@ -391,7 +391,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
streamTaskSendCheckpointReq(pStreamTask); streamTaskSendCheckpointReq(pStreamTask);
// 3. assign the status to the value that will be kept in disk // 3. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state;
// 4. open the inputQ for all upstream tasks // 4. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask); streamTaskOpenAllUpstreamInput(pStreamTask);
@ -598,7 +598,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
streamTaskReleaseState(pHTask); streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask); streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name); streamTaskGetStatus(pHTask).name);
streamMetaReleaseTask(pTask->pMeta, pHTask); streamMetaReleaseTask(pTask->pMeta, pHTask);
} else { } else {
@ -628,7 +628,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL; SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) { if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask).state == TASK_STATUS__UNINIT)) {
stDebug("s-task:%s stream task is stopped", id); stDebug("s-task:%s stream task is stopped", id);
return 0; return 0;
} }
@ -706,9 +706,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} else { // todo other thread may change the status } else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState.state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name); stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
streamTaskBuildCheckpoint(pTask); streamTaskBuildCheckpoint(pTask);
} else { // todo refactor } else { // todo refactor
int32_t code = 0; int32_t code = 0;
@ -735,17 +735,17 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING. // be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
bool streamTaskIsIdle(const SStreamTask* pTask) { bool streamTaskIsIdle(const SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP || return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
status == TASK_STATUS__DROPPING); status == TASK_STATUS__DROPPING);
} }
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
ETaskStatus st = pState->state; ETaskStatus st = pState.state;
if (pStatus != NULL) { if (pStatus != NULL) {
*pStatus = pState->name; *pStatus = pState.name;
} }
// pause & halt will still run for sink tasks. // pause & halt will still run for sink tasks.
@ -776,7 +776,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
setLastExecTs(pTask, taosGetTimestampMs()); setLastExecTs(pTask, taosGetTimestampMs());
char* p = streamTaskGetStatus(pTask)->name; char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
pTask->status.schedStatus, pTask->status.lastExecTs); pTask->status.schedStatus, pTask->status.lastExecTs);
@ -804,7 +804,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
streamResumeTask(pTask); streamResumeTask(pTask);
} else { } else {
char* p = streamTaskGetStatus(pTask)->name; char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p, stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
pTask->status.schedStatus); pTask->status.schedStatus);
} }

View File

@ -1128,11 +1128,11 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState.state == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask); streamTaskSetFailedCheckpointId(pTask);
} else { } else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState.name);
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);

View File

@ -94,13 +94,13 @@ int32_t streamTaskResumeInFuture(SStreamTask* pTask) {
void streamTaskResumeHelper(void* param, void* tmrId) { void streamTaskResumeHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param; SStreamTask* pTask = (SStreamTask*)param;
SStreamTaskId* pId = &pTask->id; SStreamTaskId* pId = &pTask->id;
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
streamTaskSetSchedStatusInactive(pTask); streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p->name, ref); stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
@ -130,7 +130,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
return; return;
} }
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else { } else {
if (status == TASK_TRIGGER_STATUS__ACTIVE) { if (status == TASK_TRIGGER_STATUS__ACTIVE) {

View File

@ -44,13 +44,13 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunc
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static int32_t streamTaskSetReady(SStreamTask* pTask) { static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { if ((p.state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
int32_t numOfUps = taosArrayGetSize(pTask->upstreamInfo.pList); int32_t numOfUps = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, numOfUps, p->name); pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name);
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
@ -59,7 +59,7 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
pTask->execInfo.readyTs = taosGetTimestampMs(); pTask->execInfo.readyTs = taosGetTimestampMs();
int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs); int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p->name); pTask->id.idStr, numOfDowns, el, p.name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -96,7 +96,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
if (p == NULL) { if (p == NULL) {
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
streamTaskGetStatus(pTask)->name); streamTaskGetStatus(pTask).name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -118,7 +118,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask).state;
ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) && ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) &&
(pTask->info.fillHistory == 1)); (pTask->info.fillHistory == 1));
@ -139,8 +139,8 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
streamTaskSetReady(pTask); streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT(p->state == TASK_STATUS__READY); ASSERT(p.state == TASK_STATUS__READY);
int8_t schedStatus = pTask->status.schedStatus; int8_t schedStatus = pTask->status.schedStatus;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -149,10 +149,10 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
startVer = pTask->chkInfo.nextProcessVer; startVer = pTask->chkInfo.nextProcessVer;
} }
stDebug("s-task:%s status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p->name, schedStatus, stDebug("s-task:%s status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p.name, schedStatus,
startVer); startVer);
} else { } else {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p->name, schedStatus); stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p.name, schedStatus);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -163,10 +163,10 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
streamTaskSetReady(pTask); streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT((p->state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1)); ASSERT((p.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1));
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p->name); stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p.name);
streamTaskStartScanHistory(pTask); streamTaskStartScanHistory(pTask);
// NOTE: there will be an deadlock if launch fill history here. // NOTE: there will be an deadlock if launch fill history here.
@ -206,11 +206,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
ASSERT(hTaskId != 0); ASSERT(hTaskId != 0);
// check stream task status in the first place. // check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT && if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT &&
pStatus->state != TASK_STATUS__PAUSE) { pStatus.state != TASK_STATUS__PAUSE) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name); pStatus.name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code return -1; // todo set the correct error code
@ -310,7 +310,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} else { } else {
char* p = streamTaskGetStatus(pTask)->name; char* p = streamTaskGetStatus(pTask).name;
int32_t hTaskId = pHTaskInfo->id.taskId; int32_t hTaskId = pHTaskInfo->id.taskId;
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
@ -342,7 +342,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
if (streamTaskShouldStop(*ppTask)) { if (streamTaskShouldStop(*ppTask)) {
ASSERT((*ppTask)->status.timerActive >= 1); ASSERT((*ppTask)->status.timerActive >= 1);
char* p = streamTaskGetStatus(*ppTask)->name; char* p = streamTaskGetStatus(*ppTask).name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
@ -544,10 +544,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1; pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;

View File

@ -199,9 +199,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
ETaskStatus status1 = TASK_STATUS__UNINIT; ETaskStatus status1 = TASK_STATUS__UNINIT;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
if (pTask->status.pSM != NULL) { if (pTask->status.pSM != NULL) {
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
p = pStatus->name; p = pStatus.name;
status1 = pStatus->state; status1 = pStatus.state;
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
@ -728,7 +728,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
if (resetRelHalt) { if (resetRelHalt) {
stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s", stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus), sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus),
streamTaskGetStatus(*ppStreamTask)->name); streamTaskGetStatus(*ppStreamTask).name);
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
} }
@ -861,7 +861,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
STaskStatusEntry entry = { STaskStatusEntry entry = {
.id = streamTaskGetTaskId(pTask), .id = streamTaskGetTaskId(pTask),
.status = streamTaskGetStatus(pTask)->state, .status = streamTaskGetStatus(pTask).state,
.nodeId = pMeta->vgId, .nodeId = pMeta->vgId,
.stage = pMeta->stage, .stage = pMeta->stage,
@ -906,12 +906,12 @@ void streamTaskPause(SStreamTask* pTask) {
} }
void streamTaskResume(SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask); SStreamTaskState prevState = streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t code = streamTaskRestoreStatus(pTask); int32_t code = streamTaskRestoreStatus(pTask);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
char* pNew = streamTaskGetStatus(pTask)->name; char* pNew = streamTaskGetStatus(pTask).name;
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else { } else {

View File

@ -67,7 +67,7 @@ static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus nex
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
char* p = streamTaskGetStatus(pTask)->name; char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
@ -305,7 +305,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
while (1) { while (1) {
// wait for the task to be here // wait for the task to be here
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus s = streamTaskGetStatus(pTask)->state; ETaskStatus s = streamTaskGetStatus(pTask).state;
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
@ -510,8 +510,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) { SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask) {
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment return pTask->status.pSM->current; // copy one obj in case of multi-thread environment
} }
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) { ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {