diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index d3717bc1e7..7859c60944 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -236,6 +236,11 @@ typedef struct {
   SUseDbRsp dbInfo;
 } STaskDispatcherShuffle;
 
+typedef struct {
+  int32_t nodeId;
+  SEpSet  epset;
+} SDownstreamTaskEpset;
+
 typedef struct {
   int64_t stbUid;
   char    stbFullName[TSDB_TABLE_FNAME_LEN];
@@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo {
   void* pTimer;  // used to dispatch data after a given time duration
 } SDispatchMsgInfo;
 
-typedef struct STaskOutputQueue {
+typedef struct STaskQueue {
   int8_t        status;
   SStreamQueue* queue;
-} STaskOutputQueue;
-
-typedef struct STaskInputInfo {
-  int8_t status;
-  SStreamQueue* queue;
-} STaskInputInfo;
+} STaskQueue;
 
 typedef struct STaskSchedInfo {
   int8_t status;
@@ -384,6 +384,7 @@ typedef struct STaskOutputInfo {
   };
   int8_t        type;
   STokenBucket* pTokenBucket;
+  SArray*       pDownstreamUpdateList;
 } STaskOutputInfo;
 
 typedef struct SUpstreamInfo {
@@ -395,8 +396,8 @@ struct SStreamTask {
   int64_t          ver;
   SStreamTaskId    id;
   SSTaskBasicInfo  info;
-  STaskOutputQueue outputq;
-  STaskInputInfo   inputInfo;
+  STaskQueue       outputq;
+  STaskQueue       inputq;
   STaskSchedInfo   schedInfo;
   STaskOutputInfo  outputInfo;
   SDispatchMsgInfo msgInfo;
@@ -645,7 +646,8 @@ typedef struct STaskStatusEntry {
 typedef struct SStreamHbMsg {
   int32_t vgId;
   int32_t numOfTasks;
-  SArray* pTaskStatus;  // SArray<STaskStatusEntry>
+  SArray* pTaskStatus;   // SArray<STaskStatusEntry>
+  SArray* pUpdateNodes;  // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
 } SStreamHbMsg;
 
 int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 14bdb73b4f..13b6e1c30c 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -91,6 +91,7 @@ static void     removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
 static void     keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
 static int32_t  removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
 static int32_t  doKillActiveCheckpointTrans(SMnode *pMnode);
+static int32_t  setNodeEpsetExpiredFlag(const SArray* pNodeList);
 
 int32_t mndInitStream(SMnode *pMnode) {
   SSdbTable table = {
@@ -2119,7 +2120,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
       continue;
     }
 
-    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
+    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
+           pStream->name, pTrans->id);
+
     int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
 
     // todo: not continue, drop all and retry again
@@ -2216,23 +2219,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
   }
 }
 
-static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
+static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
   void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
+  if (p == NULL) {
+    return TSDB_CODE_SUCCESS;
+  }
 
-  if (p != NULL) {
-    taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
+  taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
 
-    for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
-      STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
-      if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
-        taosArrayRemove(pExecNode->pTaskList, k);
-        mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
-              (int32_t)taosArrayGetSize(pExecNode->pTaskList));
-        break;
-      }
+  for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
+    STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
+    if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
+      taosArrayRemove(pExecNode->pTaskList, k);
+
+      int32_t num = taosArrayGetSize(pExecNode->pTaskList);
+      mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
+      break;
     }
   }
-  return 0;
+
+  return TSDB_CODE_SUCCESS;
 }
 
 static bool taskNodeExists(SArray* pList, int32_t nodeId) {
@@ -2322,6 +2328,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
 
   taosThreadMutexLock(&execInfo.lock);
+  removeExpirednodeEntryAndTask(pNodeSnapshot);
 
   SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
@@ -2359,10 +2366,6 @@ typedef struct SMStreamNodeCheckMsg {
   int8_t placeHolder;  // // to fix windows compile error, define place holder
 } SMStreamNodeCheckMsg;
 
-typedef struct SMStreamTaskResetMsg {
-  int8_t placeHolder;
-} SMStreamTaskResetMsg;
-
 static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
   SMnode *pMnode = pReq->info.node;
   SSdb   *pSdb = pMnode->pSdb;
@@ -2577,6 +2580,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
   return 0;
 }
 
+int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
+  int32_t num = taosArrayGetSize(pNodeList);
+
+  for (int k = 0; k < num; ++k) {
+    int32_t* pVgId = taosArrayGet(pNodeList, k);
+
+    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
+    for (int i = 0; i < numOfNodes; ++i) {
+      SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
+
+      if (pNodeEntry->nodeId == *pVgId) {
+        mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
+        pNodeEntry->stageUpdated = true;
+        break;
+      }
+    }
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
+  int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
+  for(int32_t j = 0; j < numOfNodes; ++j) {
+    SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
+    if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
+
+      mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
+            pTaskEntry->stage, stage, pTaskEntry->id.taskId);
+
+      pNodeEntry->stageUpdated = true;
+      pTaskEntry->stage = stage;
+      break;
+    }
+  }
+}
+
 int32_t mndProcessStreamHb(SRpcMsg *pReq) {
   SMnode      *pMnode = pReq->info.node;
   SStreamHbMsg req = {0};
@@ -2602,29 +2642,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
     doExtractTasksFromStream(pMnode);
   }
 
+  setNodeEpsetExpiredFlag(req.pUpdateNodes);
+
   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
-    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
-    if (pEntry == NULL) {
+
+    STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
+    if (pTaskEntry == NULL) {
       mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
       continue;
     }
 
-    if (p->stage != pEntry->stage && pEntry->stage != -1) {
-      int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
-      for(int32_t j = 0; j < numOfNodes; ++j) {
-        SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
-        if (pNodeEntry->nodeId == pEntry->nodeId) {
-          mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
-                pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
-
-          pNodeEntry->stageUpdated = true;
-          pEntry->stage = p->stage;
-          break;
-        }
-      }
+    if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
+      updateStageInfo(pTaskEntry, p->stage);
     } else {
-      streamTaskStatusCopy(pEntry, p);
+      streamTaskStatusCopy(pTaskEntry, p);
       if (p->activeCheckpointId != 0) {
         if (activeCheckpointId != 0) {
           ASSERT(activeCheckpointId == p->activeCheckpointId);
@@ -2638,7 +2669,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
       }
     }
 
-    pEntry->status = p->status;
+    pTaskEntry->status = p->status;
     if (p->status != TASK_STATUS__READY) {
       mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
     }
@@ -2655,5 +2686,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
   taosThreadMutexUnlock(&execInfo.lock);
 
   taosArrayDestroy(req.pTaskStatus);
+  taosArrayDestroy(req.pUpdateNodes);
   return TSDB_CODE_SUCCESS;
 }
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 907dc8d88a..5446363698 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1550,7 +1550,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
 
     if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
       streamStartScanHistoryAsync(pTask, igUntreated);
-    } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
+    } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
       tqScanWalAsync(pTq, false);
     } else {
       streamSchedExec(pTask);
diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c
index 62952078bc..50ee52f45b 100644
--- a/source/dnode/vnode/src/tq/tqPush.c
+++ b/source/dnode/vnode/src/tq/tqPush.c
@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
     int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
     taosRUnLockLatch(&pTq->pStreamMeta->lock);
 
-    tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);
+//    tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
 
     // push data for stream processing:
     // 1. the vnode has already been restored.
diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c
index 7df8bdf891..202c765944 100644
--- a/source/dnode/vnode/src/tq/tqStreamTask.c
+++ b/source/dnode/vnode/src/tq/tqStreamTask.c
@@ -344,13 +344,13 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
   }
 
   // check if input queue is full or not
-  if (streamQueueIsFull(pTask->inputInfo.queue)) {
+  if (streamQueueIsFull(pTask->inputq.queue)) {
     tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
     return false;
   }
 
   // the input queue of downstream task is full, so the output is blocked, stopped for a while
-  if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
+  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
     tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
     return false;
   }
@@ -444,7 +444,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
       continue;
     }
 
-    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
+    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
     int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
 
     taosThreadMutexLock(&pTask->lock);
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index fcb61dc892..34b4677235 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -296,7 +296,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
   return 0;
 }
 
-void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
+void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
 
 void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
   int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index e34ec07eac..cd8bbacb98 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -1043,8 +1043,8 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
   int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
 
   // put data into inputQ of current task is also allowed
-  if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
-    pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
+  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
+    pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
     stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
             pTask->id.idStr, downstreamId, el);
   } else {
@@ -1096,7 +1096,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   } else {  // code == 0
     if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
-      pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
+      pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
       // block the input of current task, to push pressure to upstream
       taosThreadMutexLock(&pTask->lock);
       taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 2d5788fc4d..28d5336a5e 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -106,7 +106,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
       return 0;
     }
 
-    if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
+    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
       stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
       taosMsleep(1000);
       continue;
@@ -217,7 +217,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
       return 0;
     }
 
-    if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
+    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
       stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
       taosMsleep(10000);
       continue;
@@ -374,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   // 7. pause allowed.
   streamTaskEnablePause(pStreamTask);
 
-  if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
+  if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
     SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
 
     SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@@ -630,7 +630,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
     }
 
     taosThreadMutexLock(&pTask->lock);
-    if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) ||
+    if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) ||
         streamTaskShouldPause(pTask)) {
       atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
       taosThreadMutexUnlock(&pTask->lock);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 41ad5d4840..eb4fe3a498 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -799,6 +799,15 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
     if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
     if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
   }
+
+  int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
+  if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
+
+  for (int j = 0; j < numOfVgs; ++j) {
+    int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
+    if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
+  }
+
   tEndEncode(pEncoder);
   return pEncoder->pos;
 }
@@ -832,6 +841,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
     taosArrayPush(pReq->pTaskStatus, &entry);
   }
 
+  int32_t numOfVgs = 0;
+  if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
+
+  pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
+
+  for (int j = 0; j < numOfVgs; ++j) {
+    int32_t vgId = 0;
+    if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
+    taosArrayPush(pReq->pUpdateNodes, &vgId);
+  }
+
   tEndDecode(pDecoder);
   return 0;
 }
@@ -886,13 +906,14 @@ void metaHbToMnode(void* param, void* tmrId) {
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
 
   SEpSet epset = {0};
-  bool hasValEpset = false;
+  bool   hasMnodeEpset = false;
+
   hbMsg.vgId = pMeta->vgId;
   hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
+  hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
 
   for (int32_t i = 0; i < numOfTasks; ++i) {
-    STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
-
+    STaskId*      pId = taosArrayGet(pMeta->pTaskList, i);
     SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
 
     // not report the status of fill-history task
@@ -905,7 +926,7 @@ void metaHbToMnode(void* param, void* tmrId) {
         .status = streamTaskGetStatus(*pTask, NULL),
         .nodeId = pMeta->vgId,
         .stage = pMeta->stage,
-        .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
+        .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
     };
 
     entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
@@ -924,18 +945,39 @@ void metaHbToMnode(void* param, void* tmrId) {
       walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
     }
 
-    taosArrayPush(hbMsg.pTaskStatus, &entry);
+    taosThreadMutexLock(&(*pTask)->lock);
+    int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
+    for (int j = 0; j < num; ++j) {
+      int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
 
-    if (!hasValEpset) {
+      bool exist = false;
+      int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
+      for (int k = 0; k < numOfExisted; ++k) {
+        if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
+          exist = true;
+          break;
+        }
+      }
+
+      if (!exist) {
+        taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
+      }
+    }
+
+    taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
+    taosThreadMutexUnlock(&(*pTask)->lock);
+
+    taosArrayPush(hbMsg.pTaskStatus, &entry);
+    if (!hasMnodeEpset) {
       epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
-      hasValEpset = true;
+      hasMnodeEpset = true;
    }
  }
 
   hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
   taosRUnLockLatch(&pMeta->lock);
 
-  if (hasValEpset) {
+  if (hasMnodeEpset) {
     int32_t code = 0;
     int32_t tlen = 0;
@@ -980,6 +1022,8 @@ void metaHbToMnode(void* param, void* tmrId) {
   }
 
   taosArrayDestroy(hbMsg.pTaskStatus);
+  taosArrayDestroy(hbMsg.pUpdateNodes);
+
   taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
   taosReleaseRef(streamMetaId, rid);
 }
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index e4c13699c9..eae4605dbc 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -169,7 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
       return TSDB_CODE_SUCCESS;
     }
 
-    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
+    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
     if (qItem == NULL) {
       if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
         taosMsleep(WAIT_FOR_DURATION);
@@ -211,7 +211,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
           streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
         }
 
-        streamQueueProcessFail(pTask->inputInfo.queue);
+        streamQueueProcessFail(pTask->inputq.queue);
         return TSDB_CODE_SUCCESS;
       }
     } else {
@@ -232,7 +232,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
           streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
         }
 
-        streamQueueProcessFail(pTask->inputInfo.queue);
+        streamQueueProcessFail(pTask->inputq.queue);
         return TSDB_CODE_SUCCESS;
       }
 
@@ -240,7 +240,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     }
 
     *numOfBlocks += 1;
-    streamQueueProcessSuccess(pTask->inputInfo.queue);
+    streamQueueProcessSuccess(pTask->inputq.queue);
 
     if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
       stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
@@ -258,12 +258,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
 
 int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
   int8_t type = pItem->type;
-  STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
-  int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;
+  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
+  int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
 
   if (type == STREAM_INPUT__DATA_SUBMIT) {
     SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
-    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) {
+    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
       double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
       stTrace(
           "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
@@ -290,7 +290,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
             msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
   } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
              type == STREAM_INPUT__REF_DATA_BLOCK) {
-    if (streamQueueIsFull(pTask->inputInfo.queue)) {
+    if (streamQueueIsFull(pTask->inputq.queue)) {
       double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
 
       stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c
index 8b2fb4b9b2..4c71841062 100644
--- a/source/libs/stream/src/streamStart.c
+++ b/source/libs/stream/src/streamStart.c
@@ -346,6 +346,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
 int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
   ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
   const char* id = pTask->id.idStr;
+  int32_t     vgId = pTask->pMeta->vgId;
 
   if (streamTaskShouldStop(pTask)) {
     stDebug("s-task:%s should stop, do not do check downstream again", id);
@@ -354,8 +355,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRs
 
   if (pRsp->status == TASK_DOWNSTREAM_READY) {
     if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      bool found = false;
+      bool    found = false;
 
       int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
       for (int32_t i = 0; i < numOfReqs; i++) {
         int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
@@ -402,6 +403,26 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRs
           "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
           "downstream again, nodeUpdate needed",
           id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
+
+      taosThreadMutexLock(&pTask->lock);
+      int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
+      bool    existed = false;
+      for (int i = 0; i < num; ++i) {
+        SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
+        if (p->nodeId == pRsp->downstreamNodeId) {
+          existed = true;
+          break;
+        }
+      }
+
+      if (!existed) {
+        SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
+        taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
+        stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
+               t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
+      }
+
+      taosThreadMutexUnlock(&pTask->lock);
       return 0;
     }
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index ec9715c068..f949d46315 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -59,7 +59,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
   pTask->id.idStr = taosStrdup(buf);
   pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
   pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
-  pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
+  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
   pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
 
   if (fillHistory) {
@@ -337,8 +337,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
   }
 
   int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
-  if (pTask->inputInfo.queue) {
-    streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
+  if (pTask->inputq.queue) {
+    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
   }
 
   if (pTask->outputq.queue) {
@@ -399,8 +399,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
   pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
   taosMemoryFree(pTask->outputInfo.pTokenBucket);
   taosThreadMutexDestroy(&pTask->lock);
-  taosMemoryFree(pTask);
 
+  taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
+  pTask->outputInfo.pDownstreamUpdateList = NULL;
+
+  taosMemoryFree(pTask);
   stDebug("s-task:0x%x free task completed", taskId);
 }
 
@@ -409,10 +412,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   pTask->refCnt = 1;
   pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
   pTask->status.timerActive = 0;
-  pTask->inputInfo.queue = streamQueueOpen(512 << 10);
+  pTask->inputq.queue = streamQueueOpen(512 << 10);
   pTask->outputq.queue = streamQueueOpen(512 << 10);
 
-  if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) {
+  if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
     stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
     return TSDB_CODE_OUT_OF_MEMORY;
   }
@@ -425,7 +428,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   }
 
   pTask->execInfo.created = taosGetTimestampMs();
-  pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
+  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
   pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
 
   pTask->pMeta = pMeta;
@@ -462,6 +465,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   taosThreadMutexInit(&pTask->lock, &attr);
   streamTaskOpenAllUpstreamInput(pTask);
 
+  pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
+  if (pTask->outputInfo.pDownstreamUpdateList == NULL) {
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+
   return TSDB_CODE_SUCCESS;
 }
 
diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim
index 18893245fa..97b2464bc8 100644
--- a/tests/script/tsim/stream/sliding.sim
+++ b/tests/script/tsim/stream/sliding.sim
@@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1);
 sql create table t2 using st tags(2,2,2);
 
 sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
+sleep 1000
 sql insert into t1 values(1648791213000,1,1,1,1.0);
 sql insert into t1 values(1648791223001,2,2,2,1.1);