refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-04 18:47:49 +08:00
parent c494252002
commit 6d37e596ec
13 changed files with 103 additions and 133 deletions

View File

@ -91,9 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
char *p = NULL;
streamTaskGetStatus(pTask, &p);
char* p = streamTaskGetStatus(pTask)->name;
if (pTask->info.fillHistory) {
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64

View File

@ -842,8 +842,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
char* p = NULL;
streamTaskGetStatus(pTask, &p);
char* p = streamTaskGetStatus(pTask)->name;
if (pTask->info.fillHistory) {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
@ -932,8 +931,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1
const char* id = pTask->id.idStr;
char* pStatus = NULL;
streamTaskGetStatus(pTask, &pStatus);
char* pStatus = streamTaskGetStatus(pTask)->name;
// avoid multi-thread exec
while (1) {
@ -990,15 +988,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
} else {
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
SStreamTaskState* p = streamTaskGetStatus(pTask);
ETaskStatus s = p->state;
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p,
pTask->execInfo.step1El);
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr,
p->name, pTask->execInfo.step1El);
}
}
@ -1038,15 +1036,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
ASSERT(0);
// STimeWindow* pWindow = &pTask->dataRange.window;
// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask));
//
// // Not update the fill-history time window until the state transfer is completed.
// tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
// ", window:%" PRId64 " - %" PRId64,
// id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
//
// code = streamTaskScanHistoryDataComplete(pTask);
}
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1138,7 +1127,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
@ -1275,7 +1264,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// todo save the checkpoint failed info
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
@ -1362,7 +1351,7 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
}
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
// if (status == TASK_STATUS__STREAM_SCAN_HISTORY) {
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
// }

View File

@ -220,16 +220,15 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
}
// not in ready state, do not handle the data from wal
char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__READY) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState->name);
return false;
}
// fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__READY);
ASSERT(pState->state == TASK_STATUS__READY);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
@ -342,10 +341,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
taosThreadMutexLock(&pTask->lock);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState->name);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;

View File

@ -400,11 +400,11 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
@ -706,9 +706,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESUME_TASK) {
// task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
ASSERT(streamTaskReadyToRun(pTask, NULL));
tqDebug("s-task:%s task resume to run after idle for a while", pTask->id.idStr);
streamResumeTask(pTask);
}
@ -772,7 +774,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}

View File

@ -23,11 +23,6 @@ extern "C" {
#endif
// moore finite state machine for stream task
typedef struct SStreamTaskState {
ETaskStatus state;
char* name;
} SStreamTaskState;
typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);

View File

@ -49,7 +49,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
return;
}
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else {
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
@ -247,7 +247,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
// disable the data from upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) {
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
status = TASK_INPUT_STATUS__BLOCKED;
}

View File

@ -187,7 +187,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t code = TSDB_CODE_SUCCESS;
// set task status
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) {
pTask->chkInfo.checkpointingId = checkpointId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
@ -309,8 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
streamTaskClearCheckInfo(p, false);
char* str = NULL;
streamTaskGetStatus(p, &str);
SStreamTaskState* pState = streamTaskGetStatus(p);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
@ -322,7 +321,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, pState->name);
// save the task if not sink task
if (p->info.taskLevel != TASK_LEVEL__SINK) {

View File

@ -696,10 +696,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
char* p = NULL;
streamTaskGetStatus(pTask, &p);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
pTask->id.idStr, numOfVgs, p);
pTask->id.idStr, numOfVgs, pState->name);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
@ -819,9 +818,9 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p,
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name,
pReq->downstreamTaskId, vgId);
return 0;
}

View File

@ -24,12 +24,12 @@
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamTask* pTask) {
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
return (pState->state == TASK_STATUS__STOP) || (pState->state == TASK_STATUS__DROPPING);
}
bool streamTaskShouldPause(const SStreamTask* pTask) {
return (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE);
return (streamTaskGetStatus(pTask)->state == TASK_STATUS__PAUSE);
}
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -344,11 +344,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamMetaWUnLock(pMeta);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id,
pStreamTask->id.idStr);
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
stDebug(
"s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"exec state",
id, el, pStreamTask->id.idStr);
}
ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL);
ETaskStatus status = streamTaskGetStatus(pStreamTask)->state;
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
@ -374,8 +377,10 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
char* p = NULL;
status = streamTaskGetStatus(pStreamTask, &p);
// char* p = NULL;
SStreamTaskState* pState = streamTaskGetStatus(pStreamTask);
status = pState->state;
char* p = pState->name;
if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
streamMetaReleaseTask(pMeta, pStreamTask);
@ -409,7 +414,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. save to disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 6. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
@ -570,7 +575,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
int32_t blockSize = 0;
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__UNINIT)) {
if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) {
stDebug("s-task:%s stream task is stopped", id);
break;
}
@ -647,10 +652,9 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name);
streamTaskBuildCheckpoint(pTask);
} else {
// todo refactor
@ -678,26 +682,27 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
bool streamTaskIsIdle(const SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
status == TASK_STATUS__DROPPING);
}
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
ETaskStatus st = streamTaskGetStatus(pTask, pStatus);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
ETaskStatus st = pState->state;
*pStatus = pState->name;
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
}
static void doStreamExecTaskHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p, ref);
stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
@ -770,8 +775,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
return 0;
}
@ -789,8 +793,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
streamResumeTask(pTask);
} else {
char* p = NULL;
streamTaskGetStatus(pTask, &p);
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
pTask->status.schedStatus);
}

View File

@ -1093,7 +1093,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
STaskStatusEntry entry = {
.id = *pId,
.status = streamTaskGetStatus(*pTask, NULL),
.status = streamTaskGetStatus(*pTask)->state,
.nodeId = hbMsg.vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
@ -1362,13 +1362,13 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
}
taosThreadMutexLock(&pTask->lock);
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__CK) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, p);
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->state);
}
taosThreadMutexUnlock(&pTask->lock);

View File

@ -48,15 +48,13 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) {
char* p = NULL;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p);
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
if ((status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p);
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p->name);
}
ASSERT(pTask->status.downstreamReady == 0);
@ -93,12 +91,11 @@ static void doReExecScanhistory(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
streamMetaReleaseTask(pTask->pMeta, pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
return;
}
@ -155,7 +152,7 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
ASSERT(pTask->status.downstreamReady == 1 &&
((status == TASK_STATUS__SCAN_HISTORY)/* || (status == TASK_STATUS__STREAM_SCAN_HISTORY)*/));
@ -315,7 +312,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
@ -325,7 +322,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
@ -346,9 +343,8 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__READY);
SStreamTaskState* p = streamTaskGetStatus(pTask);
ASSERT(p->state == TASK_STATUS__READY);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
@ -372,15 +368,14 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
SStreamTaskState* p = streamTaskGetStatus(pTask);
ASSERT(p->state == TASK_STATUS__SCAN_HISTORY);
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p);
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p->name);
streamTaskStartScanHistory(pTask);
} else {
stDebug("s-task:%s scan wal data, status:%s", id, p);
stDebug("s-task:%s scan wal data, status:%s", id, p->name);
}
// NOTE: there will be an deadlock if launch fill history here.
@ -624,12 +619,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
const char* id = pTask->id.idStr;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (status != TASK_STATUS__SCAN_HISTORY /*&& status != TASK_STATUS__STREAM_SCAN_HISTORY*/) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p,
pReq->upstreamTaskId);
if (p->state != TASK_STATUS__SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id,
p->name, pReq->upstreamTaskId);
void* pBuf = NULL;
int32_t len = 0;
@ -682,7 +676,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
// task restart now, not handle the scan-history finish rsp
if (status == TASK_STATUS__UNINIT) {
@ -741,8 +735,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(*ppTask)) {
char* p = NULL;
streamTaskGetStatus((*ppTask), &p);
char* p = streamTaskGetStatus(*ppTask)->name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
@ -781,10 +774,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(pTask))) {
char* p = NULL;
char* p = streamTaskGetStatus(pTask)->name;
int32_t hTaskId = pHTaskInfo->id.taskId;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
@ -878,7 +870,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
}
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__DROPPING) {
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) {
return 0;
}
@ -1058,23 +1050,21 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
}
void streamTaskResume(SStreamTask* pTask) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
SStreamTaskState* p = streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta;
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT) {
if (p->state == TASK_STATUS__PAUSE || p->state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
char* pNew = NULL;
streamTaskGetStatus(pTask, &pNew);
if (status == TASK_STATUS__PAUSE) {
char* pNew = streamTaskGetStatus(pTask)->name;
if (p->state == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p, num);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p->name, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p);
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p->name);
}
} else {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p);
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p->name);
}
}

View File

@ -309,7 +309,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
ETaskStatus status1 = TASK_STATUS__UNINIT;
taosThreadMutexLock(&pTask->lock);
if (pTask->status.pSM != NULL) {
status1 = streamTaskGetStatus(pTask, &p);
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
p = pStatus->name;
status1 = pStatus->state;
}
taosThreadMutexUnlock(&pTask->lock);

View File

@ -65,8 +65,7 @@ static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus nex
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
char* p = NULL;
streamTaskGetStatus(pTask, &p);
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
@ -275,7 +274,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
while (1) {
// wait for the task to be here
taosThreadMutexLock(&pTask->lock);
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
ETaskStatus s = streamTaskGetStatus(pTask)->state;
taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
@ -400,12 +399,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
return TSDB_CODE_SUCCESS;
}
ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) {
SStreamTaskState s = pTask->status.pSM->current; // copy one obj in case of multi-thread environment
if (pStr != NULL) {
*pStr = s.name;
}
return s.state;
SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) {
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment
}
const char* streamTaskGetStatusStr(ETaskStatus status) {