diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 3b7461c7e0..42efb6589e 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -39,6 +39,10 @@ static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) } static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + if (pMnode == NULL) { + return; + } + int32_t num = taosArrayGetSize(pExecInfo->pTaskList); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -323,16 +327,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } else { // task is idle for more than 50 sec. - if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { - if (!pTaskEntry->inputQChanging) { - pTaskEntry->inputQUnchangeCounter++; - } else { - pTaskEntry->inputQChanging = false; - } - } else { - pTaskEntry->inputQChanging = true; - pTaskEntry->inputQUnchangeCounter = 0; - } +// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { +// if (!pTaskEntry->inputQChanging) { +// pTaskEntry->inputQUnchangeCounter++; +// } else { +// pTaskEntry->inputQChanging = false; +// } +// } else { +// pTaskEntry->inputQChanging = true; +// pTaskEntry->inputQUnchangeCounter = 0; +// } streamTaskStatusCopy(pTaskEntry, p); diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 8480f204d6..c9365b4318 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -138,7 +138,6 @@ void initStreamExecInfo() { } void initNodeInfo() { - execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); SNodeEntry entry = {0}; entry.nodeId = 2; entry.stageUpdated = true; @@ -207,27 +206,32 @@ TEST_F(StreamTest, kill_checkpoint_trans) { killAllCheckpointTrans(pMnode, &info); - SStreamObj stream; - memset(&stream, 0, sizeof(SStreamObj)); + void* p = alloca(sizeof(SStreamObj) + sizeof(SSdbRow)); + SSdbRow* pRow = static_cast(p); + pRow->type = SDB_MAX; - stream.uid = defStreamId; - stream.lock = 0; - stream.tasks = taosArrayInit(1, POINTER_BYTES); - stream.pHTasksList = taosArrayInit(1, POINTER_BYTES); + SStreamObj* pStream = (SStreamObj*)((char*)p + sizeof(SSdbRow)); + + memset(pStream, 0, sizeof(SStreamObj)); + + pStream->uid = defStreamId; + pStream->lock = 0; + pStream->tasks = taosArrayInit(1, POINTER_BYTES); + pStream->pHTasksList = taosArrayInit(1, POINTER_BYTES); SArray* pLevel = taosArrayInit(1, POINTER_BYTES); SStreamTask* pTask = static_cast(taosMemoryCalloc(1, sizeof(SStreamTask))); pTask->id.streamId = defStreamId; pTask->id.taskId = 1; - pTask->exec.qmsg = (char*)taosMemoryMalloc(1); + pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1); taosThreadMutexInit(&pTask->lock, NULL); taosArrayPush(pLevel, &pTask); - taosArrayPush(stream.tasks, &pLevel); - mndCreateStreamResetStatusTrans(pMnode, &stream); + taosArrayPush(pStream->tasks, &pLevel); + mndCreateStreamResetStatusTrans(pMnode, pStream); - tFreeStreamObj(&stream); + tFreeStreamObj(pStream); sdbCleanup(pMnode->pSdb); taosMemoryFree(pMnode); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 834daf15d0..dc9d2166e6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -624,18 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { + if ((pInfo != NULL) && pInfo->dataAllowed) { pInfo->dataAllowed = false; int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + ASSERT(t <= streamTaskGetNumOfUpstream(pTask)); } } void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { - pInfo->dataAllowed = true; + if ((pInfo != NULL) && (!pInfo->dataAllowed)) { int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); ASSERT(t >= 0); + pInfo->dataAllowed = true; } }