refactor: do some internal refactor about the sched status set

This commit is contained in:
Haojun Liao 2023-09-13 14:00:34 +08:00
parent 01d104706e
commit aaa12ed4f8
6 changed files with 56 additions and 19 deletions

View File

@ -401,6 +401,7 @@ typedef struct SStreamMeta {
int32_t vgId;
int64_t stage;
SRWLatch lock;
// TdThreadRwlock lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
@ -660,6 +661,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,

View File

@ -1046,8 +1046,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
tqError(
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
@ -1064,9 +1063,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
@ -1106,8 +1104,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// now we can stop the stream task execution
int64_t latestVer = 0;
taosThreadMutexLock(&pStreamTask->lock);
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
@ -1141,7 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
// set the fill-history task to be normal
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
@ -1308,9 +1306,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(st), pTask->status.schedStatus);
pTask->id.idStr, streamGetTaskStatusStr(st), status);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);

View File

@ -108,14 +108,13 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
}
int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
return -1;
}
@ -256,7 +255,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
tDeleteStreamDispatchReq(pReq);
streamTryExec(pTask);
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
streamTryExec(pTask);
}
return 0;
}

View File

@ -509,7 +509,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
}
} else {
qDebug("s-task:%s sink task does not transfer state", id);
@ -615,11 +615,9 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
const char* id = pTask->id.idStr;
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) {
int32_t code = streamExecForAll(pTask);
@ -628,9 +626,12 @@ int32_t streamTryExec(SStreamTask* pTask) {
return -1;
}
taosThreadMutexLock(&pTask->lock);
if (taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
return 0;

View File

@ -480,7 +480,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
// execute in the scan history complete call back msg, ready to process data from inputQ
streamSetStatusNormal(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamTaskSetSchedStatusInActive(pTask);
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);

View File

@ -332,7 +332,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
taosThreadMutexDestroy(&pTask->lock);
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
@ -553,3 +552,35 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
if (status == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
}
taosThreadMutexUnlock(&pTask->lock);
return status;
}
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
if (status == TASK_SCHED_STATUS__WAITING) {
pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
}
taosThreadMutexUnlock(&pTask->lock);
return status;
}
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
taosThreadMutexUnlock(&pTask->lock);
return status;
}