From 849f64d557a744fe6c0dac1f84eb2bdad862e7f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jun 2024 13:25:29 +0800 Subject: [PATCH 1/4] fix(tsdb): check null for tsdb block load info --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 18 ++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 2a49c1ba0a..d02ef92429 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -107,15 +107,21 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoa SArray *pList = taosArrayGetP(pLDataIterArray, i); for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { SLDataIter *pIter = taosArrayGetP(pList, j); + if (pIter->pBlockLoadInfo == NULL) { + continue; + } + + SSttBlockLoadCostInfo* pCost = &pIter->pBlockLoadInfo->cost; if (pLoadCost != NULL) { - pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks; - pLoadCost->loadStatisBlocks += pIter->pBlockLoadInfo->cost.loadStatisBlocks; - pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime; - pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime; + pLoadCost->loadBlocks += pCost->loadBlocks; + pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks; + pLoadCost->blockElapsedTime += pCost->blockElapsedTime; + pLoadCost->statisElapsedTime += pCost->statisElapsedTime; } destroyLDataIter(pIter); } + taosArrayDestroy(pList); } @@ -903,6 +909,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF if (pLoadInfo == NULL) { pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + if (pLoadInfo == NULL) { + code = terrno; + goto _end; + } } memset(pIter, 0, sizeof(SLDataIter)); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e1cfcba070..fe44f01917 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2240,7 +2240,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan }; SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; - int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); + + int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { return false; } From 5d2d5c3eee2b279bcdf370f1321b254900aaf2ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jun 2024 13:43:57 +0800 Subject: [PATCH 2/4] fix(tsdb): check for null ptr for block load info when retrieving block distribution. --- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 8ab7f7eeef..273ec6d797 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -985,6 +985,10 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra if (pIter->pBlockLoadInfo == NULL) { pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + if (pIter->pBlockLoadInfo == NULL) { + tsdbError("failed to create block load info, code: out of memory, %s", pstr); + continue; + } } // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file From 6f32a617f709c514b3ff9f2cd4f67d725a6f314a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jun 2024 13:45:42 +0800 Subject: [PATCH 3/4] fix(tsdb): check for malloc failure. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d02ef92429..57b8a99fb1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -46,6 +46,12 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, } pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + if (pLoadInfo->aSttBlk == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pLoadInfo); + return NULL; + } + pLoadInfo->pSchema = pSchema; pLoadInfo->colIds = colList; pLoadInfo->numOfCols = numOfCols; From 0f8c0fa8cbb6c7764e17c9dfbcdd7f15e5e76175 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Jun 2024 14:56:20 +0800 Subject: [PATCH 4/4] fix(stream): add dispatch data monitor to handle the network broken problem that may cause the stream process frozen. --- include/libs/stream/tstream.h | 22 +- source/dnode/vnode/src/tq/tq.c | 10 - source/dnode/vnode/src/tqCommon/tqCommon.c | 4 + source/libs/stream/inc/streamInt.h | 47 +- source/libs/stream/src/streamDispatch.c | 557 ++++++++++++--------- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamTask.c | 12 +- 7 files changed, 384 insertions(+), 270 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bcf081dbfb..f928ba6314 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -205,7 +205,6 @@ typedef struct { typedef struct { char stbFullName[TSDB_TABLE_FNAME_LEN]; - int32_t waitingRspCnt; SUseDbRsp dbInfo; } STaskDispatcherShuffle; @@ -312,15 +311,18 @@ typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data - int8_t dispatchMsgType; - int64_t checkpointId;// checkpoint id msg - int32_t transId; // transId for current checkpoint - int16_t msgType; // dispatch msg type - int32_t retryCount; // retry send data count - int64_t startTs; // dispatch start time, record total elapsed time for dispatch - SArray* pRetryList; // current dispatch successfully completed node of downstream - void* pRetryTmr; // used to dispatch data after a given time duration - void* pRspTmr; // used to dispatch data after a given time duration + + int8_t dispatchMsgType; + int64_t checkpointId; // checkpoint id msg + int32_t transId; // transId for current checkpoint + int16_t msgType; // dispatch msg type + int32_t msgId; + int64_t startTs; // dispatch start time, record total elapsed time for dispatch + int64_t rspTs; // latest rsp time + void* pRetryTmr; // used to dispatch data after a given time duration + TdThreadMutex lock; + int8_t inMonitor; + SArray* pSendInfo; // SArray } SDispatchMsgInfo; typedef struct STaskQueue { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a2ca3662d7..2d7c36c6ec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1012,16 +1012,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - int32_t vgId = TD_VID(pTq->pVnode); - SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; - -// if (!pTq->pVnode->restored) { -// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64 -// ", transId:%d s-task:0x%x ignore it", -// vgId, pReq->checkpointId, pReq->transId, pReq->taskId); -// return TSDB_CODE_SUCCESS; -// } - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d16c41ec1e..7beee71be5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -362,6 +362,7 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t vgId = pMeta->vgId; + pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId); pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); pRsp->streamId = htobe64(pRsp->streamId); pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); @@ -369,6 +370,9 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { pRsp->stage = htobe64(pRsp->stage); pRsp->msgId = htonl(pRsp->msgId); + tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f3ec01cf7a..8bdc0d2343 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,20 +26,16 @@ extern "C" { #endif -#define CHECK_RSP_CHECK_INTERVAL 300 -#define LAUNCH_HTASK_INTERVAL 100 -#define WAIT_FOR_MINIMAL_INTERVAL 100.00 -#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 -#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 - -#define MAX_BLOCK_NAME_NUM 1024 -#define DISPATCH_RETRY_INTERVAL_MS 300 -#define MAX_CONTINUE_RETRY_COUNT 5 - -#define META_HB_CHECK_INTERVAL 200 -#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec -#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) - +#define CHECK_RSP_CHECK_INTERVAL 300 +#define LAUNCH_HTASK_INTERVAL 100 +#define WAIT_FOR_MINIMAL_INTERVAL 100.00 +#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 +#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 +#define MAX_BLOCK_NAME_NUM 1024 +#define DISPATCH_RETRY_INTERVAL_MS 300 +#define META_HB_CHECK_INTERVAL 200 +#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec +#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) @@ -118,6 +114,14 @@ typedef struct { int32_t taskId; } STaskTriggerSendInfo; +typedef struct { + int32_t nodeId; + int32_t status; + int64_t sendTs; + int64_t rspTs; + int32_t retryCount; +} SDispatchEntry; + typedef struct { int64_t streamId; int64_t recvTs; @@ -143,6 +147,12 @@ typedef enum { EXEC_AFTER_IDLE = 0x1, } EExtractDataCode; +typedef enum ECHECKPOINT_BACKUP_TYPE { + DATA_UPLOAD_DISABLE = -1, + DATA_UPLOAD_S3 = 0, + DATA_UPLOAD_RSYNC = 1, +} ECHECKPOINT_BACKUP_TYPE; + extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -153,10 +163,9 @@ void streamTimerCleanUp(); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); -void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); +void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); -int32_t getNumOfDispatchBranch(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask); int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); @@ -204,12 +213,6 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); -typedef enum ECHECKPOINT_BACKUP_TYPE { - DATA_UPLOAD_DISABLE = -1, - DATA_UPLOAD_S3 = 0, - DATA_UPLOAD_RSYNC = 1, -} ECHECKPOINT_BACKUP_TYPE; - ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskDownloadCheckpointData(const char* id, char* path); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9cdc668ec4..c10d716881 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -23,13 +23,15 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; -static void doRetryDispatchData(void* param, void* tmrId); +static void doMonitorDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, - int32_t vgSz, int64_t groupId); + int64_t groupId, int64_t now); static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type); +static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now); +static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; @@ -42,7 +44,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->stage = pTask->pMeta->stage; - pReq->msgId = pTask->execInfo.dispatch; + pReq->msgId = pTask->msgInfo.msgId; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; @@ -65,6 +67,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); + SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->streamId = pReq->streamId; pCont->rspToTaskId = pReq->srcTaskId; @@ -216,26 +219,66 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { taosMemoryFree(pReq); } -int32_t getNumOfDispatchBranch(SStreamTask* pTask) { - return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) - ? 1 - : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); -} - void clearBufferedDispatchMsg(SStreamTask* pTask) { SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; if (pMsgInfo->pData != NULL) { - destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); + destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; + + taosThreadMutexLock(&pMsgInfo->lock); + taosArrayClear(pTask->msgInfo.pSendInfo); + taosThreadMutexUnlock(&pMsgInfo->lock); +} + +static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStreamDataBlock* pData) { + int32_t code = 0; + int32_t type = pTask->outputInfo.type; + int32_t num = streamTaskGetNumOfDownstream(pTask); + + ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH); + + SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq)); + if (pReqs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + for (int32_t i = 0; i < numOfVgroups; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); + if (code != TSDB_CODE_SUCCESS) { + destroyDispatchMsg(pReqs, numOfVgroups); + terrno = code; + return NULL; + } + } + } else { + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + code = tInitStreamDispatchReq(pReqs, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReqs); + terrno = code; + return NULL; + } + } + + return pReqs; } static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; + int64_t now = taosGetTimestampMs(); int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); @@ -247,48 +290,28 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.transId = p->info.window.ekey; } + SStreamDispatchReq* pReqs = createDispatchDataReq(pTask, pData); + if (pReqs == NULL) { + stError("s-task:%s failed to create dispatch req", pTask->id.idStr); + return terrno; + } + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); - - int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pReq); - return code; - } - for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs); if (code != TSDB_CODE_SUCCESS) { - destroyDispatchMsg(pReq, 1); + destroyDispatchMsg(pReqs, 1); return code; } } - pTask->msgInfo.pData = pReq; + pTask->msgInfo.pData = pReqs; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt); - ASSERT(rspCnt == 0); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); - SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq)); - if (pReqs == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - for (int32_t i = 0; i < numOfVgroups; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); - if (code != TSDB_CODE_SUCCESS) { - destroyDispatchMsg(pReqs, numOfVgroups); - return code; - } - } - for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); @@ -304,7 +327,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD // it's a new vnode to receive dispatch msg, so add one if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; + + taosThreadMutexLock(&pTask->msgInfo.lock); + taosArrayPush(pTask->msgInfo.pSendInfo, &entry); + taosThreadMutexUnlock(&pTask->msgInfo.lock); } pReqs[j].blockNum++; @@ -313,7 +341,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD continue; } - code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId); + code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, pDataBlock->info.id.groupId, now); if (code != 0) { destroyDispatchMsg(pReqs, numOfVgroups); return code; @@ -327,9 +355,9 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData); } else { + int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo); stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr, - pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt, - pTask->msgInfo.pData); + pTask->execInfo.dispatch, pTask->pMeta->stage, numOfBranches, pTask->msgInfo.pData); } return code; @@ -337,8 +365,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { int32_t code = 0; - int32_t msgId = pTask->execInfo.dispatch; const char* id = pTask->id.idStr; + int32_t msgId = pTask->msgInfo.msgId; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; @@ -352,10 +380,10 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch } else { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); + int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo); - int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt; stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id, - pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId); + pTask->info.selfChildId, numOfBranches, numOfVgroups, msgId); int32_t numOfSend = 0; for (int32_t i = 0; i < numOfVgroups; i++) { @@ -370,7 +398,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch } // no need to try remain, all already send. - if (++numOfSend == actualVgroups) { + if (++numOfSend == numOfBranches) { break; } } @@ -382,102 +410,154 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch return code; } -static void doRetryDispatchData(void* param, void* tmrId) { - SStreamTask* pTask = param; - const char* id = pTask->id.idStr; - int32_t msgId = pTask->execInfo.dispatch; +static void setNotInDispatchMonitor(SDispatchMsgInfo* pMsgInfo) { + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->inMonitor = 0; + taosThreadMutexUnlock(&pMsgInfo->lock); +} + +static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { + pEntry->sendTs = now; + pEntry->rspTs = -1; + pEntry->retryCount += 1; +} + +static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) { + SStreamDispatchReq* pReq = pTask->msgInfo.pData; + + int32_t msgId = pTask->msgInfo.msgId; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + setResendInfo(pEntry, now); + for (int32_t j = 0; j < numOfVgroups; ++j) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo->vgId == pEntry->nodeId) { + int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", + pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code)); + break; + } + } +} + +static void doMonitorDispatchData(void* param, void* tmrId) { + SStreamTask* pTask = param; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + int32_t msgId = pMsgInfo->msgId; + int32_t code = 0; + int64_t now = taosGetTimestampMs(); + + stDebug("s-task:%s start monitor dispatch data", id); if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + setNotInDispatchMonitor(pMsgInfo); return; } - ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); + // slave task not handle the dispatch, downstream not ready will break the monitor timer + // follower not handle the dispatch rsp + if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref); + setNotInDispatchMonitor(pMsgInfo); + return; + } - int32_t code = 0; + taosThreadMutexLock(&pMsgInfo->lock); + if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); + + pTask->msgInfo.inMonitor = 0; + taosThreadMutexUnlock(&pMsgInfo->lock); + return; + } + taosThreadMutexUnlock(&pMsgInfo->lock); + + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); + if (numOfFailed == 0) { + stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + return; + } { - SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL); - taosArrayClear(pTask->msgInfo.pRetryList); - SStreamDispatchReq* pReq = pTask->msgInfo.pData; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgroups = taosArrayGetSize(vgInfo); + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, + pTask->info.selfChildId, msgId); - int32_t numOfFailed = taosArrayGetSize(pList); - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, - pTask->info.selfChildId, numOfFailed, msgId); + int32_t numOfRetry = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { + continue; + } - for (int32_t i = 0; i < numOfFailed; i++) { - int32_t vgId = *(int32_t*)taosArrayGet(pList, i); + // downstream not rsp yet beyond threshold that is 10s + if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data + doSendFailedDispatch(pTask, pEntry, now, "timeout"); + numOfRetry += 1; + continue; + } - for (int32_t j = 0; j < numOfVgroups; ++j) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - if (pVgInfo->vgId == vgId) { - stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, - pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId); + // downstream inputQ is closed + if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { + doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); + numOfRetry += 1; + continue; + } - code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); - if (code < 0) { - break; - } - } + // handle other errors + if (pEntry->status != TSDB_CODE_SUCCESS) { + doSendFailedDispatch(pTask, pEntry, now, "downstream error"); + numOfRetry += 1; } } stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, - numOfFailed, msgId); + numOfRetry, msgId); } else { - int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; + int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, - pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); + ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1); + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); - code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); } - - taosArrayDestroy(pList); } - if (code != TSDB_CODE_SUCCESS) { - if (!streamTaskShouldStop(pTask)) { - // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); - // atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); - if (streamTaskShouldPause(pTask)) { - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); - } else { - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - } - } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); - } - } else { + if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); - } -} - -void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { - pTask->msgInfo.retryCount++; - - stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, - waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); - - if (pTask->msgInfo.pRetryTmr != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + setNotInDispatchMonitor(pMsgInfo); } else { - pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } } -int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, - int64_t groupId) { +void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { + if (pTask->msgInfo.pRetryTmr != NULL) { + taosTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr); + } else { + pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer); + } +} + +int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, + int64_t groupId, int64_t now) { uint32_t hashValue = 0; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; if (pTask->pNameMap == NULL) { @@ -495,23 +575,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->subtableWithoutMd5 != 1 && - !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && - groupId != 0){ - if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) && + !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) { + if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); - }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId); } } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, + pDataBlock->info.parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; - hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + hashValue = + taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); SBlockName bln = {0}; bln.hashValue = hashValue; memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); @@ -520,20 +601,25 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } } - bool found = false; + bool found = false; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + // TODO: optimize search - int32_t j; - for (j = 0; j < vgSz; j++) { + taosThreadMutexLock(&pTask->msgInfo.lock); + + for (int32_t j = 0; j < numOfVgroups; j++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + taosThreadMutexUnlock(&pTask->msgInfo.lock); return -1; } if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; + taosArrayPush(pTask->msgInfo.pSendInfo, &entry); } pReqs[j].blockNum++; @@ -541,10 +627,28 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S break; } } + + taosThreadMutexUnlock(&pTask->msgInfo.lock); ASSERT(found); return 0; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { + pInfo->rspTs = recvTs; +} + int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -587,7 +691,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; - pTask->msgInfo.startTs = taosGetTimestampMs(); + initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -599,34 +703,21 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { streamTaskInitTriggerDispatchInfo(pTask); } - int32_t retryCount = 0; - while (1) { - code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - if (code == TSDB_CODE_SUCCESS) { - break; - } + code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, - pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount); - - // todo deal with only partially success dispatch case - atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); - if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore - clearBufferedDispatchMsg(pTask); - return code; - } - - if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", - pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); - - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - break; - } + taosThreadMutexLock(&pTask->msgInfo.lock); + if (pTask->msgInfo.inMonitor == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref, + tstrerror(code)); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + pTask->msgInfo.inMonitor = 1; + } else { + stDebug("s-task:%s already in dispatch monitor tmr", id); } + taosThreadMutexUnlock(&pTask->msgInfo.lock); + // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; } @@ -817,8 +908,10 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; ASSERT(dataStrLen > 0); - void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) { + return -1; + } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; pRetrieve->useconds = 0; @@ -1031,23 +1124,6 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); - if (delayDispatch) { - taosThreadMutexLock(&pTask->lock); - // we only set the dispatch msg info for current checkpoint trans - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && - pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) { - ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId); - stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", - pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); - - streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId); - } else { - stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired", - pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); - } - taosThreadMutexUnlock(&pTask->lock); - } - clearBufferedDispatchMsg(pTask); int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1074,17 +1150,55 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId return 0; } -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; - int32_t msgId = pTask->execInfo.dispatch; +static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { + int32_t numOfRsp = 0; + bool alreadySet = false; -#if 0 - // for test purpose, build the failure case - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { - pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + taosThreadMutexLock(&pMsgInfo->lock); + for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry->nodeId == vgId) { + ASSERT(!alreadySet); + pEntry->rspTs = now; + pEntry->status = code; + alreadySet = true; + stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); + } + + if (pEntry->rspTs != -1) { + numOfRsp += 1; + } } -#endif + + taosThreadMutexUnlock(&pMsgInfo->lock); + return numOfRsp; +} + +bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) { + return (pEntry->rspTs == -1) && (now - pEntry->sendTs) > 30 * 1000; +} + +int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) { + int32_t numOfFailed = 0; + taosThreadMutexLock(&pMsgInfo->lock); + + for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) { + numOfFailed += 1; + } + } + taosThreadMutexUnlock(&pMsgInfo->lock); + return numOfFailed; +} + +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + int32_t msgId = pMsgInfo->msgId; + int64_t now = taosGetTimestampMs(); + int32_t totalRsp = 0; // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { @@ -1109,53 +1223,61 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - taosThreadMutexLock(&pTask->lock); - taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); - taosThreadMutexUnlock(&pTask->lock); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - taosThreadMutexLock(&pTask->lock); - taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); - taosThreadMutexUnlock(&pTask->lock); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id); + stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } else { + if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { + // todo handle the role-changed during checkpoint generation, add test case + stError( + "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or " + "restart already, treat it as success", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } - stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); - } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - // todo handle the agg task failure, add test case - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && - pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 - ", set the current checkpoint failed, and send rsp to mnode", - id, pTask->chkInfo.pActiveInfo->activeId); - { // send checkpoint failure msg to mnode directly - pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id - pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId; - streamTaskSendCheckpointSourceRsp(pTask); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + + { + bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + if (delayDispatch) { + taosThreadMutexLock(&pTask->lock); + // we only set the dispatch msg info for current checkpoint trans + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && + pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) { + ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); + stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", + pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId); + + streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId); + } else { + stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 + " transId:%d discard, since expired", + pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); + } + taosThreadMutexUnlock(&pTask->lock); } - } else { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); } } } - int32_t leftRsp = 0; + int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); - ASSERT(leftRsp >= 0); - - if (leftRsp > 0) { + if (notRsp > 0) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting " "for %d rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp); + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp); } else { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp", @@ -1166,31 +1288,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } - ASSERT(leftRsp >= 0); - // all msg rsp already, continue - if (leftRsp == 0) { + if (notRsp == 0) { ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // we need to re-try send dispatch msg to downstream tasks - int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); - if (numOfFailed > 0) { - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed); - stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt); - } - - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d", - pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref); - - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - } else { // this message has been sent successfully, let's try next one. - pTask->msgInfo.retryCount = 0; - + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); + if (numOfFailed == 0) { // this message has been sent successfully, let's try next one. // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); + if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", + id, msgId); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStatePrepare(pTask); @@ -1312,4 +1420,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } - diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 98168abae1..bacab3ac7a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -643,7 +643,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { - stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); + stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f8a4deb6b1..e96e44c19b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -283,10 +283,12 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); - pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); + pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo); + taosThreadMutexDestroy(&pTask->msgInfo.lock); + pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { @@ -373,7 +375,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->pMeta = pMeta; pTask->pMsgCb = pMsgCb; - pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); + pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry)); + if (pTask->msgInfo.pSendInfo == NULL) { + stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr); + return terrno; + } + + taosThreadMutexInit(&pTask->msgInfo.lock, NULL); TdThreadMutexAttr attr = {0};