Merge pull request #26147 from taosdata/fix/3_liaohj

fix(test): wait for a little bit more time.
This commit is contained in:
Haojun Liao 2024-06-14 16:34:40 +08:00 committed by GitHub
commit 85baf2b3e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 60 additions and 38 deletions

View File

@ -418,7 +418,9 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
tCleanupStreamRetrieveReq(&req);
return code;
// always return success, to disable the auto rsp
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {

View File

@ -174,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr);
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);

View File

@ -509,7 +509,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

View File

@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
taosFreeQitem(pBlock);
}
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) {
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to prepare retrieve block, %s", id);
return terrno;
}
taosArrayPush(pArray, &(SSDataBlock){0});
@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
pData->reqId = pReq->reqId;
pData->blocks = pArray;
return 0;
return TSDB_CODE_SUCCESS;
}
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {

View File

@ -32,6 +32,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType;
@ -306,6 +307,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
}
}
addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true);
pTask->msgInfo.pData = pReqs;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -328,11 +330,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
// it's a new vnode to receive dispatch msg, so add one
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
taosThreadMutexLock(&pTask->msgInfo.lock);
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
taosThreadMutexUnlock(&pTask->msgInfo.lock);
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true);
}
pReqs[j].blockNum++;
@ -422,6 +420,20 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
pEntry->retryCount += 1;
}
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) {
SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now};
if (lock) {
taosThreadMutexLock(&pMsgInfo->lock);
}
taosArrayPush(pMsgInfo->pSendInfo, &entry);
if (lock) {
taosThreadMutexUnlock(&pMsgInfo->lock);
}
}
static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
@ -618,8 +630,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
}
pReqs[j].blockNum++;
@ -1153,6 +1164,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
int32_t numOfRsp = 0;
bool alreadySet = false;
bool updated = false;
taosThreadMutexLock(&pMsgInfo->lock);
for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
@ -1162,7 +1174,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
pEntry->rspTs = now;
pEntry->status = code;
alreadySet = true;
stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
updated = true;
stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
}
if (pEntry->rspTs != -1) {
@ -1171,6 +1184,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
}
taosThreadMutexUnlock(&pMsgInfo->lock);
ASSERT(updated);
return numOfRsp;
}
@ -1417,6 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
streamTrySchedExec(pTask);
return 0;
}

View File

@ -944,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
int8_t status = TASK_INPUT_STATUS__NORMAL;
// enqueue
if (pData != NULL) {
stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
streamRetrieveReqToData(pReq, pData);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else {
status = TASK_INPUT_STATUS__FAILED;
}
} else { // todo handle oom
/*streamTaskInputFail(pTask);*/
/*status = TASK_INPUT_STATUS__FAILED;*/
if (pData == NULL) {
stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
return terrno;
}
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
// enqueue
stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pData);
return code;
}
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
pTask->id.idStr);
}
return code;
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
if(code != 0){
if (code != 0) {
return code;
}
return streamTrySchedExec(pTask);

View File

@ -121,7 +121,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "jniDebugFlag 131" >> $TAOS_CFG
echo "qDebugFlag 131" >> $TAOS_CFG
echo "qDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 131" >> $TAOS_CFG
echo "vDebugFlag 131" >> $TAOS_CFG
@ -136,7 +136,7 @@ echo "idxDebugFlag 135" >> $TAOS_CFG
echo "udfDebugFlag 135" >> $TAOS_CFG
echo "smaDebugFlag 135" >> $TAOS_CFG
echo "metaDebugFlag 135" >> $TAOS_CFG
echo "stDebugFlag 135" >> $TAOS_CFG
echo "stDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG

View File

@ -57,7 +57,7 @@ loop1:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 100 then
return -1
endi