fix(stream): fix the invalid counter for ready tasks in vnode, fix an invalid read.

This commit is contained in:
Haojun Liao 2023-09-25 15:58:24 +08:00
parent 1af95969b2
commit fcf4cc7169
6 changed files with 25 additions and 15 deletions

View File

@ -392,10 +392,10 @@ struct SStreamTask {
};
typedef struct STaskStartInfo {
int64_t ts;
int32_t startedAfterNodeUpdate;
int32_t readyTasks;
int32_t elapsedTime;
int64_t ts;
int32_t startedAfterNodeUpdate;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime;
} STaskStartInfo;
// meta

View File

@ -1847,7 +1847,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d restart all stream tasks", vgId);
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq);
} else {

View File

@ -72,6 +72,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
pMeta->startInfo.ts = taosGetTimestampMs();
taosWUnLockLatch(&pMeta->lock);
// broadcast the check downstream tasks msg
@ -235,8 +237,6 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return TSDB_CODE_SUCCESS;
}
pMeta->startInfo.ts = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);

View File

@ -184,12 +184,13 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
return NULL;
}
streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem));
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst);
taosFreeQitem(pElem);
streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem));
return (SStreamQueueItem*)pMerged;
} else {
stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);

View File

@ -151,6 +151,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pMeta->pHbInfo == NULL) {
goto _err;

View File

@ -53,13 +53,17 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
taosWLockLatch(&pMeta->lock);
pMeta->startInfo.readyTasks += 1;
STaskId id = extractStreamTaskKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (pMeta->startInfo.readyTasks == numOfTotal) {
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
// reset value for next time start
pMeta->startInfo.readyTasks = 0;
taosHashClear(pMeta->startInfo.pReadyTaskSet);
pMeta->startInfo.startedAfterNodeUpdate = 0;
pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts;
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec",
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0);
}
@ -174,7 +178,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 "check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
@ -272,10 +276,10 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
if (pTask->status.downstreamReady != 1) {
return TASK_DOWNSTREAM_NOT_READY;
} else if (pInfo->stage != stage) {
if (pInfo->stage != stage) {
return TASK_SELF_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
return TASK_DOWNSTREAM_NOT_READY;
} else {
return TASK_DOWNSTREAM_READY;
}