diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0fb690e756..f0ab2a0e95 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -43,7 +43,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); -int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta); void tqSetRestoreVersionInfo(SStreamTask* pTask); +int32_t tqExpandStreamTask(SStreamTask* pTask); #endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f928ba6314..03c42e5c7e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -157,7 +157,8 @@ typedef enum EStreamTaskEvent { typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); +typedef int32_t FTaskBuild(void* ahandle, SStreamTask* pTask, int64_t ver); +typedef int32_t FTaskExpand(SStreamTask* pTask); typedef struct { int8_t type; @@ -486,7 +487,8 @@ typedef struct SStreamMeta { SArray* pTaskList; // SArray void* ahandle; TXN* txn; - FTaskExpand* expandFunc; + FTaskBuild* buildTaskFn; + FTaskExpand* expandTaskFn; int32_t vgId; int64_t stage; int32_t role; @@ -710,8 +712,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); // stream task meta void streamMetaInit(); void streamMetaCleanup(); -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, - startComplete_fn_t fn); +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, + int32_t vgId, int64_t stage, startComplete_fn_t fn); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b0d61ebc06..d322eb2977 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,14 +25,6 @@ #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -static STaskId replaceStreamTaskId(SStreamTask *pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - return id; -} - static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { ASSERT(pTask->info.fillHistory); pTask->id.taskId = pId->taskId; @@ -85,7 +77,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { startRsync(); pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7f5ab8b6e6..22b26498e4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -264,7 +264,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9698d6b869..08fdda0e29 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -90,7 +90,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { int32_t tqInitialize(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); + pTq->pStreamMeta = + streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback); if (pTq->pStreamMeta == NULL) { return -1; } @@ -713,7 +714,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { +int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { + STQ* pTq = (STQ*) pTqObj; + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bacab3ac7a..b0915640cc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,14 +541,14 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } -static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) { +static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { const char* id = pTask->id.idStr; int32_t blockSize = 0; int64_t st = taosGetTimestampMs(); SCheckpointInfo* pInfo = &pTask->chkInfo; int64_t ver = pInfo->processedVer; - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type)); doSetStreamInputBlock(pTask, pBlock, &ver, id); @@ -607,7 +607,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB } // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. - doStreamTaskExecImpl(pTask, pCheckpointBlock); + doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); } /** @@ -698,7 +698,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (type != STREAM_INPUT__CHECKPOINT) { - doStreamTaskExecImpl(pTask, pInput); + doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index da1fec5565..d0f9d40469 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -299,8 +299,8 @@ void streamMetaRemoveDB(void* arg, char* key) { taosThreadMutexUnlock(&pMeta->backendMutex); } -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, - startComplete_fn_t fn) { +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, + int32_t vgId, int64_t stage, startComplete_fn_t fn) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -369,7 +369,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->scanInfo.scanCounter = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; - pMeta->expandFunc = expandFunc; + pMeta->buildTaskFn = buildTaskFn; + pMeta->expandTaskFn = expandTaskFn; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; pMeta->updateInfo.transId = -1; @@ -602,7 +603,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + if (pMeta->buildTaskFn(pMeta->ahandle, pTask, ver) < 0) { return -1; } @@ -901,7 +902,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); + code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); @@ -1522,6 +1523,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { } int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { + int32_t code = 0; int32_t vgId = pMeta->vgId; stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); @@ -1541,40 +1543,22 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas ASSERT(pTask->status.downstreamReady == 0); if (pTask->pBackend == NULL) { - int32_t code = expandFn(pTask); + code = expandFn(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - streamMetaReleaseTask(pMeta, pTask); - return code; - } - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); - if (pHTask != NULL) { - if (pHTask->pBackend == NULL) { - code = expandFn(pHTask); - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs); - - streamMetaReleaseTask(pMeta, pHTask); - streamMetaReleaseTask(pMeta, pTask); - return code; - } - } - - streamMetaReleaseTask(pMeta, pHTask); - } } } - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } streamMetaReleaseTask(pMeta, pTask); - return ret; + return code; } static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 7a864a60d2..050d88aaf1 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -197,6 +197,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { const char* idStr = pTask->id.idStr; int64_t hStreamId = pTask->hTaskInfo.id.streamId; int32_t hTaskId = pTask->hTaskInfo.id.taskId; + int64_t now = taosGetTimestampMs(); + int32_t code = 0; + ASSERT(hTaskId != 0); // check stream task status in the first place. @@ -226,7 +229,18 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); } else { // exist, but not ready, continue check downstream task status - checkFillhistoryTaskStatus(pTask, pHisTask); + if (pHisTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHisTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHisTask, now); + stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code)); + } + } + + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHisTask); + } + } streamMetaReleaseTask(pMeta, pHisTask); @@ -306,6 +320,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; int64_t now = taosGetTimestampMs(); + int32_t code = 0; streamMetaWLock(pMeta); @@ -362,13 +377,22 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { streamMetaReleaseTask(pMeta, pTask); return; } else { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); + if (pHTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHTask, now); + stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code)); + } + } - // not in timer anymore - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pHTaskInfo->retryTimes, ref); + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHTask); + // not in timer anymore + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes, ref); + } + streamMetaReleaseTask(pMeta, pHTask); } } diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index c9b981e5f9..dc506cfbc9 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -43,7 +43,7 @@ SStreamState *stateCreate(const char *path) { pTask->ver = 1024; pTask->id.streamId = 1023; pTask->id.taskId = 1111111; - SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); + SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL); pTask->pMeta = pMeta; SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);