diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48c15e9117..5fd9a8b12b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -340,6 +340,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index cefc4fa63e..7352bbc0fe 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -153,11 +153,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); // 2.save task + taosWLockLatch(&pSnode->pMeta->lock); code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); if (code < 0) { + taosWUnLockLatch(&pSnode->pMeta->lock); return -1; } + taosWUnLockLatch(&pSnode->pMeta->lock); + // 3.go through recover steps to fill history if (pTask->fillHistory) { streamSetParamForRecover(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6b46a6a12f..792ff8677e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -781,13 +781,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); // 2.save task, use the newest commit version as the initial start version of stream task. + taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + // 3.go through recover steps to fill history if (pTask->fillHistory) { streamTaskCheckDownstream(pTask, sversion); @@ -1323,13 +1327,12 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } int32_t tqStartStreamTasks(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; taosWLockLatch(&pMeta->lock); - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqInfo("vgId:%d no stream tasks exists", vgId); taosWUnLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 0575b7299d..6a51f74908 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -262,14 +262,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v // taosWUnLockLatch(&pTq->lock); } - tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) { - if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { + if (numOfTasks == 0) { return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c3c7f7ba7b..1e45f578f6 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -57,42 +57,28 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { return 0; } -static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { - SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t)); - void* pIter = NULL; - - taosWLockLatch(&pStreamMeta->lock); - while(1) { - pIter = taosHashIterate(pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - taosArrayPush(pTaskIdList, &pTask->id.taskId); - } - - taosWUnLockLatch(&pStreamMeta->lock); - return pTaskIdList; -} - int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; int32_t vgId = pStreamMeta->vgId; - int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks); + int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } + SArray* pTaskList = NULL; + taosWLockLatch(&pStreamMeta->lock); + pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); + taosWUnLockLatch(&pStreamMeta->lock); + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); - SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks); // update the new task number - numOfTasks = taosArrayGetSize(pTaskIdList); + numOfTasks = taosArrayGetSize(pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskIdList, i); + int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); if (pTask == NULL) { continue; @@ -166,7 +152,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; } - taosArrayDestroy(pTaskIdList); + taosArrayDestroy(pTaskList); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0d797f0bcb..aefe7885f9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -57,6 +57,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + // task list + pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); + if (pMeta->pTaskList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + if (streamMetaBegin(pMeta) < 0) { goto _err; } @@ -70,6 +77,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); + if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); @@ -100,6 +108,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); + pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -180,11 +189,15 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); return 0; } int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { - return (int32_t) taosHashGetSize(pMeta->pTasks); + size_t size = taosHashGetSize(pMeta->pTasks); + ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); + + return (int32_t) size; } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { @@ -224,6 +237,15 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); + int32_t num = taosArrayGetSize(pMeta->pTaskList); + for(int32_t i = 0; i < num; ++i) { + int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (*pTaskId == taskId) { + taosArrayRemove(pMeta->pTaskList, i); + break; + } + } + streamMetaReleaseTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); } @@ -308,6 +330,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver);