fix(stream): opt perf.

This commit is contained in:
Haojun Liao 2023-09-28 09:25:23 +08:00
parent f525778aef
commit 22e485d132
4 changed files with 17 additions and 14 deletions

View File

@ -691,7 +691,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);

View File

@ -1075,12 +1075,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
if (!streamTaskSetSchedStatusWait(pTask)) {
tqError(
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
"sched-status:%d",
id, schedStatus);
id, pTask->status.schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}

View File

@ -108,8 +108,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
}
int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
if (streamTaskSetSchedStatusWait(pTask)) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -624,15 +624,20 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
if (status == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
}
taosThreadMutexUnlock(&pTask->lock);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
bool ret = false;
return status;
// double check
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
}
return ret;
}
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {