From 8729bdac1e0deb11d36147e9fedc14f415fc7271 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Oct 2023 17:59:04 +0800 Subject: [PATCH] fix(stream): fix bugs caused by refactor. --- source/dnode/mnode/impl/src/mndScheduler.c | 8 ++- source/dnode/vnode/src/tq/tq.c | 7 +-- source/libs/stream/inc/streamsm.h | 12 +++-- source/libs/stream/src/streamTaskSm.c | 50 +++++++++++-------- source/util/src/tarray.c | 2 +- tests/system-test/8-stream/scalar_function.py | 2 +- 6 files changed, 50 insertions(+), 31 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2931f6be6b..404198a523 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -569,6 +569,10 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr } static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { + if (taosArrayGetSize(pTasksList) < SINK_NODE_LEVEL || pUpstreamTask == NULL) { + return; + } + SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); @@ -628,7 +632,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* } setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); - setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); + if (pHAggTask != NULL) { + setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); + } // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f7657f98c..c4e74f84e5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1225,7 +1225,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); - streamTaskRestoreStatus(pTask); +// streamTaskRestoreStatus(pTask); // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { // pTask->status.keepTaskStatus = TASK_STATUS__READY; @@ -1259,6 +1259,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } #endif + // now the fill-history task starts to scan data from wal files. streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); tqScanWalAsync(pTq, false); } @@ -1271,8 +1272,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // Not update the fill-history time window until the state transfer is completed if the related fill-history task // exists. tqDebug( - "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " - "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, + "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64 + ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); code = streamTaskScanHistoryDataComplete(pTask); diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index 19eb3c0029..83c4f51a1d 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -32,8 +32,8 @@ typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef struct SAttachedEventInfo { - ETaskStatus status; - EStreamTaskEvent event; + ETaskStatus status; // required status that this event can be handled + EStreamTaskEvent event; // the delayed handled event } SAttachedEventInfo; typedef struct STaskStateTrans { @@ -48,13 +48,15 @@ typedef struct STaskStateTrans { struct SStreamTaskSM { SStreamTask* pTask; -// SArray* pTransList; // SArray STaskStateTrans* pActiveTrans; int64_t startTs; SStreamTaskState current; - SStreamTaskState prev; + struct { + SStreamTaskState state; + EStreamTaskEvent evt; + } prev; // register the next handled event, if current state is not allowed to handle this event - SArray* eventList; + SArray* pWaitingEventList; }; typedef struct SStreamEventInfo { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index addd563388..463b7ae771 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -32,10 +32,11 @@ SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, }; -SStreamEventInfo StreamTaskEventList[11] = { +SStreamEventInfo StreamTaskEventList[12] = { {}, // dummy event, place holder {.event = TASK_EVENT_INIT, .name = "initialize"}, - {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"}, + {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"}, + {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"}, {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"}, {.event = TASK_EVENT_STOP, .name = "stopping"}, {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"}, @@ -66,7 +67,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name); - taosArrayPush(pTask->status.pSM->eventList, pEvtInfo); + taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo); return 0; } @@ -126,12 +127,15 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); SStreamTaskState state = pSM->current; - pSM->current = pSM->prev; - pSM->prev = state; + pSM->current = pSM->prev.state; + + pSM->prev.state = state; + pSM->prev.evt = 0; + pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.name, pSM->current.name); + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { @@ -147,8 +151,8 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { } pSM->pTask = pTask; - pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo)); - if (pSM->eventList == NULL) { + pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo)); + if (pSM->pWaitingEventList == NULL) { taosMemoryFree(pSM); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -168,7 +172,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { return NULL; } - taosArrayDestroy(pSM->eventList); + taosArrayDestroy(pSM->pWaitingEventList); taosMemoryFree(pSM); return NULL; } @@ -192,7 +196,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { ETaskStatus s = streamTaskGetStatus(pTask, NULL); taosThreadMutexUnlock(&pTask->lock); - if (s == pTrans->attachEvent.status) { + if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { + stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; } else {// this event has been handled already stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, @@ -223,27 +228,32 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { return TSDB_CODE_SUCCESS; } +static void keepPrevInfo(SStreamTaskSM* pSM) { + STaskStateTrans* pTrans = pSM->pActiveTrans; + + pSM->prev.state = pSM->current; + pSM->prev.evt = pTrans->event; +} int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; SStreamTask* pTask = pSM->pTask; // do update the task status taosThreadMutexLock(&pTask->lock); - SStreamTaskState current = pSM->current; + keepPrevInfo(pSM); - pSM->prev = pSM->current; pSM->current = pTrans->next; pSM->pActiveTrans = NULL; // on success callback, add into lock if necessary, or maybe we should add an option for this? pTrans->pSuccAction(pTask); - if (taosArrayGetSize(pSM->eventList) > 0) { + if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, - StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); - SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->eventList); + SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList); // OK, let's handle the attached event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { @@ -258,7 +268,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { taosThreadMutexUnlock(&pTask->lock); int32_t code = pNextTrans->pAction(pSM->pTask); - if (pTrans->autoInvokeEndFn) { return streamTaskOnHandleEventSuccess(pSM); } else { @@ -270,7 +279,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, - StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name); } return TSDB_CODE_SUCCESS; @@ -292,7 +301,7 @@ void streamTaskResetStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->pActiveTrans = NULL; - taosArrayClear(pSM->eventList); + taosArrayClear(pSM->pWaitingEventList); } void streamTaskSetStatusReady(SStreamTask* pTask) { @@ -302,12 +311,13 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { return; } - pSM->prev = pSM->current; + pSM->prev.state = pSM->current; + pSM->prev.evt = 0; pSM->current = StreamTaskStatusList[TASK_STATUS__READY]; pSM->startTs = taosGetTimestampMs(); pSM->pActiveTrans = NULL; - taosArrayClear(pSM->eventList); + taosArrayClear(pSM->pWaitingEventList); } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index a7c28df22b..26d149b5b5 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) { } if (index >= pArray->size) { - uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity); + uError("index is out of range, current:%" PRIzu " max:%"PRIzu, index, pArray->size); return NULL; } diff --git a/tests/system-test/8-stream/scalar_function.py b/tests/system-test/8-stream/scalar_function.py index 3bc44a7dc7..eda643f661 100644 --- a/tests/system-test/8-stream/scalar_function.py +++ b/tests/system-test/8-stream/scalar_function.py @@ -7,7 +7,7 @@ from util.common import * class TDTestCase: updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, - 'asynclog': 0} + 'asynclog': 0, 'stdebugflag':135} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__)