diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4033b5482..b7544a13ca 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -647,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta); // save to b-tree meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 0e3ad3293b..b0ddefce79 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 2.save task taosWLockLatch(&pSnode->pMeta->lock); - code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask); + + bool added = false; + code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); if (code < 0) { taosWUnLockLatch(&pSnode->pMeta->lock); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 15b66b8e07..2331e690df 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the newest commit version as the initial start version of stream task. - int32_t taskId = 0; - taosWLockLatch(&pStreamMeta->lock); - code = streamMetaRegisterTask(pStreamMeta, sversion, pTask); + int32_t taskId = pTask->id.taskId; + bool added = false; - taskId = pTask->id.taskId; + taosWLockLatch(&pStreamMeta->lock); + code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); + if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); + tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tFreeStreamTask(pTask); taosWUnLockLatch(&pStreamMeta->lock); return -1; } + // not added into meta store + if (!added) { + tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); + tFreeStreamTask(pTask); + pTask = NULL; + } + taosWUnLockLatch(&pStreamMeta->lock); - tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); // 3. It's an fill history task, do nothing. wait for the main task to start it SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); - if (p != NULL) { - streamTaskCheckDownstreamTasks(pTask); + if (p != NULL) { // reset the downstreamReady flag. + p->status.downstreamReady = 0; + streamTaskCheckDownstreamTasks(p); } streamMetaReleaseTask(pStreamMeta, p); @@ -1174,7 +1183,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (!streamTaskRecoverScanStep1Finished(pTask)) { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history data after halt the related stream task:%s", + ", do secondary scan-history from WAL after halt the related stream task:%s", id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); @@ -1216,12 +1225,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in stream time window completed, no related fill-history task, reset the time " + "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); + qResetStreamInfoTimeWindow(pTask->exec.pExecutor); } else { tqDebug( - "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " + "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 06af53d453..5ccf4c825b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v tqProcessSubmitReqForSubscribe(pTq); } + taosRLockLatch(&pTq->pStreamMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + taosRUnLockLatch(&pTq->pStreamMeta->lock); + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d14b79f4bc..15b2cc4efb 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { return; } - qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN); + qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX); pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; + pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX; } static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 758530f4fb..ae07738868 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } // add to the ready tasks hash map, not the restored tasks hash map -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { + *pAdded = false; + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { @@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + *pAdded = true; return 0; } -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { size_t size = taosHashGetSize(pMeta->pTasks); ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); - return (int32_t)size; }