diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 9bbda8309f..9a6a5329ae 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0fb690e756..f0ab2a0e95 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -43,7 +43,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); -int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta); void tqSetRestoreVersionInfo(SStreamTask* pTask); +int32_t tqExpandStreamTask(SStreamTask* pTask); #endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bcf081dbfb..03c42e5c7e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -157,7 +157,8 @@ typedef enum EStreamTaskEvent { typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); +typedef int32_t FTaskBuild(void* ahandle, SStreamTask* pTask, int64_t ver); +typedef int32_t FTaskExpand(SStreamTask* pTask); typedef struct { int8_t type; @@ -205,7 +206,6 @@ typedef struct { typedef struct { char stbFullName[TSDB_TABLE_FNAME_LEN]; - int32_t waitingRspCnt; SUseDbRsp dbInfo; } STaskDispatcherShuffle; @@ -312,15 +312,18 @@ typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // 
current dispatch data - int8_t dispatchMsgType; - int64_t checkpointId;// checkpoint id msg - int32_t transId; // transId for current checkpoint - int16_t msgType; // dispatch msg type - int32_t retryCount; // retry send data count - int64_t startTs; // dispatch start time, record total elapsed time for dispatch - SArray* pRetryList; // current dispatch successfully completed node of downstream - void* pRetryTmr; // used to dispatch data after a given time duration - void* pRspTmr; // used to dispatch data after a given time duration + + int8_t dispatchMsgType; + int64_t checkpointId; // checkpoint id msg + int32_t transId; // transId for current checkpoint + int16_t msgType; // dispatch msg type + int32_t msgId; + int64_t startTs; // dispatch start time, record total elapsed time for dispatch + int64_t rspTs; // latest rsp time + void* pRetryTmr; // used to dispatch data after a given time duration + TdThreadMutex lock; + int8_t inMonitor; + SArray* pSendInfo; // SArray } SDispatchMsgInfo; typedef struct STaskQueue { @@ -484,7 +487,8 @@ typedef struct SStreamMeta { SArray* pTaskList; // SArray void* ahandle; TXN* txn; - FTaskExpand* expandFunc; + FTaskBuild* buildTaskFn; + FTaskExpand* expandTaskFn; int32_t vgId; int64_t stage; int32_t role; @@ -708,8 +712,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); // stream task meta void streamMetaInit(); void streamMetaCleanup(); -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, - startComplete_fn_t fn); +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, + int32_t vgId, int64_t stage, startComplete_fn_t fn); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); diff --git a/source/common/src/tvariant.c 
b/source/common/src/tvariant.c index ae2c8c0c14..aad877312b 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -88,7 +88,7 @@ static int32_t parseSignAndUInteger(const char *z, int32_t n, bool *is_neg, uint if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { return TSDB_CODE_FAILED; } - if (val > UINT64_MAX) { + if (val > (double)UINT64_MAX) { errno = ERANGE; return TSDB_CODE_FAILED; } @@ -172,7 +172,7 @@ int32_t toIntegerEx(const char *z, int32_t n, uint32_t type, int64_t *value) { } break; case TK_NK_FLOAT: { double val = round(taosStr2Double(z, &endPtr)); - if (!IS_VALID_INT64(val)) { + if(val < (double)INT64_MIN || val > (double)INT64_MAX){ return TSDB_CODE_FAILED; } if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { @@ -271,7 +271,7 @@ int32_t toUIntegerEx(const char *z, int32_t n, uint32_t type, uint64_t *value) { } break; case TK_NK_FLOAT: { double val = round(taosStr2Double(p, &endPtr)); - if (!IS_VALID_UINT64(val)) { + if (val < 0 || val > (double)UINT64_MAX) { return TSDB_CODE_FAILED; } if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b0d61ebc06..d322eb2977 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,14 +25,6 @@ #define sndDebug(...) 
do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -static STaskId replaceStreamTaskId(SStreamTask *pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - return id; -} - static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { ASSERT(pTask->info.fillHistory); pTask->id.taskId = pId->taskId; @@ -85,7 +77,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { startRsync(); pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7f5ab8b6e6..22b26498e4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -264,7 +264,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30710bef27..08fdda0e29 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -90,7 +90,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { int32_t tqInitialize(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - 
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); + pTq->pStreamMeta = + streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback); if (pTq->pStreamMeta == NULL) { return -1; } @@ -713,7 +714,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { +int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { + STQ* pTq = (STQ*) pTqObj; + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); @@ -1010,16 +1013,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - int32_t vgId = TD_VID(pTq->pVnode); - SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; - -// if (!pTq->pVnode->restored) { -// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64 -// ", transId:%d s-task:0x%x ignore it", -// vgId, pReq->checkpointId, pReq->transId, pReq->taskId); -// return TSDB_CODE_SUCCESS; -// } - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d16c41ec1e..c27249cff6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -362,6 +362,7 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t vgId = pMeta->vgId; + pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId); pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); pRsp->streamId = htobe64(pRsp->streamId); pRsp->downstreamTaskId = 
htonl(pRsp->downstreamTaskId); @@ -369,6 +370,9 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { pRsp->stage = htobe64(pRsp->stage); pRsp->msgId = htonl(pRsp->msgId); + tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); @@ -414,7 +418,9 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); tCleanupStreamRetrieveReq(&req); - return code; + + // always return success, to disable the auto rsp + return TSDB_CODE_SUCCESS; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 2a49c1ba0a..57b8a99fb1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -46,6 +46,12 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, } pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + if (pLoadInfo->aSttBlk == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pLoadInfo); + return NULL; + } + pLoadInfo->pSchema = pSchema; pLoadInfo->colIds = colList; pLoadInfo->numOfCols = numOfCols; @@ -107,15 +113,21 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoa SArray *pList = taosArrayGetP(pLDataIterArray, i); for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { SLDataIter *pIter = taosArrayGetP(pList, j); + if (pIter->pBlockLoadInfo == NULL) { + continue; + } + + SSttBlockLoadCostInfo* pCost = &pIter->pBlockLoadInfo->cost; if (pLoadCost != NULL) { - pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks; - pLoadCost->loadStatisBlocks += 
pIter->pBlockLoadInfo->cost.loadStatisBlocks; - pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime; - pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime; + pLoadCost->loadBlocks += pCost->loadBlocks; + pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks; + pLoadCost->blockElapsedTime += pCost->blockElapsedTime; + pLoadCost->statisElapsedTime += pCost->statisElapsedTime; } destroyLDataIter(pIter); } + taosArrayDestroy(pList); } @@ -903,6 +915,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF if (pLoadInfo == NULL) { pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + if (pLoadInfo == NULL) { + code = terrno; + goto _end; + } } memset(pIter, 0, sizeof(SLDataIter)); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e1cfcba070..fe44f01917 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2240,7 +2240,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan }; SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; - int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); + + int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { return false; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 8ab7f7eeef..273ec6d797 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -985,6 +985,10 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra if (pIter->pBlockLoadInfo == NULL) { pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + if (pIter->pBlockLoadInfo == NULL) { + tsdbError("failed to create block load info, code: out of memory, 
%s", pstr); + continue; + } } // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index c671e7717c..776a7fb95a 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -64,7 +64,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) static void resetBoundingBox(MinMaxEntry *range, int32_t type) { if (IS_SIGNED_NUMERIC_TYPE(type)) { range->dMaxVal = INT64_MIN; - range->dMinVal = INT64_MAX; + range->dMinVal = (double)INT64_MAX; } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { range->u64MaxVal = 0; range->u64MinVal = UINT64_MAX; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 7e344866a5..5f7764f342 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -26,6 +26,10 @@ #include "tudf.h" #include "tudfInt.h" +#ifdef _TD_DARWIN_64 +#include <mach-o/dyld.h> +#endif + typedef struct SUdfdData { bool startCalled; bool needCleanUp; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f3ec01cf7a..be3da64c6a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,20 +26,16 @@ extern "C" { #endif -#define CHECK_RSP_CHECK_INTERVAL 300 -#define LAUNCH_HTASK_INTERVAL 100 -#define WAIT_FOR_MINIMAL_INTERVAL 100.00 -#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 -#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 - -#define MAX_BLOCK_NAME_NUM 1024 -#define DISPATCH_RETRY_INTERVAL_MS 300 -#define MAX_CONTINUE_RETRY_COUNT 5 - -#define META_HB_CHECK_INTERVAL 200 -#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec -#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) - +#define CHECK_RSP_CHECK_INTERVAL 300 +#define LAUNCH_HTASK_INTERVAL 100 +#define WAIT_FOR_MINIMAL_INTERVAL 100.00 +#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 +#define 
RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 +#define MAX_BLOCK_NAME_NUM 1024 +#define DISPATCH_RETRY_INTERVAL_MS 300 +#define META_HB_CHECK_INTERVAL 200 +#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec +#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) @@ -118,6 +114,14 @@ typedef struct { int32_t taskId; } STaskTriggerSendInfo; +typedef struct { + int32_t nodeId; + int32_t status; + int64_t sendTs; + int64_t rspTs; + int32_t retryCount; +} SDispatchEntry; + typedef struct { int64_t streamId; int64_t recvTs; @@ -143,6 +147,12 @@ typedef enum { EXEC_AFTER_IDLE = 0x1, } EExtractDataCode; +typedef enum ECHECKPOINT_BACKUP_TYPE { + DATA_UPLOAD_DISABLE = -1, + DATA_UPLOAD_S3 = 0, + DATA_UPLOAD_RSYNC = 1, +} ECHECKPOINT_BACKUP_TYPE; + extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -153,10 +163,9 @@ void streamTimerCleanUp(); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); -void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); +void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); -int32_t getNumOfDispatchBranch(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask); int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); @@ -165,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t 
streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); @@ -204,12 +213,6 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); -typedef enum ECHECKPOINT_BACKUP_TYPE { - DATA_UPLOAD_DISABLE = -1, - DATA_UPLOAD_S3 = 0, - DATA_UPLOAD_RSYNC = 1, -} ECHECKPOINT_BACKUP_TYPE; - ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskDownloadCheckpointData(const char* id, char* path); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 11fecf7683..b577147171 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -509,7 +509,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; - setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); + setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index fa4efc3c6e..fae90f4db8 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) { taosFreeQitem(pBlock); } -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) { SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); if (pArray == NULL) { - return -1; + terrno = 
TSDB_CODE_OUT_OF_MEMORY; + stError("failed to prepare retrieve block, %s", id); + return terrno; } taosArrayPush(pArray, &(SSDataBlock){0}); @@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock pData->reqId = pReq->reqId; pData->blocks = pArray; - return 0; + return TSDB_CODE_SUCCESS; } SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9cdc668ec4..b17d0206f0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -23,13 +23,16 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; -static void doRetryDispatchData(void* param, void* tmrId); +static void doMonitorDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, - int32_t vgSz, int64_t groupId); + int64_t groupId, int64_t now); static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type); +static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now); +static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now); +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; @@ -42,7 +45,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->stage = pTask->pMeta->stage; - pReq->msgId = pTask->execInfo.dispatch; + 
pReq->msgId = pTask->msgInfo.msgId; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; @@ -65,6 +68,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); + SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->streamId = pReq->streamId; pCont->rspToTaskId = pReq->srcTaskId; @@ -216,26 +220,66 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { taosMemoryFree(pReq); } -int32_t getNumOfDispatchBranch(SStreamTask* pTask) { - return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) - ? 1 - : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); -} - void clearBufferedDispatchMsg(SStreamTask* pTask) { SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; if (pMsgInfo->pData != NULL) { - destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); + destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; + + taosThreadMutexLock(&pMsgInfo->lock); + taosArrayClear(pTask->msgInfo.pSendInfo); + taosThreadMutexUnlock(&pMsgInfo->lock); +} + +static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStreamDataBlock* pData) { + int32_t code = 0; + int32_t type = pTask->outputInfo.type; + int32_t num = streamTaskGetNumOfDownstream(pTask); + + ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH); + + SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq)); + if (pReqs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = 
pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + for (int32_t i = 0; i < numOfVgroups; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); + if (code != TSDB_CODE_SUCCESS) { + destroyDispatchMsg(pReqs, numOfVgroups); + terrno = code; + return NULL; + } + } + } else { + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + code = tInitStreamDispatchReq(pReqs, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReqs); + terrno = code; + return NULL; + } + } + + return pReqs; } static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; + int64_t now = taosGetTimestampMs(); int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); @@ -247,48 +291,29 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.transId = p->info.window.ekey; } + SStreamDispatchReq* pReqs = createDispatchDataReq(pTask, pData); + if (pReqs == NULL) { + stError("s-task:%s failed to create dispatch req", pTask->id.idStr); + return terrno; + } + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); - - int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pReq); - return code; - } - for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); + code = streamAddBlockIntoDispatchMsg(pDataBlock, 
pReqs); if (code != TSDB_CODE_SUCCESS) { - destroyDispatchMsg(pReq, 1); + destroyDispatchMsg(pReqs, 1); return code; } } - pTask->msgInfo.pData = pReq; + addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true); + pTask->msgInfo.pData = pReqs; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt); - ASSERT(rspCnt == 0); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); - SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq)); - if (pReqs == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - for (int32_t i = 0; i < numOfVgroups; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); - if (code != TSDB_CODE_SUCCESS) { - destroyDispatchMsg(pReqs, numOfVgroups); - return code; - } - } - for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); @@ -304,7 +329,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD // it's a new vnode to receive dispatch msg, so add one if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true); } pReqs[j].blockNum++; @@ -313,7 +339,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD continue; } - code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId); + code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, pDataBlock->info.id.groupId, now); if (code != 0) { destroyDispatchMsg(pReqs, numOfVgroups); return code; @@ -327,9 +353,9 @@ static int32_t 
doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData); } else { + int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo); stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr, - pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt, - pTask->msgInfo.pData); + pTask->execInfo.dispatch, pTask->pMeta->stage, numOfBranches, pTask->msgInfo.pData); } return code; @@ -337,8 +363,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { int32_t code = 0; - int32_t msgId = pTask->execInfo.dispatch; const char* id = pTask->id.idStr; + int32_t msgId = pTask->msgInfo.msgId; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; @@ -352,10 +378,10 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch } else { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); + int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo); - int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt; stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id, - pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId); + pTask->info.selfChildId, numOfBranches, numOfVgroups, msgId); int32_t numOfSend = 0; for (int32_t i = 0; i < numOfVgroups; i++) { @@ -370,7 +396,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch } // no need to try remain, all already send. 
- if (++numOfSend == actualVgroups) { + if (++numOfSend == numOfBranches) { break; } } @@ -382,102 +408,168 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch return code; } -static void doRetryDispatchData(void* param, void* tmrId) { - SStreamTask* pTask = param; - const char* id = pTask->id.idStr; - int32_t msgId = pTask->execInfo.dispatch; +static void setNotInDispatchMonitor(SDispatchMsgInfo* pMsgInfo) { + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->inMonitor = 0; + taosThreadMutexUnlock(&pMsgInfo->lock); +} + +static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { + pEntry->sendTs = now; + pEntry->rspTs = -1; + pEntry->retryCount += 1; +} + +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { + SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now}; + + if (lock) { + taosThreadMutexLock(&pMsgInfo->lock); + } + + taosArrayPush(pMsgInfo->pSendInfo, &entry); + + if (lock) { + taosThreadMutexUnlock(&pMsgInfo->lock); + } +} + +static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) { + SStreamDispatchReq* pReq = pTask->msgInfo.pData; + + int32_t msgId = pTask->msgInfo.msgId; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + setResendInfo(pEntry, now); + for (int32_t j = 0; j < numOfVgroups; ++j) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo->vgId == pEntry->nodeId) { + int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", + pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code)); + break; + } + } +} + +static void doMonitorDispatchData(void* param, void* tmrId) { + SStreamTask* pTask = param; + const char* id = 
pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + int32_t msgId = pMsgInfo->msgId; + int32_t code = 0; + int64_t now = taosGetTimestampMs(); + + stDebug("s-task:%s start monitor dispatch data", id); if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + setNotInDispatchMonitor(pMsgInfo); return; } - ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); + // slave task not handle the dispatch, downstream not ready will break the monitor timer + // follower not handle the dispatch rsp + if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref); + setNotInDispatchMonitor(pMsgInfo); + return; + } - int32_t code = 0; + taosThreadMutexLock(&pMsgInfo->lock); + if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); + + pTask->msgInfo.inMonitor = 0; + taosThreadMutexUnlock(&pMsgInfo->lock); + return; + } + taosThreadMutexUnlock(&pMsgInfo->lock); + + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); + if (numOfFailed == 0) { + stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + return; + } { - SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL); - taosArrayClear(pTask->msgInfo.pRetryList); - SStreamDispatchReq* pReq = pTask->msgInfo.pData; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t 
numOfVgroups = taosArrayGetSize(vgInfo); + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, + pTask->info.selfChildId, msgId); - int32_t numOfFailed = taosArrayGetSize(pList); - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, - pTask->info.selfChildId, numOfFailed, msgId); + int32_t numOfRetry = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { + continue; + } - for (int32_t i = 0; i < numOfFailed; i++) { - int32_t vgId = *(int32_t*)taosArrayGet(pList, i); + // downstream not rsp yet beyond threshold that is 10s + if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data + doSendFailedDispatch(pTask, pEntry, now, "timeout"); + numOfRetry += 1; + continue; + } - for (int32_t j = 0; j < numOfVgroups; ++j) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - if (pVgInfo->vgId == vgId) { - stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, - pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId); + // downstream inputQ is closed + if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { + doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); + numOfRetry += 1; + continue; + } - code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); - if (code < 0) { - break; - } - } + // handle other errors + if (pEntry->status != TSDB_CODE_SUCCESS) { + doSendFailedDispatch(pTask, pEntry, now, "downstream error"); + numOfRetry += 1; } } stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, - numOfFailed, msgId); + numOfRetry, msgId); } else { - int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; + int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; SEpSet* pEpSet 
= &pTask->outputInfo.fixedDispatcher.epSet; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, - pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); + ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1); + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); - code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); } - - taosArrayDestroy(pList); } - if (code != TSDB_CODE_SUCCESS) { - if (!streamTaskShouldStop(pTask)) { - // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); - // atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); - if (streamTaskShouldPause(pTask)) { - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); - } else { - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - } - } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); - } - } else { + if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); - } -} - -void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { - pTask->msgInfo.retryCount++; - - stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, - waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); - - if (pTask->msgInfo.pRetryTmr != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, 
&pTask->msgInfo.pRetryTmr); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + setNotInDispatchMonitor(pMsgInfo); } else { - pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } } -int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, - int64_t groupId) { +void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { + if (pTask->msgInfo.pRetryTmr != NULL) { + taosTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr); + } else { + pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer); + } +} + +int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, + int64_t groupId, int64_t now) { uint32_t hashValue = 0; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; if (pTask->pNameMap == NULL) { @@ -495,23 +587,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->subtableWithoutMd5 != 1 && - !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && - groupId != 0){ - if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) && + !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) { + if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); - }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, 
pDataBlock->info.parTbName, groupId); } } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, + pDataBlock->info.parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; - hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + hashValue = + taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); SBlockName bln = {0}; bln.hashValue = hashValue; memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); @@ -520,20 +613,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } } - bool found = false; + bool found = false; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + // TODO: optimize search - int32_t j; - for (j = 0; j < vgSz; j++) { + taosThreadMutexLock(&pTask->msgInfo.lock); + + for (int32_t j = 0; j < numOfVgroups; j++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + taosThreadMutexUnlock(&pTask->msgInfo.lock); return -1; } if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); } pReqs[j].blockNum++; @@ -541,10 +638,28 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S break; } } + + taosThreadMutexUnlock(&pTask->msgInfo.lock); 
ASSERT(found); return 0; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { + pInfo->rspTs = recvTs; +} + int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -587,7 +702,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; - pTask->msgInfo.startTs = taosGetTimestampMs(); + initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -599,34 +714,21 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { streamTaskInitTriggerDispatchInfo(pTask); } - int32_t retryCount = 0; - while (1) { - code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - if (code == TSDB_CODE_SUCCESS) { - break; - } + code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, - pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount); - - // todo deal with only partially success dispatch case - atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); - if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore - clearBufferedDispatchMsg(pTask); - return code; - } - - if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", - pTask->id.idStr, 
retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); - - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - break; - } + taosThreadMutexLock(&pTask->msgInfo.lock); + if (pTask->msgInfo.inMonitor == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref, + tstrerror(code)); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + pTask->msgInfo.inMonitor = 1; + } else { + stDebug("s-task:%s already in dispatch monitor tmr", id); } + taosThreadMutexUnlock(&pTask->msgInfo.lock); + // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; } @@ -817,8 +919,10 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; ASSERT(dataStrLen > 0); - void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) { + return -1; + } SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; pRetrieve->useconds = 0; @@ -1031,23 +1135,6 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); - if (delayDispatch) { - taosThreadMutexLock(&pTask->lock); - // we only set the dispatch msg info for current checkpoint trans - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && - pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) { - ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId); - stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", - pTask->id.idStr, downstreamId, 
pTask->msgInfo.checkpointId, pTask->msgInfo.transId); - - streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId); - } else { - stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired", - pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); - } - taosThreadMutexUnlock(&pTask->lock); - } - clearBufferedDispatchMsg(pTask); int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1074,17 +1161,59 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId return 0; } -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; - int32_t msgId = pTask->execInfo.dispatch; +static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { + int32_t numOfRsp = 0; + bool alreadySet = false; + bool updated = false; -#if 0 - // for test purpose, build the failure case - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { - pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + taosThreadMutexLock(&pMsgInfo->lock); + for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry->nodeId == vgId) { + ASSERT(!alreadySet); + pEntry->rspTs = now; + pEntry->status = code; + alreadySet = true; + updated = true; + stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); + } + + if (pEntry->rspTs != -1) { + numOfRsp += 1; + } } -#endif + + taosThreadMutexUnlock(&pMsgInfo->lock); + ASSERT(updated); + + return numOfRsp; +} + +bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) { + return (pEntry->rspTs == -1) && (now - pEntry->sendTs) > 30 * 1000; +} + +int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) { + int32_t numOfFailed = 0; + 
taosThreadMutexLock(&pMsgInfo->lock); + + for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) { + numOfFailed += 1; + } + } + taosThreadMutexUnlock(&pMsgInfo->lock); + return numOfFailed; +} + +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + int32_t msgId = pMsgInfo->msgId; + int64_t now = taosGetTimestampMs(); + int32_t totalRsp = 0; // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { @@ -1109,53 +1238,61 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - taosThreadMutexLock(&pTask->lock); - taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); - taosThreadMutexUnlock(&pTask->lock); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - taosThreadMutexLock(&pTask->lock); - taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); - 
taosThreadMutexUnlock(&pTask->lock); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id); + stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } else { + if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { + // todo handle the role-changed during checkpoint generation, add test case + stError( + "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or " + "restart already, treat it as success", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } - stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); - } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - // todo handle the agg task failure, add test case - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && - pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 - ", set the current checkpoint failed, and send rsp to mnode", - id, pTask->chkInfo.pActiveInfo->activeId); - { // send checkpoint failure msg to mnode directly - pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id - pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId; - streamTaskSendCheckpointSourceRsp(pTask); + totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + + { + bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + if (delayDispatch) { + taosThreadMutexLock(&pTask->lock); + // we only set the dispatch msg info for current checkpoint trans + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && + pTask->chkInfo.pActiveInfo->activeId == 
pMsgInfo->checkpointId) { + ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); + stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", + pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId); + + streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId); + } else { + stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 + " transId:%d discard, since expired", + pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); + } + taosThreadMutexUnlock(&pTask->lock); } - } else { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); } } } - int32_t leftRsp = 0; + int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); - ASSERT(leftRsp >= 0); - - if (leftRsp > 0) { + if (notRsp > 0) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting " "for %d rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp); + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp); } else { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp", @@ -1166,31 +1303,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } - ASSERT(leftRsp >= 0); - // all msg rsp already, continue - if (leftRsp == 0) { + if (notRsp == 0) { ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // we need to re-try send dispatch msg to downstream tasks - int32_t numOfFailed = 
taosArrayGetSize(pTask->msgInfo.pRetryList); - if (numOfFailed > 0) { - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed); - stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt); - } - - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d", - pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref); - - streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - } else { // this message has been sent successfully, let's try next one. - pTask->msgInfo.retryCount = 0; - + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); + if (numOfFailed == 0) { // this message has been sent successfully, let's try next one. // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); + if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", + id, msgId); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStatePrepare(pTask); @@ -1309,7 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } streamTrySchedExec(pTask); - return 0; } - diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 98168abae1..b0915640cc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,14 +541,14 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { 
pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } -static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) { +static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { const char* id = pTask->id.idStr; int32_t blockSize = 0; int64_t st = taosGetTimestampMs(); SCheckpointInfo* pInfo = &pTask->chkInfo; int64_t ver = pInfo->processedVer; - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type)); doSetStreamInputBlock(pTask, pBlock, &ver, id); @@ -607,7 +607,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB } // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. - doStreamTaskExecImpl(pTask, pCheckpointBlock); + doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); } /** @@ -643,7 +643,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { - stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); + stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } @@ -698,7 +698,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (type != STREAM_INPUT__CHECKPOINT) { - doStreamTaskExecImpl(pTask, pInput); + doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. 
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index da1fec5565..d0f9d40469 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -299,8 +299,8 @@ void streamMetaRemoveDB(void* arg, char* key) { taosThreadMutexUnlock(&pMeta->backendMutex); } -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, - startComplete_fn_t fn) { +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, + int32_t vgId, int64_t stage, startComplete_fn_t fn) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -369,7 +369,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->scanInfo.scanCounter = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; - pMeta->expandFunc = expandFunc; + pMeta->buildTaskFn = buildTaskFn; + pMeta->expandTaskFn = expandTaskFn; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? 
NODE_ROLE_LEADER : NODE_ROLE_UNINIT; pMeta->updateInfo.transId = -1; @@ -602,7 +603,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + if (pMeta->buildTaskFn(pMeta->ahandle, pTask, ver) < 0) { return -1; } @@ -901,7 +902,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); + code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); @@ -1522,6 +1523,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { } int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { + int32_t code = 0; int32_t vgId = pMeta->vgId; stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); @@ -1541,40 +1543,22 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas ASSERT(pTask->status.downstreamReady == 0); if (pTask->pBackend == NULL) { - int32_t code = expandFn(pTask); + code = expandFn(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - streamMetaReleaseTask(pMeta, pTask); - return code; - } - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); - if (pHTask != NULL) { - if (pHTask->pBackend == NULL) { - code = expandFn(pHTask); - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs); - - streamMetaReleaseTask(pMeta, pHTask); - streamMetaReleaseTask(pMeta, pTask); - return code; - } - } - 
- streamMetaReleaseTask(pMeta, pHTask); - } } } - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } streamMetaReleaseTask(pMeta, pTask); - return ret; + return code; } static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 7a864a60d2..050d88aaf1 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -197,6 +197,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { const char* idStr = pTask->id.idStr; int64_t hStreamId = pTask->hTaskInfo.id.streamId; int32_t hTaskId = pTask->hTaskInfo.id.taskId; + int64_t now = taosGetTimestampMs(); + int32_t code = 0; + ASSERT(hTaskId != 0); // check stream task status in the first place. 
@@ -226,7 +229,18 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); } else { // exist, but not ready, continue check downstream task status - checkFillhistoryTaskStatus(pTask, pHisTask); + if (pHisTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHisTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHisTask, now); + stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code)); + } + } + + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHisTask); + } + } streamMetaReleaseTask(pMeta, pHisTask); @@ -306,6 +320,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; int64_t now = taosGetTimestampMs(); + int32_t code = 0; streamMetaWLock(pMeta); @@ -362,13 +377,22 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { streamMetaReleaseTask(pMeta, pTask); return; } else { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); + if (pHTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHTask, now); + stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code)); + } + } - // not in timer anymore - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pHTaskInfo->retryTimes, ref); + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHTask); + // not in timer anymore + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", 
(int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes, ref); + } + streamMetaReleaseTask(pMeta, pHTask); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f8a4deb6b1..7d869ce538 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -283,10 +283,12 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); - pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); + pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo); + taosThreadMutexDestroy(&pTask->msgInfo.lock); + pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { @@ -373,7 +375,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->pMeta = pMeta; pTask->pMsgCb = pMsgCb; - pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); + pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry)); + if (pTask->msgInfo.pSendInfo == NULL) { + stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr); + return terrno; + } + + taosThreadMutexInit(&pTask->msgInfo.lock, NULL); TdThreadMutexAttr attr = {0}; @@ -936,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); - int8_t status = TASK_INPUT_STATUS__NORMAL; - - // enqueue - if (pData != NULL) { - stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, 
pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); - - pData->type = STREAM_INPUT__DATA_RETRIEVE; - pData->srcVgId = 0; - streamRetrieveReqToData(pReq, pData); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { - status = TASK_INPUT_STATUS__NORMAL; - } else { - status = TASK_INPUT_STATUS__FAILED; - } - } else { // todo handle oom - /*streamTaskInputFail(pTask);*/ - /*status = TASK_INPUT_STATUS__FAILED;*/ + if (pData == NULL) { + stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr); + return terrno; } - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; + // enqueue + stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); + + pData->type = STREAM_INPUT__DATA_RETRIEVE; + pData->srcVgId = 0; + + int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pData); + return code; + } + + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg", + pTask->id.idStr); + } + + return code; } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); - if(code != 0){ + if (code != 0) { return code; } return streamTrySchedExec(pTask); diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index c9b981e5f9..dc506cfbc9 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -43,7 +43,7 @@ SStreamState *stateCreate(const char *path) { pTask->ver = 1024; pTask->id.streamId = 1023; pTask->id.taskId = 1111111; - SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); + SStreamMeta *pMeta = 
streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL); pTask->pMeta = pMeta; SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); diff --git a/source/util/src/tunit.c b/source/util/src/tunit.c index 09f59f1e40..4ec9e39fde 100644 --- a/source/util/src/tunit.c +++ b/source/util/src/tunit.c @@ -24,7 +24,7 @@ #define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) static int32_t parseCfgIntWithUnit(const char* str, double *res) { - double val, temp = INT64_MAX; + double val, temp = (double)INT64_MAX; char* endPtr; errno = 0; val = taosStr2Int64(str, &endPtr, 0); diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 76d890b26a..b69f1eba4f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -121,7 +121,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 131" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG echo "jniDebugFlag 131" >> $TAOS_CFG -echo "qDebugFlag 131" >> $TAOS_CFG +echo "qDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 131" >> $TAOS_CFG echo "vDebugFlag 131" >> $TAOS_CFG @@ -136,7 +136,7 @@ echo "idxDebugFlag 135" >> $TAOS_CFG echo "udfDebugFlag 135" >> $TAOS_CFG echo "smaDebugFlag 135" >> $TAOS_CFG echo "metaDebugFlag 135" >> $TAOS_CFG -echo "stDebugFlag 135" >> $TAOS_CFG +echo "stDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG diff --git a/tests/script/tsim/stream/checkStreamSTable1.sim b/tests/script/tsim/stream/checkStreamSTable1.sim index dd44f5c102..942a947feb 100644 --- a/tests/script/tsim/stream/checkStreamSTable1.sim +++ b/tests/script/tsim/stream/checkStreamSTable1.sim @@ -57,7 +57,7 @@ loop1: sleep 1000 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 100 then return -1 endi