refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-06-28 13:13:13 +08:00
parent 3710ea4aca
commit 57fcd553f8
8 changed files with 47 additions and 27 deletions

View File

@ -333,7 +333,8 @@ struct SStreamTask {
// trigger // trigger
int8_t triggerStatus; int8_t triggerStatus;
int64_t triggerParam; int64_t triggerParam;
void* timer; void* schedTimer;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend SStreamState* pState; // state backend
@ -550,7 +551,7 @@ int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTas
int64_t dstTaskId); int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);

View File

@ -90,7 +90,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask); streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);

View File

@ -889,11 +889,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
} }
streamSetupTrigger(pTask); streamSetupScheduleTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d, scan-history:%d", tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
pTask->info.fillHistory); pTask->info.fillHistory, pTask->triggerParam);
// next valid version will add one // next valid version will add one
pTask->chkInfo.version += 1; pTask->chkInfo.version += 1;
@ -1189,9 +1190,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
taosWLockLatch(&pMeta->lock);
if (streamMetaCommit(pTask->pMeta) < 0) { if (streamMetaCommit(pTask->pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock);
} else { } else {
// todo update the chkInfo version for current task. // todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of // this task has an associated history stream task, so we need to scan wal from the end version of

View File

@ -64,14 +64,21 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
void streamSchedByTimer(void* param, void* tmrId) { void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
int8_t status = atomic_load_8(&pTask->triggerStatus);
qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask); streamMetaReleaseTask(NULL, pTask);
qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
return; return;
} }
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { if (status == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (trigger == NULL) return; if (trigger == NULL) {
return;
}
trigger->type = STREAM_INPUT__GET_RES; trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (trigger->pBlock == NULL) { if (trigger->pBlock == NULL) {
@ -84,23 +91,27 @@ void streamSchedByTimer(void* param, void* tmrId) {
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger); taosFreeQitem(trigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
return; return;
} }
streamSchedExec(pTask); streamSchedExec(pTask);
} }
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
} }
int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2); ASSERT(ref == 2 && pTask->schedTimer == NULL);
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
qDebug("s-task:%s setup scheduler trigger, delay:%d ms", pTask->id.idStr, pTask->triggerParam);
pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
} }
return 0; return 0;
} }

View File

@ -557,7 +557,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration); qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
} }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {

View File

@ -139,9 +139,14 @@ void streamMetaClose(SStreamMeta* pMeta) {
} }
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->timer) { if (pTask->schedTimer) {
taosTmrStop(pTask->timer); taosTmrStop(pTask->schedTimer);
pTask->timer = NULL; pTask->schedTimer = NULL;
}
if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
} }
tFreeStreamTask(pTask); tFreeStreamTask(pTask);

View File

@ -441,7 +441,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTaskRetryInfo* pInfo = param; SStreamTaskRetryInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta; SStreamMeta* pMeta = pInfo->pMeta;
qDebug("s-task:0x%x in timer to launch history task", pInfo->taskId); qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
@ -472,7 +472,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
"destroyed, or should stop exec", "destroyed, or should stop exec",
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return; return;
} }
@ -486,7 +486,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
pTask->status.timerActive = 0; pTask->status.timerActive = 0;
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
qError("s-task:0x%x failed to load task, it may have been destoryed", pInfo->taskId); qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
@ -508,18 +508,18 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
pInfo->taskId = pTask->id.taskId; pInfo->taskId = pTask->id.taskId;
pInfo->pMeta = pTask->pMeta; pInfo->pMeta = pTask->pMeta;
if (pTask->timer == NULL) { if (pTask->launchTaskTimer == NULL) {
pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
if (pTask->timer == NULL) { if (pTask->launchTaskTimer == NULL) {
// todo failed to create timer // todo failed to create timer
} else { } else {
pTask->status.timerActive = 1; // timer is active pTask->status.timerActive = 1; // timer is active
qDebug("s-task:%s set time active flag", pTask->id.idStr); qDebug("s-task:%s set timer active flag", pTask->id.idStr);
} }
} else { // timer exists } else { // timer exists
pTask->status.timerActive = 1; pTask->status.timerActive = 1;
qDebug("s-task:%s set time active flag", pTask->id.idStr); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
} }
// try again in 500ms // try again in 500ms

View File

@ -118,7 +118,7 @@ echo "statusInterval 1" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 143" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "jniDebugFlag 143" >> $TAOS_CFG echo "jniDebugFlag 143" >> $TAOS_CFG