fix(stream): opt perf.
This commit is contained in:
parent
0a2b70fd7f
commit
db897fb03a
|
@ -691,7 +691,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
|
||||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -1066,12 +1066,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// we have to continue retrying to successfully execute the scan history task.
|
// we have to continue retrying to successfully execute the scan history task.
|
||||||
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
|
if (!streamTaskSetSchedStatusWait(pTask)) {
|
||||||
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
|
|
||||||
tqError(
|
tqError(
|
||||||
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
|
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
|
||||||
"sched-status:%d",
|
"sched-status:%d",
|
||||||
id, schedStatus);
|
id, pTask->status.schedStatus);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,8 +108,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSchedExec(SStreamTask* pTask) {
|
int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
|
if (streamTaskSetSchedStatusWait(pTask)) {
|
||||||
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -624,15 +624,20 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
|
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
bool ret = false;
|
||||||
int8_t status = pTask->status.schedStatus;
|
|
||||||
if (status == TASK_SCHED_STATUS__INACTIVE) {
|
|
||||||
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
|
||||||
}
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
return status;
|
// double check
|
||||||
|
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
|
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
||||||
|
ret = true;
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
|
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
|
||||||
|
|
Loading…
Reference in New Issue