From 472020a2b968e28ffe14630ae0979ec57f919608 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 18 Feb 2024 14:07:15 +0800 Subject: [PATCH 01/19] enh(stream): handle event async function. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/tq/tq.c | 67 ++++++++----- source/libs/stream/inc/streamsm.h | 20 ++-- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTaskSm.c | 139 ++++++++++++++++++++++---- 5 files changed, 176 insertions(+), 57 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2135bb706b..587e762448 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -799,7 +799,10 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); + +typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); +int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); void streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bde6889ecd..940a8e0c49 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -916,6 +916,22 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask } } +int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) { + STQ* pTq = param; + + SStreamMeta* pMeta = pStreamTask->pMeta; + STaskId hId = pStreamTask->hTaskInfo.id; + SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId); + if (pTask == NULL) { + // todo handle error + } + + doStartFillhistoryStep2(pTask, pStreamTask, pTq); + + streamMetaReleaseTask(pMeta, pTask); + return 0; +} + // this function should be executed by only one thread, so we set an sentinel to protect this function int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -988,7 +1004,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamReExecScanHistoryFuture(pTask, retInfo.idleTime); } else { SStreamTaskState* p = streamTaskGetStatus(pTask); - ETaskStatus s = p->state; + ETaskStatus s = p->state; if (s == TASK_STATUS__PAUSE) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, @@ -1006,37 +1022,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // the following procedure should be executed, no matter status is stop/pause or not tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El); - if (pTask->info.fillHistory) { - SStreamTask* pStreamTask = NULL; + ASSERT(pTask->info.fillHistory == 1); - // 1. get the related stream task - pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", - pTask->streamTaskId.taskId, pTask->id.idStr); + // 1. get the related stream task + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); + if (pStreamTask == NULL) { + tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); - tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); + tqDebug("s-task:%s fill-history task set status to be dropping", id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); - atomic_store_32(&pTask->status.inScanHistorySentinel, 0); - streamMetaReleaseTask(pMeta, pTask); - return -1; - } - - ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - - code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - if (code == TSDB_CODE_SUCCESS) { - doStartFillhistoryStep2(pTask, pStreamTask, pTq); - } else { - tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); - } - - streamMetaReleaseTask(pMeta, pStreamTask); - } else { - ASSERT(0); + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); + streamMetaReleaseTask(pMeta, pTask); + return -1; } + ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + + // code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); +// if (code == TSDB_CODE_SUCCESS) { +// doStartFillhistoryStep2(pTask, pStreamTask, pTq); +// } else { +// tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); +// } + + streamMetaReleaseTask(pMeta, pStreamTask); + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); return code; diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index 22e1c4497b..47e0ce1b55 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -26,21 +26,21 @@ extern "C" { typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); -typedef struct SAttachedEventInfo { +typedef struct SFutureHandleEventInfo { ETaskStatus status; // required status that this event can be handled EStreamTaskEvent event; // the delayed handled event void* pParam; - void* pFn; -} SAttachedEventInfo; + __state_trans_user_fn callBackFn; +} SFutureHandleEventInfo; typedef struct STaskStateTrans { - bool autoInvokeEndFn; - SStreamTaskState state; - EStreamTaskEvent event; - SStreamTaskState next; - __state_trans_fn pAction; - __state_trans_succ_fn pSuccAction; - SAttachedEventInfo attachEvent; + bool autoInvokeEndFn; + SStreamTaskState state; + EStreamTaskEvent event; + SStreamTaskState next; + __state_trans_fn pAction; + __state_trans_succ_fn pSuccAction; + SFutureHandleEventInfo attachEvent; } STaskStateTrans; struct SStreamTaskSM { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index ee98bc801b..dd99f59f91 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -385,7 +385,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; - streamTaskOnHandleEventSuccess(pTask->status.pSM, event); + streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); int64_t initTs = pTask->execInfo.init; int64_t startTs = pTask->execInfo.start; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 83e71c42bc..ecd3fba725 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -63,16 +63,20 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask); static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, - SAttachedEventInfo* pEventInfo, bool autoInvoke); + SFutureHandleEventInfo* pEventInfo, bool autoInvoke); static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } -static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { +static int32_t attachNextHandledEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); - taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo); + + SArray* pList = pTask->status.pSM->pWaitingEventList; + taosArrayPush(pList, pEvtInfo); + + stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList)); return 0; } @@ -170,9 +174,11 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, pEventName, el, pSM->prev.state.name, pSM->current.name); - SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); + ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1); - // OK, let's handle the attached event, since the task has reached the required status now + SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); + + // OK, let's handle the waiting event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr, GET_EVT_NAME(pEvtInfo->event), pSM->current.name); @@ -189,7 +195,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { - return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event); + return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, pEvtInfo->callBackFn, pEvtInfo->pParam); } else { return code; } @@ -242,7 +248,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { } pSM->pTask = pTask; - pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo)); + pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo)); if (pSM->pWaitingEventList == NULL) { taosMemoryFree(pSM); @@ -273,7 +279,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt const char* id = pTask->id.idStr; if (pTrans->attachEvent.event != 0) { - attachEvent(pTask, &pTrans->attachEvent); + attachNextHandledEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); while (1) { @@ -303,7 +309,53 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt // todo handle error code; if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM, event); + streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) { + SStreamTask* pTask = pSM->pTask; + const char* id = pTask->id.idStr; + + if (pTrans->attachEvent.event != 0) { + SFutureHandleEventInfo info = pTrans->attachEvent; + info.pParam = param; + info.callBackFn = callbackFn; + + attachNextHandledEvent(pTask, &info); + taosThreadMutexUnlock(&pTask->lock); + +// while (1) { +// // wait for the task to be here +// taosThreadMutexLock(&pTask->lock); +// ETaskStatus s = streamTaskGetStatus(pTask)->state; +// taosThreadMutexUnlock(&pTask->lock); +// +// if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already +// stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); +// return TSDB_CODE_SUCCESS; +// } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) { +// stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); +// taosMsleep(100); +// } else { +// stDebug("s-task:%s is dropped or stopped already, not wait.", id); +// return TSDB_CODE_STREAM_INVALID_STATETRANS; +// } +// } + + } else { // override current active trans + pSM->pActiveTrans = pTrans; + pSM->startTs = taosGetTimestampMs(); + taosThreadMutexUnlock(&pTask->lock); + + int32_t code = pTrans->pAction(pTask); + // todo handle error code; + + if (pTrans->autoInvokeEndFn) { + streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL); } } @@ -349,6 +401,46 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { return code; } +int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { + int32_t code = TSDB_CODE_SUCCESS; + SStreamTask* pTask = pSM->pTask; + STaskStateTrans* pTrans = NULL; + + while (1) { + taosThreadMutexLock(&pTask->lock); + + if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { + EStreamTaskEvent evt = pSM->pActiveTrans->event; + taosThreadMutexUnlock(&pTask->lock); + + stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); + ASSERT(0); + taosMsleep(100); + } else { + // no active event trans exists, handle this event directly + pTrans = streamTaskFindTransform(pSM->current.state, event); + if (pTrans == NULL) { + stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_STREAM_INVALID_STATETRANS; + } + + if (pSM->pActiveTrans != NULL) { + // currently in some state transfer procedure, not auto invoke transfer, quit from this prcedure + stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", + pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, + pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); + } + + code = doHandleEventAsync(pSM, event, pTrans, callbackFn, param); + break; + } + } + + return code; +} + static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; @@ -356,8 +448,9 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { pSM->prev.evt = pTrans->event; } -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) { +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; + const char* id = pTask->id.idStr; // do update the task status taosThreadMutexLock(&pTask->lock); @@ -369,16 +462,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY); // the pSM->prev.evt may be 0, so print string is not appropriate. - stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, - GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); + stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event), + pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pTrans->event != event) { - stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, - GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); + stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event), + pSM->current.name, GET_EVT_NAME(pTrans->event)); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -388,16 +481,26 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even pSM->current = pTrans->next; pSM->pActiveTrans = NULL; + // todo remove it // on success callback, add into lock if necessary, or maybe we should add an option for this? pTrans->pSuccAction(pTask); + // after handling the callback function assigned by invoker, go on handling the waiting tasks + if (callbackFn != NULL) { + stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event)); + callbackFn(pSM->pTask, param); + + stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event)); + } + + // tasks in waiting list if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask); } else { taosThreadMutexUnlock(&pTask->lock); int64_t el = (taosGetTimestampMs() - pSM->startTs); - stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, + stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id, GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name); } @@ -453,7 +556,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, - __state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) { + __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo, bool autoInvoke) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current]; trans.next = StreamTaskStatusList[next]; @@ -497,7 +600,7 @@ void doInitStateTransferTable(void) { trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; + SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); @@ -518,7 +621,7 @@ void doInitStateTransferTable(void) { trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; + info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); From dbea34ce0ea9621fc01d08b35dbc266de708a55f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 18 Feb 2024 14:34:42 +0800 Subject: [PATCH 02/19] fix(stream): set the callback function for handle event. --- source/libs/stream/src/streamTaskSm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index ecd3fba725..4ca0040941 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -355,7 +355,7 @@ static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, ST // todo handle error code; if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL); + streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param); } } From c4f9bee62922e7b738f872cf66e2d9c30d29f939 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 18 Feb 2024 15:46:37 +0800 Subject: [PATCH 03/19] fix(stream): add into buffer pool before start trans, to avoid false alarm on orphan task. --- source/dnode/mnode/impl/src/mndStream.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 173d28c705..cc6b2eadcb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -720,6 +720,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + // add into buffer firstly + // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. + taosThreadMutexLock(&execInfo.lock); + mDebug("stream tasks register into node list"); + saveStreamTasksInfo(&streamObj, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); + // execute creation if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -729,12 +736,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); - taosThreadMutexLock(&execInfo.lock); - - mDebug("stream tasks register into node list"); - saveStreamTasksInfo(&streamObj, &execInfo); - taosThreadMutexUnlock(&execInfo.lock); - SName dbname = {0}; tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); From 245f0ef806493242470db3cbc8346cdc782af3f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 18 Feb 2024 17:05:00 +0800 Subject: [PATCH 04/19] fix(stream): fix deadlock. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/streamTaskSm.c | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 280c110711..73508202d9 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -28,8 +28,8 @@ static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDurat // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 4ca0040941..a44e107851 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -415,7 +415,6 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); - ASSERT(0); taosMsleep(100); } else { // no active event trans exists, handle this event directly @@ -485,6 +484,9 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // on success callback, add into lock if necessary, or maybe we should add an option for this? pTrans->pSuccAction(pTask); + taosThreadMutexUnlock(&pTask->lock); + + // todo: add parameter to control lock // after handling the callback function assigned by invoker, go on handling the waiting tasks if (callbackFn != NULL) { stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event)); @@ -493,6 +495,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event)); } + taosThreadMutexLock(&pTask->lock); + // tasks in waiting list if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask); From d5b316839dae1135d4cc2ff9f0a5015e0d4d1a4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 09:14:21 +0800 Subject: [PATCH 05/19] refactor: do some internal refactor. --- source/libs/stream/src/streamTaskSm.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index a44e107851..593c2c5754 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -89,11 +89,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { return 0; } -static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { - stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); - return 0; -} - int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { streamTaskSendCheckpointSourceRsp(pTask); @@ -612,9 +607,9 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); From 7e866c55278f86b892f50a906f86734ad992495d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 11:32:04 +0800 Subject: [PATCH 06/19] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 7 -- source/libs/stream/src/streamMeta.c | 9 +- source/libs/stream/src/streamTaskSm.c | 113 ++++++++++---------------- 3 files changed, 50 insertions(+), 79 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 940a8e0c49..5ac44cbbae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1039,14 +1039,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - - // code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); -// if (code == TSDB_CODE_SUCCESS) { -// doStartFillhistoryStep2(pTask, pStreamTask, pTq); -// } else { -// tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); -// } streamMetaReleaseTask(pMeta, pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index db74ce9897..774a32b265 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -669,6 +669,13 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i } } +static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamTaskSendCheckpointSourceRsp(pTask); + } + return 0; +} + int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; @@ -687,7 +694,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } // handle the dropping event - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING); + streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 593c2c5754..9ca5248157 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -59,15 +59,14 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); static int32_t initStateTransferTable(); static void doInitStateTransferTable(void); -static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask); static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, - SFutureHandleEventInfo* pEventInfo, bool autoInvoke); + SFutureHandleEventInfo* pEventInfo); static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } -static int32_t attachNextHandledEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { +static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) { char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, @@ -89,13 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { return 0; } -int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamTaskSendCheckpointSourceRsp(pTask); - } - return 0; -} - int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) { stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr); @@ -274,7 +266,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt const char* id = pTask->id.idStr; if (pTrans->attachEvent.event != 0) { - attachNextHandledEvent(pTask, &pTrans->attachEvent); + attachWaitedEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); while (1) { @@ -313,34 +305,13 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; - const char* id = pTask->id.idStr; - if (pTrans->attachEvent.event != 0) { SFutureHandleEventInfo info = pTrans->attachEvent; info.pParam = param; info.callBackFn = callbackFn; - attachNextHandledEvent(pTask, &info); + attachWaitedEvent(pTask, &info); taosThreadMutexUnlock(&pTask->lock); - -// while (1) { -// // wait for the task to be here -// taosThreadMutexLock(&pTask->lock); -// ETaskStatus s = streamTaskGetStatus(pTask)->state; -// taosThreadMutexUnlock(&pTask->lock); -// -// if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already -// stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); -// return TSDB_CODE_SUCCESS; -// } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) { -// stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); -// taosMsleep(100); -// } else { -// stDebug("s-task:%s is dropped or stopped already, not wait.", id); -// return TSDB_CODE_STREAM_INVALID_STATETRANS; -// } -// } - } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); @@ -415,13 +386,13 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { - stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); + stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pSM->pActiveTrans != NULL) { - // currently in some state transfer procedure, not auto invoke transfer, quit from this prcedure + // currently in some state transfer procedure, not auto invoke transfer, quit from this procedure stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); @@ -555,7 +526,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, - __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo, bool autoInvoke) { + __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current]; trans.next = StreamTaskStatusList[next]; @@ -570,7 +541,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr trans.pAction = (fn != NULL) ? fn : dummyFn; trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn; - trans.autoInvokeEndFn = autoInvoke; + trans.autoInvokeEndFn = (fn == NULL); return trans; } @@ -584,93 +555,93 @@ void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); } //clang-format on \ No newline at end of file From d3e8adf2ebc3b1e941f29085f7694c1640a94bb7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 13:36:57 +0800 Subject: [PATCH 07/19] enh(stream): async handle pause event --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamExec.c | 13 ++++++++----- source/libs/stream/src/streamTask.c | 8 ++++++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c4973b7c1e..3cdb3d8ecf 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -837,7 +837,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 - ", it may have been dropped already", + ", it may have been dropped already", pMeta->vgId, pTask->hTaskInfo.id.taskId); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7fb8095acd..a2cf8bdab5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -410,6 +410,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t haltCallback(SStreamTask* pTask, void* param) { + streamTaskOpenAllUpstreamInput(pTask); + streamTaskSendCheckpointReq(pTask); +} + int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; SStreamMeta* pMeta = pTask->pMeta; @@ -419,11 +424,12 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); - } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. + } else { + // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask != NULL) { // halt the related stream sink task - code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr, pStreamTask->id.idStr, tstrerror(code)); @@ -432,9 +438,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } else { stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } - - streamTaskOpenAllUpstreamInput(pStreamTask); - streamTaskSendCheckpointReq(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fef733c9f3..bba49e1226 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -847,8 +847,8 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->chkpointTransId = pSrc->chkpointTransId; } -void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); +static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { + SStreamMeta* pMeta = pTask->pMeta; int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); @@ -862,6 +862,10 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); } +void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { + streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); +} + void streamTaskResume(SStreamTask* pTask) { SStreamTaskState prevState = *streamTaskGetStatus(pTask); SStreamMeta* pMeta = pTask->pMeta; From 730edf24f905d77556c12eb9340894078f3601d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 13:38:14 +0800 Subject: [PATCH 08/19] fix(stream): fix syntax error. --- source/libs/stream/src/streamTask.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index bba49e1226..5c03aced32 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -860,6 +860,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { } stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); + return TSDB_CODE_SUCCESS; } void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { From c5b26b406f6e0eb493951734fee87805c2ef829f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 13:38:56 +0800 Subject: [PATCH 09/19] fix(stream): fix syntax error. --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a2cf8bdab5..8d33eaae62 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -413,6 +413,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { static int32_t haltCallback(SStreamTask* pTask, void* param) { streamTaskOpenAllUpstreamInput(pTask); streamTaskSendCheckpointReq(pTask); + return TSDB_CODE_SUCCESS; } int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { From bf242a4f64d34987f1e1bf942dd65a5babf8e366 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 15:57:21 +0800 Subject: [PATCH 10/19] fix(stream): handle pause event in waiting list. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 1 + source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/streamTask.c | 21 +++------ source/libs/stream/src/streamTaskSm.c | 63 ++++++++++++++++++++------- 5 files changed, 57 insertions(+), 31 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 587e762448..17b87f2355 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -803,7 +803,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); -void streamTaskRestoreStatus(SStreamTask* pTask); +int32_t streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f173c327c7..3a72146a41 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -87,6 +87,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5ac44cbbae..81d9a9f13f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5c03aced32..aab2e6ab95 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -636,8 +636,6 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, pDispatcher->taskId, nodeId, buf); } - } else { - // do nothing } } @@ -869,20 +867,15 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) { SStreamTaskState prevState = *streamTaskGetStatus(pTask); + SStreamMeta* pMeta = pTask->pMeta; - - if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { - streamTaskRestoreStatus(pTask); - - char* pNew = streamTaskGetStatus(pTask)->name; - if (prevState.state == TASK_STATUS__PAUSE) { - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); - } else { - stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); - } + int32_t code = streamTaskRestoreStatus(pTask); + if (code == TSDB_CODE_SUCCESS) { + char* pNew = streamTaskGetStatus(pTask)->name; + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); } else { - stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); + stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 9ca5248157..6aa215586a 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -196,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, return code; } -void streamTaskRestoreStatus(SStreamTask* pTask) { +static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) { SStreamTaskSM* pSM = pTask->status.pSM; + bool removed = false; taosThreadMutexLock(&pTask->lock); - ASSERT(pSM->pActiveTrans == NULL); - ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); + int32_t num = taosArrayGetSize(pSM->pWaitingEventList); + for (int32_t i = 0; i < num; ++i) { + SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i); + if (pInfo->event == event) { + taosArrayRemove(pSM->pWaitingEventList, i); + stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d", + pTask->id.idStr, pInfo->event); + removed = true; + break; + } + } - SStreamTaskState state = pSM->current; - pSM->current = pSM->prev.state; - - pSM->prev.state = state; - pSM->prev.evt = 0; - - pSM->startTs = taosGetTimestampMs(); - - if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { - stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); - doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); - } else { - stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); + if (!removed) { + stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name); } taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskRestoreStatus(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + int32_t code = 0; + + taosThreadMutexLock(&pTask->lock); + + if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) { + SStreamTaskState state = pSM->current; + pSM->current = pSM->prev.state; + + pSM->prev.state = state; + pSM->prev.evt = 0; + + pSM->startTs = taosGetTimestampMs(); + + if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { + stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, + pSM->prev.state.name, pSM->current.name); + doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); + } else { + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); + } + } else { + removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); + code = -1; // failed to restore the status + } + + taosThreadMutexUnlock(&pTask->lock); + return code; } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { From 9a5219d5943bd315d3f2e27ecb1d5f0d6b2a972f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 29 Feb 2024 13:19:26 +0800 Subject: [PATCH 11/19] fix: memory free sequence of sub request caused memory use after free --- source/client/inc/clientInt.h | 1 + source/client/src/clientEnv.c | 32 +++++++++++++++++-- source/client/src/clientMain.c | 56 +++++++++++----------------------- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 989c6614a6..59eee6fd9d 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -354,6 +354,7 @@ SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); int32_t removeRequest(int64_t rid); void doDestroyRequest(void* p); +int64_t removeFromMostPrevReq(SRequestObj* pRequest); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 1df50a51da..6c20813118 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -385,6 +385,33 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } +/// return the most previous req ref id +int64_t removeFromMostPrevReq(SRequestObj* pRequest) { + int64_t mostPrevReqRefId = pRequest->self; + SRequestObj* pTmp = pRequest; + while (pTmp->relation.prevRefId) { + pTmp = acquireRequest(pTmp->relation.prevRefId); + if (pTmp) { + mostPrevReqRefId = pTmp->self; + releaseRequest(mostPrevReqRefId); + } else { + break; + } + } + removeRequest(mostPrevReqRefId); + return mostPrevReqRefId; +} + +void destroyNextReq(int64_t nextRefId) { + if (nextRefId) { + SRequestObj* pObj = acquireRequest(nextRefId); + if (pObj) { + releaseRequest(nextRefId); + releaseRequest(nextRefId); + } + } +} + void destroySubRequests(SRequestObj *pRequest) { int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; @@ -435,7 +462,7 @@ void doDestroyRequest(void *p) { uint64_t reqId = pRequest->requestId; tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); - destroySubRequests(pRequest); + int64_t nextReqRefId = pRequest->relation.nextRefId; taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); @@ -471,6 +498,7 @@ void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFree(pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); + destroyNextReq(nextReqRefId); } void destroyRequest(SRequestObj *pRequest) { @@ -479,7 +507,7 @@ void destroyRequest(SRequestObj *pRequest) { } taos_stop_query(pRequest); - removeRequest(pRequest->self); + removeFromMostPrevReq(pRequest); } void taosStopQueryImpl(SRequestObj *pRequest) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 275ca0d2aa..e9379946b1 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1254,54 +1254,34 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { - int32_t reqIdx = 0; - SRequestObj *pReqList[16] = {NULL}; - SRequestObj *pUserReq = NULL; - pReqList[0] = pRequest; - uint64_t tmpRefId = 0; - SRequestObj *pTmp = pRequest; - while (pTmp->relation.prevRefId) { - tmpRefId = pTmp->relation.prevRefId; - pTmp = acquireRequest(tmpRefId); - if (pTmp) { - pReqList[++reqIdx] = pTmp; - releaseRequest(tmpRefId); - } else { - tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); + tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest); + SRequestObj* pUserReq = pRequest; + acquireRequest(pRequest->self); + while (pUserReq) { + if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) { break; - } - } - - tmpRefId = pRequest->relation.nextRefId; - while (tmpRefId) { - pTmp = acquireRequest(tmpRefId); - if (pTmp) { - tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); - releaseRequest(pTmp->self); } else { - tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); - break; + int64_t nextRefId = pUserReq->relation.nextRefId; + releaseRequest(pUserReq->self); + if (nextRefId) { + pUserReq = acquireRequest(nextRefId); + } } } - - for (int32_t i = reqIdx; i >= 0; i--) { - destroyCtxInRequest(pReqList[i]); - if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) { - pUserReq = pReqList[i]; - } else { - removeRequest(pReqList[i]->self); - } - } - + bool hasSubRequest = pUserReq != pRequest || pRequest->relation.prevRefId != 0; if (pUserReq) { + destroyCtxInRequest(pUserReq); pUserReq->prevCode = code; memset(&pUserReq->relation, 0, sizeof(pUserReq->relation)); } else { - tscError("user req is missing"); + tscError("User req is missing"); + removeFromMostPrevReq(pRequest); return; } - + if (hasSubRequest) + removeFromMostPrevReq(pRequest); + else + releaseRequest(pUserReq->self); doAsyncQuery(pUserReq, true); } From 1bf403660f27835fed9dffaaffa1ae3e08c4ce10 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 15:02:26 +0800 Subject: [PATCH 12/19] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 979fc436b9..c4ab450e79 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -49,10 +49,13 @@ window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | COUNT_WINDOW(count_val[, sliding_val]) } ``` -其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 +其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。COUNT_WINDOW是计数窗口,按固定的数据行数来划分窗口。 +count_val:常量,是正整数,必须大于等于2,最大INT32_MAX。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) From e20dc784f82895d992fd357f0d86300f0272c752 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 15:09:57 +0800 Subject: [PATCH 13/19] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index 6f2343d347..e6aaaf0663 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -41,10 +41,14 @@ window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | COUNT_WINDOW(count_val[, sliding_val]) } ``` `SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically. +COUNT_WINDOW is a counting window that is divided by a fixed number of data rows.count_val: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. count_val represents the maximum number of data rows contained in each COUNT_WINDOW. +When the total number of data rows cannot be divided by count_val, the number of rows in the last window will be less than count_val. sliding_val: is a constant that represents the number of window slides, similar to SLIDING in intervals For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable. From 4a53bfb1077a9aea5e4cb9252320cdc03235ef58 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 15:11:04 +0800 Subject: [PATCH 14/19] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index c4ab450e79..c36f355ffa 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -55,7 +55,7 @@ window_clause: { ``` 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。COUNT_WINDOW是计数窗口,按固定的数据行数来划分窗口。 -count_val:常量,是正整数,必须大于等于2,最大INT32_MAX。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING +count_val:常量,是正整数,必须大于等于2,小于2147483648。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) From ce379d077d9559dd128aa4c1b08ac8593329e9d8 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:32:45 +0800 Subject: [PATCH 15/19] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index c36f355ffa..5a4772bb6c 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -54,8 +54,10 @@ window_clause: { } ``` -其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。COUNT_WINDOW是计数窗口,按固定的数据行数来划分窗口。 -count_val:常量,是正整数,必须大于等于2,小于2147483648。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING +其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 +EVENT_WINDOW是事件窗口,根据开始条件和结束条件来划定窗口。当start_trigger_condition满足时则窗口开始,直到end_trigger_condition满足时窗口关闭。start_trigger_condition和end_trigger_condition可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。 +COUNT_WINDOW是计数窗口,按固定的数据行数来划分窗口。count_val:常量,是正整数,必须大于等于2,小于2147483648。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。 +sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING。 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) From dadaa9055f4d1696c57905be0362c510bcbb7b17 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:40:24 +0800 Subject: [PATCH 16/19] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index e6aaaf0663..099fa7a963 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -47,8 +47,12 @@ window_clause: { ``` `SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically. -COUNT_WINDOW is a counting window that is divided by a fixed number of data rows.count_val: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. count_val represents the maximum number of data rows contained in each COUNT_WINDOW. -When the total number of data rows cannot be divided by count_val, the number of rows in the last window will be less than count_val. sliding_val: is a constant that represents the number of window slides, similar to SLIDING in intervals + +`EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` +can be any conditional expressions supported by TDengine and can include multiple columns. + +`COUNT_WINDOW` is a counting window that is divided by a fixed number of data rows.`count_val`: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. `count_val` represents the maximum number of data rows contained in each `COUNT_WINDOW`. +When the total number of data rows cannot be divided by `count_val`, the number of rows in the last window will be less than `count_val`. `sliding_val`: is a constant that represents the number of window slides, similar to `SLIDING` in `INTERVAL`. For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable. From 413c1ee4ab86c75a5b1aa81a45f963fe5a1179f3 Mon Sep 17 00:00:00 2001 From: dapan1121 <72057773+dapan1121@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:47:55 +0800 Subject: [PATCH 17/19] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index 099fa7a963..b8bbd4c154 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -48,11 +48,9 @@ window_clause: { `SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically. -`EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` -can be any conditional expressions supported by TDengine and can include multiple columns. +`EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` can be any conditional expressions supported by TDengine and can include multiple columns. -`COUNT_WINDOW` is a counting window that is divided by a fixed number of data rows.`count_val`: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. `count_val` represents the maximum number of data rows contained in each `COUNT_WINDOW`. -When the total number of data rows cannot be divided by `count_val`, the number of rows in the last window will be less than `count_val`. `sliding_val`: is a constant that represents the number of window slides, similar to `SLIDING` in `INTERVAL`. +`COUNT_WINDOW` is a counting window that is divided by a fixed number of data rows.`count_val`: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. `count_val` represents the maximum number of data rows contained in each `COUNT_WINDOW`. When the total number of data rows cannot be divided by `count_val`, the number of rows in the last window will be less than `count_val`. `sliding_val`: is a constant that represents the number of window slides, similar to `SLIDING` in `INTERVAL`. For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable. From 0926bbf8fa9f6dec9d59591af2f6511021e81ce4 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:56:23 +0800 Subject: [PATCH 18/19] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 5a4772bb6c..8868b728f8 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -55,9 +55,8 @@ window_clause: { ``` 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 -EVENT_WINDOW是事件窗口,根据开始条件和结束条件来划定窗口。当start_trigger_condition满足时则窗口开始,直到end_trigger_condition满足时窗口关闭。start_trigger_condition和end_trigger_condition可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。 -COUNT_WINDOW是计数窗口,按固定的数据行数来划分窗口。count_val:常量,是正整数,必须大于等于2,小于2147483648。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。 -sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLIDING。 +EVENT_WINDOW 是事件窗口,根据开始条件和结束条件来划定窗口。当 start_trigger_condition 满足时则窗口开始,直到 end_trigger_condition 满足时窗口关闭。 start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。 +COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_val 是常量,是正整数,必须大于等于2,小于2147483648。 count_val 表示每个 COUNT_WINDOW 包含的最大数据行数,总数据行数不能整除 count_val 时,最后一个窗口的行数会小于 count_val 。 sliding_val 是常量,表示窗口滑动的数量,类似于 INTERVAL 的 SLIDING 。 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) @@ -66,6 +65,12 @@ sliding_val:是常量,表示窗口滑动的数量,类似于 interval的SLI ```sql CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); + +CREATE STREAM streams0 INTO streamt0 AS +SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9; + +CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS +SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10); ``` ## 流式计算的 partition From 43a3f85f48679a809288b7e8b81033c2c4de93bc Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:57:24 +0800 Subject: [PATCH 19/19] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index b8bbd4c154..e1bf18c854 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -57,6 +57,12 @@ For example, the following SQL statement creates a stream and automatically crea ```sql CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); + +CREATE STREAM streams0 INTO streamt0 AS +SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9; + +CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS +SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10); ``` ## Partitions of Stream