diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7beee71be5..c27249cff6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -418,7 +418,9 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); tCleanupStreamRetrieveReq(&req); - return code; + + // always return success, to disable the auto rsp + return TSDB_CODE_SUCCESS; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 8bdc0d2343..be3da64c6a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -174,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 11fecf7683..b577147171 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -509,7 +509,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; - setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); + setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index fa4efc3c6e..fae90f4db8 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) { taosFreeQitem(pBlock); } -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) { SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); if (pArray == NULL) { - return -1; + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to prepare retrieve block, %s", id); + return terrno; } taosArrayPush(pArray, &(SSDataBlock){0}); @@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock pData->reqId = pReq->reqId; pData->blocks = pArray; - return 0; + return TSDB_CODE_SUCCESS; } SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c10d716881..b17d0206f0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -32,6 +32,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas int32_t numOfBlocks, int64_t dstTaskId, int32_t type); static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now); static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now); +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; @@ -306,6 +307,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD } } + addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true); pTask->msgInfo.pData = pReqs; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -328,11 +330,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD // it's a new vnode to receive dispatch msg, so add one if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; - - taosThreadMutexLock(&pTask->msgInfo.lock); - taosArrayPush(pTask->msgInfo.pSendInfo, &entry); - taosThreadMutexUnlock(&pTask->msgInfo.lock); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true); } pReqs[j].blockNum++; @@ -422,6 +420,20 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { pEntry->retryCount += 1; } +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { + SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now}; + + if (lock) { + taosThreadMutexLock(&pMsgInfo->lock); + } + + taosArrayPush(pMsgInfo->pSendInfo, &entry); + + if (lock) { + taosThreadMutexUnlock(&pMsgInfo->lock); + } +} + static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) { SStreamDispatchReq* pReq = pTask->msgInfo.pData; @@ -618,8 +630,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; - taosArrayPush(pTask->msgInfo.pSendInfo, &entry); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); } pReqs[j].blockNum++; @@ -1153,6 +1164,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { int32_t numOfRsp = 0; bool alreadySet = false; + bool updated = false; taosThreadMutexLock(&pMsgInfo->lock); for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { @@ -1162,7 +1174,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 pEntry->rspTs = now; pEntry->status = code; alreadySet = true; - stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); + updated = true; + stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); } if (pEntry->rspTs != -1) { @@ -1171,6 +1184,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 } taosThreadMutexUnlock(&pMsgInfo->lock); + ASSERT(updated); + return numOfRsp; } @@ -1417,6 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } streamTrySchedExec(pTask); - return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e96e44c19b..7d869ce538 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -944,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); - int8_t status = TASK_INPUT_STATUS__NORMAL; - - // enqueue - if (pData != NULL) { - stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); - - pData->type = STREAM_INPUT__DATA_RETRIEVE; - pData->srcVgId = 0; - streamRetrieveReqToData(pReq, pData); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { - status = TASK_INPUT_STATUS__NORMAL; - } else { - status = TASK_INPUT_STATUS__FAILED; - } - } else { // todo handle oom - /*streamTaskInputFail(pTask);*/ - /*status = TASK_INPUT_STATUS__FAILED;*/ + if (pData == NULL) { + stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr); + return terrno; } - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; + // enqueue + stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); + + pData->type = STREAM_INPUT__DATA_RETRIEVE; + pData->srcVgId = 0; + + int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pData); + return code; + } + + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg", + pTask->id.idStr); + } + + return code; } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); - if(code != 0){ + if (code != 0) { return code; } return streamTrySchedExec(pTask); diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 76d890b26a..b69f1eba4f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -121,7 +121,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 131" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG echo "jniDebugFlag 131" >> $TAOS_CFG -echo "qDebugFlag 131" >> $TAOS_CFG +echo "qDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 131" >> $TAOS_CFG echo "vDebugFlag 131" >> $TAOS_CFG @@ -136,7 +136,7 @@ echo "idxDebugFlag 135" >> $TAOS_CFG echo "udfDebugFlag 135" >> $TAOS_CFG echo "smaDebugFlag 135" >> $TAOS_CFG echo "metaDebugFlag 135" >> $TAOS_CFG -echo "stDebugFlag 135" >> $TAOS_CFG +echo "stDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG diff --git a/tests/script/tsim/stream/checkStreamSTable1.sim b/tests/script/tsim/stream/checkStreamSTable1.sim index dd44f5c102..942a947feb 100644 --- a/tests/script/tsim/stream/checkStreamSTable1.sim +++ b/tests/script/tsim/stream/checkStreamSTable1.sim @@ -57,7 +57,7 @@ loop1: sleep 1000 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 100 then return -1 endi