diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e647c0f9e..0f399da8fd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -384,8 +384,8 @@ typedef struct SSinkRecorder { typedef struct STaskExecStatisInfo { int64_t created; - int64_t init; - int64_t start; + int64_t checkTs; + int64_t readyTs; int64_t startCheckpointId; int64_t startCheckpointVer; @@ -432,6 +432,22 @@ typedef struct SUpstreamInfo { int32_t numOfClosed; } SUpstreamInfo; +typedef struct SDownstreamStatusInfo { + int64_t reqId; + int32_t taskId; + int64_t rspTs; + int32_t status; +} SDownstreamStatusInfo; + +typedef struct STaskCheckInfo { + SArray* pList; + int64_t startTs; + int32_t notReadyTasks; + int32_t inCheckProcess; + tmr_h checkRspTmr; + TdThreadMutex checkInfoLock; +} STaskCheckInfo; + struct SStreamTask { int64_t ver; SStreamTaskId id; @@ -455,14 +471,12 @@ struct SStreamTask { SStreamState* pState; // state backend SArray* pRspMsgList; SUpstreamInfo upstreamInfo; + STaskCheckInfo taskCheckInfo; // the followings attributes don't be serialized SScanhistorySchedInfo schedHistoryInfo; - int32_t notReadyTasks; int32_t numOfWaitingUpstream; - int64_t checkReqId; - SArray* checkReqIds; // shuffle int32_t refCnt; int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; @@ -478,7 +492,7 @@ typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; int32_t tasksWillRestart; - int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. + int32_t startAllTasks; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed int64_t elapsedTime; @@ -821,8 +835,6 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); -void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); -void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); @@ -832,6 +844,15 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); +int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); +int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, + int32_t* pNotReady, const char* id); +void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); +int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); +int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id); +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); + void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0624f4e71e..521f359f73 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -657,7 +657,9 @@ _OVER: return -1; } -static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks +// 1. stream number check +// 2. target stable can not be target table of other existed streams. +static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) { int32_t numOfStream = 0; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -670,14 +672,16 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / sdbRelease(pMnode->pSdb, pStream); if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); + mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM, + pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; return terrno; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { - mError("Cannot write the same stable as other stream:%s", pStream->name); + mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name, + pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; return terrno; @@ -742,7 +746,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if (checkForNumOfStreams(pMnode, &streamObj) < 0) { + if (doStreamCheck(pMnode, &streamObj) < 0) { goto _OVER; } @@ -978,7 +982,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t code = -1; int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { - // mWarn("checkpoint interval less than the threshold, ignore it"); return TSDB_CODE_SUCCESS; } @@ -1396,6 +1399,15 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) return 0; } +static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { + memset(pBuf, 0, bufLen); + pBuf[2] = '0'; + pBuf[3] = 'x'; + + int32_t len = tintToHex(id, &pBuf[4]); + varDataSetLen(pBuf, len + 2); +} + static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1420,19 +1432,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB // stream id char buf[128] = {0}; - int32_t len = tintToHex(pStream->uid, &buf[4]); - buf[2] = '0'; - buf[3] = 'x'; - varDataSetLen(buf, len + 2); + int64ToHexStr(pStream->uid, buf, tListLen(buf)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, buf, false); // related fill-history stream id - memset(buf, 0, tListLen(buf)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pStream->hTaskUid != 0) { - len = tintToHex(pStream->hTaskUid, &buf[4]); - varDataSetLen(buf, len + 2); + int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf)); colDataSetVal(pColInfo, numOfRows, buf, false); } else { colDataSetVal(pColInfo, numOfRows, buf, true); @@ -1531,11 +1538,8 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // task id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char idstr[128] = {0}; - int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); + char idstr[128] = {0}; + int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr)); colDataSetVal(pColInfo, numOfRows, idstr, false); // node type @@ -1651,11 +1655,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // history_task_id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pe->hTaskId != 0) { - memset(idstr, 0, tListLen(idstr)); - len = tintToHex(pe->hTaskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); + int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr)); colDataSetVal(pColInfo, numOfRows, idstr, false); } else { colDataSetVal(pColInfo, numOfRows, 0, true); @@ -2029,7 +2029,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } } - // no need to build the trans to handle the vgroup upddate + // no need to build the trans to handle the vgroup update if (pTrans == NULL) { return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1fedee3bcf..6a381ad31e 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -258,7 +258,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { - mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); + mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); setNodeEpsetExpiredFlag(req.pUpdateNodes); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 00ca0319d0..8edc0fed4d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1102,7 +1102,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (!pTq->pVnode->restored) { - tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId); + tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64 + ", transId:%d s-task:0x%x ignore it", + vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1111,7 +1113,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId); + tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64 + " transId:%d it may have been destroyed", + vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1123,7 +1127,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pTask->chkInfo.checkpointingId = req.checkpointId; pTask->chkInfo.transId = req.transId; - tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 + tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 ", transId:%d set it failed", pTask->id.idStr, req.checkpointId, req.transId); streamMetaReleaseTask(pMeta, pTask); @@ -1140,7 +1144,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (req.mndTrigger == 1) { if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { - tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure", + tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 15538b61e3..1da096224c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -309,11 +309,10 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pRec->numOfSubmit += 1; if ((pRec->numOfSubmit % 1000) == 0) { - double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), - el); + id, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), el); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7299d172c6..d2c7924cf5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -200,6 +200,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); + streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -213,6 +214,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + streamTaskResetStatus(*ppHTask); + streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } @@ -455,8 +458,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) { - *initTs = pTask->execInfo.init; +static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) { + *startCheckTs = pTask->execInfo.checkTs; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { *hasHTask = true; @@ -525,6 +528,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (pTask == NULL) { streamMetaRLock(pMeta); + // let's try to find this task in hashmap SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { setParam(*ppTask, &initTs, &hasHistoryTask, &fId); @@ -533,7 +537,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (hasHistoryTask) { streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } - } else { + } else { // not exist even in the hash map of meta, forget it streamMetaRUnLock(pMeta); } @@ -762,7 +766,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int64_t st = taosGetTimestampMs(); streamMetaWLock(pMeta); - if (pMeta->startInfo.taskStarting == 1) { + if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); @@ -770,7 +774,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { return TSDB_CODE_SUCCESS; } - pMeta->startInfo.taskStarting = 1; + pMeta->startInfo.startAllTasks = 1; streamMetaWUnLock(pMeta); terrno = 0; @@ -886,7 +890,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { bool scanWal = false; streamMetaWLock(pMeta); - if (pStartInfo->taskStarting == 1) { + if (pStartInfo->startAllTasks == 1) { tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, pMeta->startInfo.restartCount); } else { // not in starting procedure @@ -936,13 +940,20 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); taosThreadMutexLock(&pTask->lock); + streamTaskClearCheckInfo(pTask, true); // clear flag set during do checkpoint, and open inputQ for all upstream tasks - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + SStreamTaskState *pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); - streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); + } else if (pState->state == TASK_STATUS__UNINIT) { + tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); + ASSERT(pTask->status.downstreamReady == 0); + /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + } else { + tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 63d8c05bf3..1ea037b4e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4501,6 +4501,10 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { goto _err; } + // open reader failure may cause the flag still to be READER_STATUS_SUSPEND, which may cause suspend reader failure. + // So we need to set it A.S.A.P + pReader->flag = READER_STATUS_NORMAL; + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { code = doOpenReaderImpl(pReader); if (code != TSDB_CODE_SUCCESS) { @@ -4531,7 +4535,6 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { } } - pReader->flag = READER_STATUS_NORMAL; tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 72cf8295dc..166a230c76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -579,12 +579,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta); if (numOfTasks > 0) { - if (pMeta->startInfo.taskStarting == 1) { + if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); } else { - pMeta->startInfo.taskStarting = 1; + pMeta->startInfo.startAllTasks = 1; streamMetaWUnLock(pMeta); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d1cc4fb710..b3ed86cff8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,7 @@ extern "C" { #endif -#define CHECK_DOWNSTREAM_INTERVAL 100 +#define CHECK_RSP_INTERVAL 300 #define LAUNCH_HTASK_INTERVAL 100 #define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40 diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 380680b642..06093cbaf8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3704,7 +3704,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb { char tbuf[256] = {0}; ginitDict[i].toStrFunc((void*)key, tbuf); - stDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); + stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); } return 0; } @@ -3729,7 +3729,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf); - stDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key); + stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key); } return 0; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 86ee2b837d..67b68f73ad 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -396,6 +396,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) { taosMemoryFree(file); return code; } + int32_t doUploadChkp(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; @@ -436,6 +437,7 @@ int32_t doUploadChkp(void* param) { taosMemoryFree(arg); return code; } + int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { // async upload UPLOAD_TYPE type = getUploadType(); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3c22f33f93..4aecb6cc9d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1119,7 +1119,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), - .startTime = (*pTask)->execInfo.start, + .startTime = (*pTask)->execInfo.readyTs, .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, @@ -1141,7 +1141,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); } } @@ -1329,7 +1329,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { pStartInfo->readyTs = 0; // reset the sentinel flag value to be 0 - pStartInfo->taskStarting = 0; + pStartInfo->startAllTasks = 0; } void streamMetaRLock(SStreamMeta* pMeta) { @@ -1496,7 +1496,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1506,10 +1506,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); } } @@ -1601,10 +1601,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); } } @@ -1617,7 +1617,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) void* pIter = NULL; size_t keyLen = 0; - stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), + stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), succ ? "success" : "failed"); while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { @@ -1641,7 +1641,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 streamMetaWLock(pMeta); - if (pStartInfo->taskStarting != 1) { + if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", pMeta->vgId, taskId, ready, el); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 276997e5a8..b9b7c8ddfa 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -56,8 +56,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - pTask->execInfo.start = taosGetTimestampMs(); - int64_t el = (pTask->execInfo.start - pTask->execInfo.init); + pTask->execInfo.readyTs = taosGetTimestampMs(); + int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p->name); return TSDB_CODE_SUCCESS; @@ -83,7 +83,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -static void doReExecScanhistory(void* param, void* tmrId) { +static void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTask* pTask = param; pTask->schedHistoryInfo.numOfTicks -= 1; @@ -105,7 +105,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); + taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } } @@ -131,9 +131,9 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); + pTask->schedHistoryInfo.pTimer = taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); + taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } return TSDB_CODE_SUCCESS; @@ -184,12 +184,20 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); + int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + return; + } + + streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs()); + // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - pTask->checkReqId = req.reqId; + + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, @@ -197,95 +205,36 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); + + streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - if (pTask->checkReqIds == NULL) { - pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - } else { - taosArrayClear(pTask->checkReqIds); - } - stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.reqId = tGenIdPI64(); - taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; + + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - } else { + + streamTaskStartMonitorCheckRsp(pTask); + } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); + streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); doProcessDownstreamReadyRsp(pTask); } } -static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { - STaskRecheckInfo* pInfo = taosMemoryCalloc(1, sizeof(STaskRecheckInfo)); - if (pInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pInfo->pTask = pTask; - pInfo->req = (SStreamTaskCheckReq){ - .reqId = pRsp->reqId, - .streamId = pRsp->streamId, - .upstreamTaskId = pRsp->upstreamTaskId, - .upstreamNodeId = pRsp->upstreamNodeId, - .downstreamTaskId = pRsp->downstreamTaskId, - .downstreamNodeId = pRsp->downstreamNodeId, - .childId = pRsp->childId, - .stage = pTask->pMeta->stage, - }; - - return pInfo; -} - -static void destroyRecheckInfo(STaskRecheckInfo* pInfo) { - if (pInfo != NULL) { - taosTmrStop(pInfo->checkTimer); - pInfo->checkTimer = NULL; - taosMemoryFree(pInfo); - } -} - -static void recheckDownstreamTasks(void* param, void* tmrId) { - STaskRecheckInfo* pInfo = param; - SStreamTask* pTask = pInfo->pTask; - - SStreamTaskCheckReq* pReq = &pInfo->req; - - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, - pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage); - streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (pVgInfo->taskId == pReq->downstreamTaskId) { - stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, - pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage); - streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pVgInfo->epSet); - } - } - } - - destroyRecheckInfo(pInfo); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); -} - int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); @@ -391,9 +340,9 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); - int64_t initTs = pTask->execInfo.init; - int64_t startTs = pTask->execInfo.start; - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + int64_t checkTs = pTask->execInfo.checkTs; + int64_t readyTs = pTask->execInfo.readyTs; + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); if (pTask->status.taskStatus == TASK_STATUS__HALT) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); @@ -439,7 +388,12 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); - const char* id = pTask->id.idStr; + + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + int32_t total = streamTaskGetNumOfDownstream(pTask); + int32_t left = -1; if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, do not do check downstream again", id); @@ -447,47 +401,21 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } if (pRsp->status == TASK_DOWNSTREAM_READY) { - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - bool found = false; - int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); - for (int32_t i = 0; i < numOfReqs; i++) { - int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); - if (reqId == pRsp->reqId) { - found = true; - break; - } - } + streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); - if (!found) { - return -1; - } - - int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - ASSERT(left >= 0); - - if (left == 0) { - pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);; - - doProcessDownstreamReadyRsp(pTask); - } else { - int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); - stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); - } + if (left == 0) { + doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag + streamTaskCompleteCheck(pInfo, id); } else { - ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH); - if (pRsp->reqId != pTask->checkReqId) { - return -1; - } - - doProcessDownstreamReadyRsp(pTask); + stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { // not ready, wait for 100ms and retry + streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 - ", current stage:%" PRId64 - ", not check wait for downstream task nodeUpdate, and all tasks restart", + ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { @@ -498,8 +426,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - int32_t startTs = pTask->execInfo.init; - int64_t now = taosGetTimestampMs(); + int32_t startTs = pTask->execInfo.checkTs; streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); // automatically set the related fill-history task to be failed. @@ -507,13 +434,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskId* pId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms - STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); - pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); + } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms + ASSERT(left > 0); + stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } @@ -603,13 +528,13 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) SDataRange* pRange = &pHTask->dataRange; // the query version range should be limited to the already processed data - pHTask->execInfo.init = taosGetTimestampMs(); + pHTask->execInfo.checkTs = taosGetTimestampMs(); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64, pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer, pHTask->execInfo.init); + pRange->range.maxVer, pHTask->execInfo.checkTs); } else { stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); } @@ -767,8 +692,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); if (pInfo == NULL) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); - - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return terrno; } @@ -785,7 +709,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); taosMemoryFree(pInfo); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return terrno; } @@ -816,7 +740,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, pStatus->name); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return -1; // todo set the correct error code } @@ -831,11 +755,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); if (pHisTask == NULL) { stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); } else { if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); } else { // exist, but not ready, continue check downstream task status checkFillhistoryTaskStatus(pTask, pHisTask); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7badbfa9f3..70c2619f6f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -21,7 +21,11 @@ #include "ttimer.h" #include "wal.h" +#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec + static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); +static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -113,6 +117,9 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); + taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL); + if (fillHistory) { ASSERT(hasFillhistory); } @@ -365,8 +372,9 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 " nextProcessVer:%" PRId64 ", checkpointCount:%d", - taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, - pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint); + taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, + pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, + pStatis->checkpoint); // remove the ref by timer while (pTask->status.timerActive > 0) { @@ -423,9 +431,10 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); - pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds); } + streamTaskCleanCheckInfo(&pTask->taskCheckInfo); + if (pTask->pState) { stDebug("s-task:0x%x start to free task state", taskId); streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); @@ -932,3 +941,321 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { tmsgSendReq(&pTask->info.mnodeEpset, &msg); return 0; } + +int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { + if (pInfo->pList == NULL) { + pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); + } else { + taosArrayClear(pInfo->pList); + } + + taosThreadMutexLock(&pInfo->checkInfoLock); + + if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { + pInfo->notReadyTasks = 1; + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos); + ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum); + } + + pInfo->startTs = startTs; + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; + + taosThreadMutexLock(&pInfo->checkInfoLock); + + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->taskId == taskId) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + } + + taosArrayPush(pInfo->pList, &info); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, + int32_t* pNotReady, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->taskId == taskId) { + ASSERT(reqId == p->reqId); + p->status = status; + p->rspTs = rspTs; + + // count down one, since it is ready now + if (p->status == TASK_DOWNSTREAM_READY) { + *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); + } else { + *pNotReady = pInfo->notReadyTasks; + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId); + return TSDB_CODE_FAILED; +} + +int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + if (pInfo->inCheckProcess == 0) { + pInfo->inCheckProcess = 1; + } else { + ASSERT(pInfo->startTs > 0); + stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + stDebug("s-task:%s set the in check procedure flag", id); + return 0; +} + +int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + if (!pInfo->inCheckProcess) { + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + int64_t el = taosGetTimestampMs() - pInfo->startTs; + stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el); + + pInfo->startTs = 0; + pInfo->inCheckProcess = 0; + pInfo->notReadyTasks = 0; + taosArrayClear(pInfo->pList); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return 0; +} + +static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { + SStreamTaskCheckReq req = { + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->info.nodeId, + .childId = pTask->info.selfChildId, + .stage = pTask->pMeta->stage, + }; + + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = p->reqId; + req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; + req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); + + streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgs = taosArrayGetSize(vgInfo); + + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + + if (p->taskId == pVgInfo->taskId) { + req.reqId = p->reqId; + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + break; + } + } + } else { + ASSERT(0); + } +} + +static void rspMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + SStreamTaskState* pStat = streamTaskGetStatus(pTask); + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + int64_t el = now - pInfo->startTs; + ETaskStatus state = pStat->state; + + int32_t numOfReady = 0; + int32_t numOfFault = 0; + + stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr); + + if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref); + streamTaskCompleteCheck(pInfo, pTask->id.idStr); + return; + } + + taosThreadMutexLock(&pInfo->checkInfoLock); + if (pInfo->notReadyTasks == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr, + pStat->name, vgId, ref); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamTaskCompleteCheck(pInfo, pTask->id.idStr); + return; + } + + SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); + SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); + + if (pStat->state == TASK_STATUS__UNINIT) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->status == TASK_DOWNSTREAM_READY) { + numOfReady += 1; + } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { + stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr, + p->taskId); + numOfFault += 1; + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet + ASSERT(p->status == -1); + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + taosArrayPush(pTimeoutList, &p->taskId); + } else { // el < CHECK_NOT_RSP_DURATION + // do nothing and continue waiting for their rsps + } + } else { + taosArrayPush(pNotReadyList, &p->taskId); + } + } + } + } else { // unexpected status + stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name); + } + + int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + + // fault tasks detected, not try anymore + if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " + "detected, ref:%d", + pTask->id.idStr, pStat->name, vgId, ref); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + + streamTaskCompleteCheck(pInfo, pTask->id.idStr); + return; + } + + // checking of downstream tasks has been stopped by other threads + if (pInfo->inCheckProcess == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " + "timeout:%d, ready:%d ref:%d", + pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false); + } + return; + } + + if (numOfNotReady > 0) { // check to make sure not in recheck timer + ASSERT(pTask->status.downstreamReady == 0); + + // reset the info, and send the check msg to failure downstream again + for (int32_t i = 0; i < numOfNotReady; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + + for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p->taskId == taskId) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); + } + } + } + + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady); + } + + if (numOfTimeout > 0) { + pInfo->startTs = now; + ASSERT(pTask->status.downstreamReady == 0); + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + + for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p->taskId == taskId) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); + break; + } + } + } + + stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr, + numOfTimeout, now); + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr, + numOfNotReady, numOfFault, numOfTimeout, numOfReady); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); +} + +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { + ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL); + + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); + pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + return 0; +} + +void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { + ASSERT(pInfo->inCheckProcess == 0); + + pInfo->pList = taosArrayDestroy(pInfo->pList); + if (pInfo->checkRspTmr != NULL) { + /*bool ret = */ taosTmrStop(pInfo->checkRspTmr); + pInfo->checkRspTmr = NULL; + } + + taosThreadMutexDestroy(&pInfo->checkInfoLock); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cfa94209f6..c16598f84c 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -80,10 +80,10 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv } int32_t streamTaskInitStatus(SStreamTask* pTask) { - pTask->execInfo.init = taosGetTimestampMs(); - + pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, - pTask->execInfo.init); + pTask->execInfo.checkTs); + streamTaskCheckDownstream(pTask); return 0; }