other: update the test case.

This commit is contained in:
Haojun Liao 2024-06-07 11:15:23 +08:00
parent 34205ba62c
commit 1e47cb7eba
3 changed files with 33 additions and 24 deletions

View File

@ -39,6 +39,10 @@ static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo)
} }
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pMnode == NULL) {
return;
}
int32_t num = taosArrayGetSize(pExecInfo->pTaskList); int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
@ -323,16 +327,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} else { } else {
// task is idle for more than 50 sec. // task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { // if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) { // if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++; // pTaskEntry->inputQUnchangeCounter++;
} else { // } else {
pTaskEntry->inputQChanging = false; // pTaskEntry->inputQChanging = false;
} // }
} else { // } else {
pTaskEntry->inputQChanging = true; // pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0; // pTaskEntry->inputQUnchangeCounter = 0;
} // }
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);

View File

@ -138,7 +138,6 @@ void initStreamExecInfo() {
} }
void initNodeInfo() { void initNodeInfo() {
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
SNodeEntry entry = {0}; SNodeEntry entry = {0};
entry.nodeId = 2; entry.nodeId = 2;
entry.stageUpdated = true; entry.stageUpdated = true;
@ -207,27 +206,32 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
killAllCheckpointTrans(pMnode, &info); killAllCheckpointTrans(pMnode, &info);
SStreamObj stream; void* p = alloca(sizeof(SStreamObj) + sizeof(SSdbRow));
memset(&stream, 0, sizeof(SStreamObj)); SSdbRow* pRow = static_cast<SSdbRow*>(p);
pRow->type = SDB_MAX;
stream.uid = defStreamId; SStreamObj* pStream = (SStreamObj*)((char*)p + sizeof(SSdbRow));
stream.lock = 0;
stream.tasks = taosArrayInit(1, POINTER_BYTES); memset(pStream, 0, sizeof(SStreamObj));
stream.pHTasksList = taosArrayInit(1, POINTER_BYTES);
pStream->uid = defStreamId;
pStream->lock = 0;
pStream->tasks = taosArrayInit(1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(1, POINTER_BYTES);
SArray* pLevel = taosArrayInit(1, POINTER_BYTES); SArray* pLevel = taosArrayInit(1, POINTER_BYTES);
SStreamTask* pTask = static_cast<SStreamTask*>(taosMemoryCalloc(1, sizeof(SStreamTask))); SStreamTask* pTask = static_cast<SStreamTask*>(taosMemoryCalloc(1, sizeof(SStreamTask)));
pTask->id.streamId = defStreamId; pTask->id.streamId = defStreamId;
pTask->id.taskId = 1; pTask->id.taskId = 1;
pTask->exec.qmsg = (char*)taosMemoryMalloc(1); pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1);
taosThreadMutexInit(&pTask->lock, NULL); taosThreadMutexInit(&pTask->lock, NULL);
taosArrayPush(pLevel, &pTask); taosArrayPush(pLevel, &pTask);
taosArrayPush(stream.tasks, &pLevel); taosArrayPush(pStream->tasks, &pLevel);
mndCreateStreamResetStatusTrans(pMnode, &stream); mndCreateStreamResetStatusTrans(pMnode, pStream);
tFreeStreamObj(&stream); tFreeStreamObj(pStream);
sdbCleanup(pMnode->pSdb); sdbCleanup(pMnode->pSdb);
taosMemoryFree(pMnode); taosMemoryFree(pMnode);

View File

@ -624,18 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) { if ((pInfo != NULL) && pInfo->dataAllowed) {
pInfo->dataAllowed = false; pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t <= streamTaskGetNumOfUpstream(pTask));
} }
} }
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) { if ((pInfo != NULL) && (!pInfo->dataAllowed)) {
pInfo->dataAllowed = true;
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0); ASSERT(t >= 0);
pInfo->dataAllowed = true;
} }
} }