diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index d410bd17e0..5696b592da 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -241,6 +241,7 @@ typedef struct SRestoreCheckpointInfo { int32_t transId; // transaction id of the update the consensus-checkpointId transaction int32_t taskId; int32_t nodeId; + int32_t term; } SRestoreCheckpointInfo; int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b6202f09b..322f3d5e9c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -465,6 +465,17 @@ struct SStreamTask { typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); +typedef enum { + START_MARK_REQ_CHKPID = 0x1, + START_WAIT_FOR_CHKPTID = 0x2, + START_CHECK_DOWNSTREAM = 0x3, +} EStartStage; + +typedef struct { + EStartStage stage; + int64_t ts; +} SStartTaskStageInfo; + typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; @@ -474,6 +485,8 @@ typedef struct STaskStartInfo { SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed int64_t elapsedTime; int32_t restartCount; // restart task counter + EStartStage curStage; // task start stage + SArray* pStagesList; // history stage list with timestamp, SArray<SStartTaskStageInfo> startComplete_fn_t completeFn; // complete callback function } STaskStartInfo; @@ -706,7 +719,7 @@ void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); // fill-history task -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask, bool lock); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); @@ -777,12 +790,14 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); +int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, + int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo); void streamMetaClearStartInfo(STaskStartInfo* pStartInfo); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); -int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); -void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock); +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index 54b17b14d1..76f1a02062 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -794,6 +794,7 @@ _exit: return code; } +// todo: serialize the term attribute.
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { int32_t code = 0; int32_t lino; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bd18c9ceb9..0e7d56c001 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2896,31 +2896,18 @@ _end: } // Construct the child table name in the form of __ and store it in `ctbName`. -// If the name length exceeds TSDB_TABLE_NAME_LEN, first convert _ to an MD5 value and then -// concatenate. If the length is still too long, convert to an MD5 value as well. int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; char tmp[TSDB_TABLE_NAME_LEN] = {0}; - char* suffix = tmp; - size_t suffixCap = sizeof(tmp); - size_t suffixLen = 0; - size_t prefixLen = 0; - T_MD5_CTX context; if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) { code = TSDB_CODE_INTERNAL_ERROR; TSDB_CHECK_CODE(code, lino, _end); } - prefixLen = strlen(ctbName); - if (stbName == NULL) { - suffixLen = snprintf(suffix, suffixCap, "%" PRIu64, groupId); - if (suffixLen >= suffixCap) { - code = TSDB_CODE_INTERNAL_ERROR; - TSDB_CHECK_CODE(code, lino, _end); - } + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); } else { int32_t i = strlen(stbName) - 1; for (; i >= 0; i--) { @@ -2928,52 +2915,12 @@ int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t grou break; } } - suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId); - if (suffixLen >= suffixCap) { - suffixCap = suffixLen + 1; - suffix = taosMemoryMalloc(suffixCap); - TSDB_CHECK_NULL(suffix, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); - suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId); - if (suffixLen >= suffixCap) { - code = TSDB_CODE_INTERNAL_ERROR; - TSDB_CHECK_CODE(code, lino, _end); - } - } + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId); } - if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) { - // If the name length exceeeds the limit, convert the suffix to MD5 value. - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)suffix, suffixLen); - tMD5Final(&context); - suffixLen = snprintf(suffix, suffixCap, "%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest, - *(uint64_t*)(context.digest + 8)); - if (suffixLen >= suffixCap) { - code = TSDB_CODE_INTERNAL_ERROR; - TSDB_CHECK_CODE(code, lino, _end); - } - } - - if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) { - // If the name is still too long, convert the ctbName to MD5 value. 
- tMD5Init(&context); - tMD5Update(&context, (uint8_t*)ctbName, prefixLen); - tMD5Final(&context); - prefixLen = snprintf(ctbName, cap, "t_%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest, - *(uint64_t*)(context.digest + 8)); - if (prefixLen >= cap) { - code = TSDB_CODE_INTERNAL_ERROR; - TSDB_CHECK_CODE(code, lino, _end); - } - } - - if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) { - code = TSDB_CODE_INTERNAL_ERROR; - TSDB_CHECK_CODE(code, lino, _end); - } - - ctbName[prefixLen] = '_'; - tstrncpy(&ctbName[prefixLen + 1], suffix, cap - prefixLen - 1); + ctbName[cap - strlen(tmp) - 1] = 0; // put stbname + groupId to the end + size_t prefixLen = strlen(ctbName); + ctbName = strncat(ctbName, tmp, cap - prefixLen - 1); for (char* p = ctbName; *p; ++p) { if (*p == '.') *p = '_'; @@ -2983,9 +2930,6 @@ _end: if (code != TSDB_CODE_SUCCESS) { uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - if (suffix != tmp) { - taosMemoryFree(suffix); - } return code; } diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index 10c1077697..7c5404f1fa 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -531,7 +531,6 @@ TEST(testCase, StreamWithoutDotInStbName2) { TEST(testCase, StreamWithLongStbName) { char ctbName[TSDB_TABLE_NAME_LEN]; - char expectName[TSDB_TABLE_NAME_LEN]; char *stbName = "a_simle_stb_name"; uint64_t groupId = UINT64_MAX; @@ -550,29 +549,13 @@ TEST(testCase, StreamWithLongStbName) { EXPECT_EQ(buildCtbNameAddGroupId(stbName, NULL, groupId, sizeof(ctbName)), TSDB_CODE_INTERNAL_ERROR); EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName) - 1), TSDB_CODE_INTERNAL_ERROR); - // test md5 conversion of stbName with groupid + // test truncation of long ctbName for (int32_t i = 0; i < 159; ++i) ctbName[i] = 'A'; ctbName[159] = '\0'; stbName = taosStrdup(ctbName); - snprintf(expectName, TSDB_TABLE_NAME_LEN, "%s_d85f0d87946d76eeedd7b7b78b7492a2", ctbName); + std::string expectName = std::string(ctbName) + "_" + std::string(stbName) + "_" + std::to_string(groupId); EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS); - EXPECT_STREQ(ctbName, expectName); - - // test md5 conversion of all parts - for (int32_t i = 0; i < 190; ++i) ctbName[i] = 'A'; - ctbName[190] = '\0'; - tstrncpy(expectName, "t_d38a8b2df999bef0082ffc80a59a9cd7_d85f0d87946d76eeedd7b7b78b7492a2", TSDB_TABLE_NAME_LEN); - EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS); - EXPECT_STREQ(ctbName, expectName); - - // test larger stbName - taosMemoryFree(stbName); - for (int32_t i = 0; i < 190; ++i) ctbName[i] = 'A'; - ctbName[190] = '\0'; - stbName = taosStrdup(ctbName); - tstrncpy(expectName, "t_d38a8b2df999bef0082ffc80a59a9cd7_9c99cc7c52073b63fb750af402d9b84b", TSDB_TABLE_NAME_LEN); - EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS); - EXPECT_STREQ(ctbName, expectName); + EXPECT_STREQ(ctbName, expectName.c_str() + expectName.size() - TSDB_TABLE_NAME_LEN + 1); taosMemoryFree(stbName); } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index d694dc67eb..45006638f8 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -122,7 +122,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t 
*pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList); +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap); void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, @@ -147,14 +147,13 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *p int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); -int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int64_t ts); +int32_t mndStreamSetChkptIdAction(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t checkpointId, SArray *pList); int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int8_t mndTrigger); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, - int64_t ts); +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 09314c9e63..c59a647b86 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -452,19 +452,18 @@ static void *mndThreadFp(void *param) { while (1) { lastTime++; taosMsleep(100); + if (mndGetStop(pMnode)) break; if (lastTime % 10 != 0) continue; + if (mnodeIsNotLeader(pMnode)) { + mTrace("timer not processed since mnode is not leader"); + continue; + } + int64_t sec = lastTime / 10; mndDoTimerCheckTask(pMnode, sec); - int64_t minCron = minCronTime(); - if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) { - // not leader, do nothing - mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)); - terrno = 0; - continue; - } mndDoTimerPullupTask(pMnode, sec); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 009cd1a108..81175f4c9d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -13,13 +13,13 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ +#include "mndStream.h" #include "audit.h" #include "mndDb.h" #include "mndPrivilege.h" #include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" -#include "mndStream.h" #include "mndTrans.h" #include "osMemory.h" #include "parser.h" @@ -1228,18 +1228,16 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) { } // all tasks of this stream should be ready, otherwise do nothing -static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) { +static bool isStreamReadyHelp(int64_t now, SStreamObj *pStream) { bool ready = false; streamMutexLock(&execInfo.lock); int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) { - if (lastReadyTs != -1) { - mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64 - "ms less than threshold", - pStream->uid, lastReadyTs, (now - lastReadyTs)); + mInfo("not start checkpoint, stream:0x%" PRIx64 " readyTs:%" PRId64 " ready duration:%.2fs less than threshold", + pStream->uid, lastReadyTs, (now - lastReadyTs) / 1000.0); } ready = false; @@ -1901,11 +1899,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - STrans *pTrans = NULL; - int32_t code = 0; +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, + STrans **pUpdateTrans, SArray* pStreamList) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + STrans *pTrans = NULL; + int32_t code = 0; *pUpdateTrans = NULL; // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool @@ -1974,6 +1973,10 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code == 0) { + taosArrayPush(pStreamList, &pStream->uid); + } + sdbRelease(pSdb, pStream); if (code != TSDB_CODE_SUCCESS) { @@ -2152,7 +2155,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, NULL); if (code) { mError("failed to take the vgroup snapshot, ignore it and continue"); } @@ -2176,10 +2179,27 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mDebug("vnode(s) change detected, build trans to update stream task epsets"); STrans *pTrans = NULL; + SArray* pStreamIdList = taosArrayInit(4, sizeof(int64_t)); streamMutexLock(&execInfo.lock); - code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans); + code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans, pStreamIdList); + + // remove the consensus-checkpoint-id req of all related stream(s) + int32_t num = taosArrayGetSize(pStreamIdList); + if (num > 0) { + mDebug("start to clear %d related stream in consensus-checkpoint-id list due to nodeUpdate", num); + for (int32_t x = 0; x < num; ++x) { + int64_t uid = *(int64_t *)taosArrayGet(pStreamIdList, x); + int32_t ret = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, uid); + if (ret != 0) { + mError("failed to remove stream:0x%" PRIx64 " from consensus-checkpoint-id list, code:%s", uid, + tstrerror(ret)); + } + } + } + 
streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pStreamIdList); // NOTE: sync trans out of lock if (code == 0 && pTrans != NULL) { @@ -2385,8 +2405,9 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { if (pStream != NULL) { // TODO:handle error code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); - if (code) { - mError("failed to create checkpoint trans, code:%s", tstrerror(code)); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("stream:0x%" PRIx64 " failed to create checkpoint trans, checkpointId:%" PRId64 ", code:%s", + req.streamId, checkpointId, tstrerror(code)); } } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans @@ -2394,11 +2415,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { // sleep(500ms) } - // remove this entry - (void) taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); + // remove this entry, not overwriting the global error code + int32_t ret = taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); + if (ret) { + mError("failed to remove transfer state stream, code:%s", tstrerror(ret)); + } int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed in transfer-state list, %d stream(s) not finish fill-history process", + req.streamId, numOfStreams); } if (pStream != NULL) { @@ -2475,7 +2500,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const pReport->taskId, p->checkpointId, pReport->checkpointId); } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64, - pReport->taskId, p->checkpointId, pReport->checkpointId); + pReport->taskId, p->checkpointId, pReport->checkpointId); // update the checkpoint report info p->checkpointId = pReport->checkpointId; @@ -2612,6 +2637,8 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pEx if (chkId > pe->checkpointInfo.latestId) { if (chkId != INT64_MAX) { *pAllSame = false; + mDebug("checkpointIds not identical, prev:%" PRId64 " smaller:%" PRId64 " from task:0x%" PRIx64, chkId, + pe->checkpointInfo.latestId, pe->id.taskId); } chkId = pe->checkpointInfo.latestId; } @@ -2637,7 +2664,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, } } -static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) { +static int32_t doCleanReqList(SArray *pList, SCheckpointConsensusInfo *pInfo) { int32_t alreadySend = taosArrayGetSize(pList); for (int32_t i = 0; i < alreadySend; ++i) { @@ -2663,7 +2690,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { int64_t now = taosGetTimestampMs(); bool allReady = true; SArray *pNodeSnapshot = NULL; - int32_t maxAllowedTrans = 20; int32_t numOfTrans = 0; int32_t code = 0; void *pIter = NULL; @@ -2679,9 +2705,16 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { return terrno; } + SHashObj* pTermMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pTermMap == NULL) { + taosArrayDestroy(pList); + taosArrayDestroy(pStreamList); + return terrno; + } + mDebug("start to process consensus-checkpointId in tmr"); - code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + code = 
mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, pTermMap); taosArrayDestroy(pNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); @@ -2691,6 +2724,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process"); taosArrayDestroy(pStreamList); taosArrayDestroy(pList); + taosHashCleanup(pTermMap); return 0; } @@ -2717,31 +2751,62 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { continue; } + if (pStream->uid != pInfo->streamId) { + // todo remove it + } + + if ((num < pInfo->numOfTasks) || (pInfo->numOfTasks == 0)) { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-consensus req(not all), ignore", pStream->uid, + pStream->name, num, pInfo->numOfTasks); + mndReleaseStream(pMnode, pStream); + continue; + } + + streamId = pStream->uid; + + int32_t existed = 0; + bool allSame = true; + int64_t chkId = getConsensusId(pInfo->streamId, pInfo->numOfTasks, &existed, &allSame); + if (chkId == -1) { + mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again", existed, pInfo->numOfTasks); + mndReleaseStream(pMnode, pStream); + continue; + } + + bool allQualified = true; for (int32_t j = 0; j < num; ++j) { SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); if (pe == NULL) { continue; } - if (streamId == -1) { - streamId = pe->req.streamId; - } - - int32_t existed = 0; - bool allSame = true; - int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame); - if (chkId == -1) { - mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed, - pInfo->numOfTasks, pe->req.taskId); - break; + if (pe->req.nodeId != -2) { + int32_t *pTerm = taosHashGet(pTermMap, &(pe->req.nodeId), sizeof(pe->req.nodeId)); + if (pTerm == NULL) { + mError("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d not found in termMap", pe->req.streamId, + pe->req.taskId, pe->req.nodeId); + allQualified = false; + continue; + } else { + if (*pTerm != pe->req.term) { + mWarn("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d is expired, term:%d, current term:%d", + pe->req.streamId, pe->req.taskId, pe->req.nodeId, pe->req.term, *pTerm); + allQualified = false; + continue; + } + } } if (((now - pe->ts) >= 10 * 1000) || allSame) { - mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, - pe->req.startTs, (now - pe->ts) / 1000.0); + mDebug("s-task:0x%x vgId:%d term:%d sendTs:%" PRId64 " wait %.2fs or all tasks have same checkpointId:%" PRId64, pe->req.taskId, + pe->req.nodeId, pe->req.term, pe->req.startTs, (now - pe->ts) / 1000.0, chkId); if (chkId > pe->req.checkpointId) { streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pStreamList); + taosArrayDestroy(pList); + taosHashCleanup(pTermMap); + mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId, pe->req.checkpointId, chkId); @@ -2750,42 +2815,38 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { return TSDB_CODE_FAILED; } - // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed. 
- code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); - } - - void *p = taosArrayPush(pList, &pe->req.taskId); - if (p == NULL) { - mError("failed to put into task list, taskId:0x%x", pe->req.taskId); - } } else { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); + allQualified = false; + } + } + + if (allQualified) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_CONSEN_NAME, false); + + if (code == 0) { + code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, chkId, pInfo->pTaskList); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); + } else { + numOfTrans += 1; + mndClearConsensusRspEntry(pInfo); + void *p = taosArrayPush(pStreamList, &streamId); + if (p == NULL) { + mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", + streamId); + } + } + } else { + mDebug("stream:0x%" PRIx64 "not create chktp-consensus, due to trans conflict", pStream->uid); } } mndReleaseStream(pMnode, pStream); - int32_t alreadySend = doCleanReqList(pList, pInfo); - - // clear request stream item with empty task list - if (taosArrayGetSize(pInfo->pTaskList) == 0) { - mndClearConsensusRspEntry(pInfo); - if (streamId == -1) { - mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId); - } - - void *p = taosArrayPush(pStreamList, &streamId); - if (p == NULL) { - mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId); - } - } - - numOfTrans += alreadySend; - if (numOfTrans > maxAllowedTrans) { - mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend); + // create one transaction each time + if (numOfTrans > 0) { taosHashCancelIterate(execInfo.pStreamConsensus, pIter); break; } @@ -2804,6 +2865,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { taosArrayDestroy(pStreamList); taosArrayDestroy(pList); + taosHashCleanup(pTermMap); mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1923587711..95f52ae5f3 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -427,6 +427,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { .taskId = p->id.taskId, .checkpointId = p->checkpointInfo.latestId, .startTs = pChkInfo->consensusTs, + .nodeId = p->nodeId, + .term = p->stage, }; SStreamObj *pStream = NULL; @@ -486,7 +488,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { SArray *p = NULL; - code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); + code = mndTakeVgroupSnapshot(pMnode, &allReady, &p, NULL); taosArrayDestroy(p); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index f4f7c65a00..1162796554 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -16,7 +16,7 @@ #include 
"mndStream.h" #include "mndTrans.h" -#define MAX_CHKPT_EXEC_ELAPSED (600*1000) // 600s +#define MAX_CHKPT_EXEC_ELAPSED (600*1000*3) // 600s typedef struct SKeyInfo { void *pKey; @@ -137,6 +137,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) || + (strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) || strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); @@ -152,7 +153,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons // * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. // For a given stream: // 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. -// 2. create/drop/reset/update trans are conflict with any other trans. +// 2. create/drop/reset/update/chkpt-consensus trans are conflict with any other trans. int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { if (lock) { streamMutexLock(&execInfo.lock); diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 5ccb626609..b0c2ab6c41 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -113,8 +113,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT taosMemoryFree(pReq); return code; } - - mDebug("set the resume action for trans:%d", pTrans->id); return code; } @@ -438,6 +436,8 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt return code; } + mDebug("transId:%d start to create resume actions", pTrans->id); + while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = NULL; code = streamTaskIterGetCurrent(pIter, &pTask); @@ -578,7 +578,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * return 0; } -int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) { +int32_t doSetCheckpointIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) { SRestoreCheckpointInfo req = { .taskId = pTask->id.taskId, .streamId = pTask->id.streamId, @@ -624,7 +624,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p return code; } - code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); + code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, TSDB_CODE_STREAM_TASK_IVLD_STATUS, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); } @@ -632,6 +632,50 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p return code; } +int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t checkpointId, + SArray *pList) { + SStreamTaskIter *pIter = NULL; + int32_t num = taosArrayGetSize(pList); + + taosWLockLatch(&pStream->lock); + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + taosWUnLockLatch(&pStream->lock); + mError("failed to create stream task iter:%s", 
pStream->name); + return code; + } + + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return code; + } + + // find the required entry + int64_t startTs = 0; + for(int32_t i = 0; i < num; ++i) { + SCheckpointConsensusEntry* pEntry = taosArrayGet(pList, i); + if (pEntry->req.taskId == pTask->id.taskId) { + startTs = pEntry->req.startTs; + break; + } + } + + code = doSetCheckpointIdAction(pMnode, pTrans, pTask, checkpointId, startTs); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return code; + } + } + + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +} int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int8_t mndTrigger) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 4779f1d6cb..142a2b383c 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -181,7 +181,7 @@ static int32_t mndCheckMnodeStatus(SMnode* pMnode) { return TSDB_CODE_SUCCESS; } -static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) { +static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady, SHashObj* pTermMap) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; @@ -243,6 +243,14 @@ static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bo mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); } + if (pTermMap != NULL) { + int64_t term = pVgroup->vnodeGid[0].syncTerm; + code = taosHashPut(pTermMap, &pVgroup->vgId, sizeof(pVgroup->vgId), &term, sizeof(term)); + if (code) { + mError("failed to put vnode:%d term into hashMap, code:%s", pVgroup->vgId, tstrerror(code)); + } + } + sdbRelease(pSdb, pVgroup); } @@ -251,7 +259,7 @@ _end: return code; } -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap) { int32_t code = 0; SArray *pVgroupList = NULL; @@ -266,7 +274,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } // 1. 
check for all vnodes status - code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady); + code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady, pTermMap); if (code) { goto _err; } @@ -728,15 +736,21 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; int32_t code = 0; - SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); - if (pDropped == NULL) { - return terrno; - } + int32_t lino = 0; + SArray *pDropped = NULL; mDebug("start to scan checkpoint report info"); streamMutexLock(&execInfo.lock); + int32_t num = taosHashGetSize(execInfo.pChkptStreams); + if (num == 0) { + goto _end; + } + + pDropped = taosArrayInit(4, sizeof(int64_t)); + TSDB_CHECK_NULL(pDropped, code, lino, _end, terrno); + while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { SChkptReportInfo *px = (SChkptReportInfo *)pIter; if (taosArrayGetSize(px->pTaskList) == 0) { @@ -804,42 +818,35 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); } +_end: streamMutexUnlock(&execInfo.lock); - taosArrayDestroy(pDropped); + if (pDropped != NULL) { + taosArrayDestroy(pDropped); + } mDebug("end to scan checkpoint report info") - return TSDB_CODE_SUCCESS; + return code; } -int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, - int64_t ts) { - char msg[128] = {0}; - STrans *pTrans = NULL; - SStreamTask *pTask = NULL; +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList) { + char msg[128] = {0}; + STrans *pTrans = NULL; - snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); + snprintf(msg, tListLen(msg), "set consen-chkpt-id for stream:0x%" PRIx64, pStream->uid); int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans); if (pTrans == NULL || code != 0) { return terrno; } - STaskId id = {.streamId = pStream->uid, .taskId = taskId}; - code = mndGetStreamTask(&id, pStream, &pTask); - if (code) { - mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name); - sdbRelease(pMnode->pSdb, pStream); - return code; - } - code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); if (code) { sdbRelease(pMnode->pSdb, pStream); return code; } - code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); + code = mndStreamSetChkptIdAction(pMnode, pTrans, pStream, checkpointId, pList); if (code != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -854,8 +861,10 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i } code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare set consensus-chkptId trans for stream:0x%" PRId64 " since %s", pTrans->id, + pStream->uid, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -911,13 +920,15 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo } if (p->req.taskId == info.req.taskId) { - mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 - "->%" PRId64 " checkpointId:%" PRId64 " -> 
%" PRId64 " total existed:%d", + mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update send reqTs %" PRId64 + "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " term:%d->%d total existed:%d", pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId, - info.req.checkpointId, num); + info.req.checkpointId, p->req.term, info.req.term, num); p->req.startTs = info.req.startTs; p->req.checkpointId = info.req.checkpointId; p->req.transId = info.req.transId; + p->req.nodeId = info.req.nodeId; + p->req.term = info.req.term; return; } } @@ -927,9 +938,10 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); } else { num = taosArrayGetSize(pInfo->pTaskList); - mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 - " waiting tasks:%d", - pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); + mDebug("s-task:0x%x (vgId:%d) checkpointId:%" PRId64 " term:%d, reqTs:%" PRId64 + " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d", + pRestoreInfo->taskId, pRestoreInfo->nodeId, pRestoreInfo->checkpointId, info.req.term, + info.req.startTs, pRestoreInfo->streamId, num); } } @@ -947,6 +959,7 @@ int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { code = taosHashRemove(pHash, &streamId, sizeof(streamId)); if (code == 0) { + numOfStreams = taosHashGetSize(pHash); mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams); } else { mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams); @@ -1632,7 +1645,7 @@ static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { } } - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot); + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot, NULL); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 12a803d1d8..121872d321 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -156,7 +156,7 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock, - SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq); + SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id); int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type); #define TQ_ERR_GO_TO_END(c) \ diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index c9868f0398..82305940d2 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -192,7 +192,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq)); + TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, 
pDataBlock, tagArray, true, &tbData.pCreateTbReq, + "")); { uint64_t groupId = pDataBlock->info.id.groupId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 407a1e88c8..7a0f56ef18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1064,11 +1064,19 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. get the related stream task code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask); if (pStreamTask == NULL) { - tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", - pTask->streamTaskId.taskId, pTask->id.idStr); - tqDebug("s-task:%s fill-history task set status to be dropping", id); - code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); + int32_t ret = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask); + if (ret == 0 && pStreamTask != NULL) { + tqWarn("s-task:0x%" PRIx64 " stopped, not ready for related task:%s scan-history work, do nothing", + pTask->streamTaskId.taskId, pTask->id.idStr); + streamMetaReleaseTask(pMeta, pStreamTask); + } else { + tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); + + tqDebug("s-task:%s fill-history task set status to be dropping", id); + code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); + } atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); @@ -1347,10 +1355,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, - (int32_t)pReq->downstreamTaskId); + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + SDecoder decoder; + + SStreamCheckpointReadyMsg req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + return code; + } + tDecoderClear(&decoder); + + tqError("vgId:%d not leader, s-task:0x%x ignore the retrieve checkpoint-trigger msg from s-task:0x%x vgId:%d", vgId, + req.upstreamTaskId, req.downstreamTaskId, req.downstreamNodeId); + return TSDB_CODE_STREAM_NOT_LEADER; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 239a8e65b5..d0131ab76a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -41,7 +41,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, int32_t numOfTags); static int32_t createDefaultTagColName(SArray** pColNameList); static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, - const char* stbFullName, int64_t gid, bool newSubTableRule); + const char* stbFullName, int64_t gid, bool newSubTableRule, const char* id); static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo); static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); 
@@ -262,7 +262,7 @@ int32_t createDefaultTagColName(SArray** pColNameList) { } int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid, bool newSubTableRule) { + int64_t gid, bool newSubTableRule, const char* id) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) { @@ -276,16 +276,17 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* if (code != TSDB_CODE_SUCCESS) { return code; } - // tqDebug("gen name from:%s", pDataBlock->info.parTbName); + tqDebug("s-task:%s gen name from:%s blockdata", id, pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); if (pCreateTableReq->name == NULL) { return terrno; } - // tqDebug("copy name:%s", pDataBlock->info.parTbName); + tqDebug("s-task:%s copy name:%s from blockdata", id, pDataBlock->info.parTbName); } } else { int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name); + tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id, pCreateTableReq->name); return code; } @@ -391,7 +392,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S } } - code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask)); + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask), + pTask->id.idStr); if (code) { goto _end; } @@ -643,7 +645,7 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam } int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock, - SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) { + SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) { *pReq = NULL; SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); @@ -676,7 +678,8 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n } // set table name - code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule, + id); if (code) { return code; } @@ -1043,7 +1046,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, - IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq); + IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq, id); taosArrayDestroy(pTagArray); if (code) { @@ -1160,8 +1163,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) { - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, - numOfBlocks); + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has other type block, submit one-by-one", vgId, + id, numOfBlocks); for (int32_t i = 0; i < numOfBlocks; ++i) { if (streamTaskShouldStop(pTask)) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 
4d2449bb37..4a4ed2234b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -87,7 +87,7 @@ static void doStartScanWal(void* param, void* tmrId) { tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; - tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId); + tqTrace("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId); SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); if (pMeta == NULL) { @@ -173,7 +173,7 @@ static void doStartScanWal(void* param, void* tmrId) { _end: streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); - tqDebug("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); + tqTrace("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d5edfe4b35..5ddbdc9f5c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -540,13 +540,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (!isLeader) { tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true); } SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask); if ((pTask == NULL) || (code != 0)) { - return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true); } code = streamTaskProcessCheckRsp(pTask, &rsp); @@ -746,6 +746,9 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // commit the update int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); + if (numOfTasks == 0) { + streamMetaResetStartInfo(&pMeta->startInfo, vgId); + } if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -786,48 +789,63 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); + STaskStartInfo* pStartInfo = &pMeta->startInfo; + + if (pStartInfo->startAllTasks == 1) { + // wait for the checkpoint id rsp, this rsp will be expired + if (pStartInfo->curStage == START_MARK_REQ_CHKPID) { + SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList); + tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%"PRId64 " ignore and continue", vgId, pCurStageInfo->ts); + + taosArrayClear(pStartInfo->pStagesList); + pStartInfo->curStage = 0; + goto _start; + + } else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) { + SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList); + tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64 + " rsp will be 
discarded", + vgId, pCurStageInfo->ts); + + taosArrayClear(pStartInfo->pStagesList); + pStartInfo->curStage = 0; + goto _start; + + } else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) { + pStartInfo->restartCount += 1; + tqDebug( + "vgId:%d in start tasks procedure (check downstream), inc restartCounter by 1 and wait for it completes, " + "remaining restart:%d", + vgId, pStartInfo->restartCount); + } else { + tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId); + } - streamMetaWLock(pMeta); - if (pMeta->startInfo.startAllTasks == 1) { - pMeta->startInfo.restartCount += 1; - tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, - pMeta->startInfo.restartCount); - streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } - pMeta->startInfo.startAllTasks = 1; - streamMetaWUnLock(pMeta); +_start: + pStartInfo->startAllTasks = 1; terrno = 0; tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId, pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs); - streamMetaWLock(pMeta); streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); + tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.); streamMetaLoadAllTasks(pMeta); - { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - taosHashClear(pStartInfo->pReadyTaskSet); - taosHashClear(pStartInfo->pFailedTaskSet); - pStartInfo->readyTs = 0; - } - if (isLeader && !tsDisableStream) { - streamMetaWUnLock(pMeta); code = streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); - pMeta->startInfo.restartCount = 0; - streamMetaWUnLock(pMeta); + pStartInfo->restartCount = 0; tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } @@ -857,16 +875,20 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { + streamMetaWLock(pMeta); code = streamMetaStartAllTasks(pMeta); + streamMetaWUnLock(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { + streamMetaWLock(pMeta); code = restartStreamTasks(pMeta, isLeader); + streamMetaWUnLock(pMeta); return 0; } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true); return code; } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId); @@ -923,7 +945,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { bool scanWal = false; int32_t code = 0; - streamMetaWLock(pMeta); +// streamMetaWLock(pMeta); if (pStartInfo->startAllTasks == 1) { tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, pMeta->startInfo.restartCount); @@ -935,7 +957,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { pStartInfo->restartCount -= 1; tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role, pStartInfo->restartCount); - streamMetaWUnLock(pMeta); +// streamMetaWUnLock(pMeta); return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); } 
else { @@ -950,7 +972,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { } } - streamMetaWUnLock(pMeta); +// streamMetaWUnLock(pMeta); return code; } @@ -1179,7 +1201,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t pTask->hTaskInfo.operatorOpen = false; code = streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { -// code = tqScanWalAsync((STQ*)handle, false); + // code = tqScanWalAsync((STQ*)handle, false); } else { code = streamTrySchedExec(pTask); } @@ -1299,12 +1321,30 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, req.taskId); - // ignore this code to avoid error code over write - int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); - if (ret) { - tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); + // ignore this code to avoid overwriting the error code + if (pMeta->role == NODE_ROLE_LEADER) { + tqError("vgId:%d process consensus checkpointId req:%" PRId64 + " transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already", + pMeta->vgId, req.checkpointId, req.transId, req.taskId); + + int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true); + if (ret) { + tqError("s-task:0x%x failed to add into the failed task list, code:%s", req.taskId, tstrerror(ret)); + } + +// STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; +// int32_t ret1 = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); +// if (ret1 == 0 && pTask != NULL) { +// SStreamTaskState s = streamTaskGetStatus(pTask); +// if (s.state == TASK_STATUS__STOP) { +// tqDebug("s-task:0x%x status:%s wait for it become init", req.taskId, s.name); +// streamMetaReleaseTask(pMeta, pTask); +// return TSDB_CODE_STREAM_TASK_IVLD_STATUS; +// } +// } + } else { + tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d", + pMeta->vgId, req.taskId, req.checkpointId, req.transId); } return 0; @@ -1312,19 +1352,26 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) { - tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 + tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64 " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard", pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created); - streamMetaAddFailedTaskSelf(pTask, now); + if (pMeta->role == NODE_ROLE_LEADER) { + streamMetaAddFailedTaskSelf(pTask, now, true); + } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } - tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode", - pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); + tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 + " transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64, + pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs, + pTask->execInfo.created); streamMutexLock(&pTask->lock); + SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; + if (pTask->chkInfo.checkpointId < req.checkpointId) { tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64, pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId); @@ -1334,9 +1381,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return 0; } - SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; if (pConsenInfo->consenChkptTransId >= req.transId) { - tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, + tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, pConsenInfo->consenChkptTransId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); @@ -1356,6 +1402,19 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamTaskSetConsenChkptIdRecv(pTask, req.transId, now); streamMutexUnlock(&pTask->lock); + streamMetaWLock(pTask->pMeta); + if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) { + pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM; + + SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now}; + taosArrayPush(pMeta->startInfo.pStagesList, &info); + + tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d", + pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList)); + } + + streamMetaWUnLock(pTask->pMeta); + if (pMeta->role == NODE_ROLE_LEADER) { code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 653f525e09..2778cb9115 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -401,6 +401,12 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) { } if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) { + vInfo("vgId:%d, %u, %u, hashVal: %u, restored:%d", pVnode->config.vgId, pVnode->config.hashBegin, + pVnode->config.hashEnd, hashValue, pVnode->restored); + + vError("vgId:%d invalid table name:%s, hashVal:0x%x, range [0x%x, 0x%x]", pVnode->config.vgId, + tableFName, hashValue, 
pVnode->config.hashBegin, pVnode->config.hashEnd); + return terrno = TSDB_CODE_VND_HASH_MISMATCH; } diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 0a69080314..bea34ee2b4 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -105,6 +105,9 @@ int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated); int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); TSKEY compareTs(void* pKey); +void clearGroupResArray(SGroupResInfo* pGroupResInfo); +void clearSessionGroupResInfo(SGroupResInfo* pGroupResInfo); +void destroyResultWinInfo(void* pRes); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b6044eaacf..f9054cd734 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -980,7 +980,7 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) { } if (waitDuration > 0) { - qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0); + qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0); } else { qDebug("%s async killed execTask", GET_TASKID(pTaskInfo)); } @@ -1008,6 +1008,11 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) { } } + int64_t et = taosGetTimestampMs() - st; + if (et < waitDuration) { + qInfo("%s waiting %.2fs for executor stopping", GET_TASKID(pTaskInfo), et / 1000.0); + return TSDB_CODE_SUCCESS; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 5066107f3c..d3cc0a7511 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -58,8 +58,8 @@ void destroyStreamCountAggOperatorInfo(void* param) { } cleanupExprSupp(&pInfo->scalarSupp); - clearGroupResInfo(&pInfo->groupResInfo); - taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + clearSessionGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo); pInfo->pUpdated = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a70e67ad04..c361694b38 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -54,8 +54,8 @@ void destroyStreamEventOperatorInfo(void* param) { pInfo->pOperator = NULL; } - clearGroupResInfo(&pInfo->groupResInfo); - taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + clearSessionGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo); pInfo->pUpdated = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index ffaa62721e..ba2ff50ee0 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -135,6 +135,13 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) { taosMemoryFree(pFillInfo); } +void clearGroupResArray(SGroupResInfo* pGroupResInfo) { + pGroupResInfo->freeItem = false; + 
taosArrayDestroy(pGroupResInfo->pRows); + pGroupResInfo->pRows = NULL; + pGroupResInfo->index = 0; +} + static void destroyStreamFillOperatorInfo(void* param) { SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param; destroyStreamFillInfo(pInfo->pFillInfo); @@ -148,7 +155,7 @@ static void destroyStreamFillOperatorInfo(void* param) { taosArrayDestroy(pInfo->matchInfo.pList); pInfo->matchInfo.pList = NULL; taosArrayDestroy(pInfo->pUpdated); - clearGroupResInfo(&pInfo->groupResInfo); + clearGroupResArray(&pInfo->groupResInfo); taosArrayDestroy(pInfo->pCloseTs); if (pInfo->stateStore.streamFileStateDestroy != NULL) { diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 7eadf3405a..bc2508809e 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -166,7 +166,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { cleanupExprSupp(&pInfo->scalarSup); taosArrayDestroy(pInfo->historyPoints); - taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + taosArrayDestroy(pInfo->pUpdated); pInfo->pUpdated = NULL; tSimpleHashCleanup(pInfo->pUpdatedMap); @@ -174,7 +174,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { taosArrayDestroy(pInfo->pDelWins); tSimpleHashCleanup(pInfo->pDeletedMap); - clearGroupResInfo(&pInfo->groupResInfo); + clearGroupResArray(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0c30266ba9..1b02caac57 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -486,6 +486,7 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { destroyFlusedPos(pPos); } } + pGroupResInfo->freeItem = false; taosArrayDestroy(pGroupResInfo->pRows); pGroupResInfo->pRows = NULL; @@ -2132,6 +2133,27 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { taosMemoryFreeClear(pSup->pDummyCtx); } +void destroyResultWinInfo(void* pRes) { + SResultWindowInfo* pWinRes = (SResultWindowInfo*)pRes; + destroyFlusedPos(pWinRes->pStatePos); +} + +void clearSessionGroupResInfo(SGroupResInfo* pGroupResInfo) { + int32_t size = taosArrayGetSize(pGroupResInfo->pRows); + if (pGroupResInfo->index >= 0 && pGroupResInfo->index < size) { + for (int32_t i = pGroupResInfo->index; i < size; i++) { + SResultWindowInfo* pRes = (SResultWindowInfo*) taosArrayGet(pGroupResInfo->pRows, i); + destroyFlusedPos(pRes->pStatePos); + pRes->pStatePos = NULL; + } + } + + pGroupResInfo->freeItem = false; + taosArrayDestroy(pGroupResInfo->pRows); + pGroupResInfo->pRows = NULL; + pGroupResInfo->index = 0; +} + void destroyStreamSessionAggOperatorInfo(void* param) { if (param == NULL) { return; @@ -2145,8 +2167,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) { } cleanupExprSupp(&pInfo->scalarSupp); - clearGroupResInfo(&pInfo->groupResInfo); - taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + clearSessionGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo); pInfo->pUpdated = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); @@ -4255,8 +4277,8 @@ void destroyStreamStateOperatorInfo(void* param) { pInfo->pOperator = NULL; } - clearGroupResInfo(&pInfo->groupResInfo); - taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + clearSessionGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyEx(pInfo->pUpdated, 
destroyResultWinInfo); pInfo->pUpdated = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 2b5312c0a0..c6cfd02287 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { TdThreadMutex cfMutex; SHashObj* cfInst; int64_t defaultCfInit; - + int64_t vgId; } SBackendWrapper; typedef struct { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 6031710bf9..eb262793ae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -843,6 +843,8 @@ int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno); + pHandle->vgId = vgId; + rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; @@ -914,6 +916,7 @@ _EXIT: taosMemoryFree(backendPath); return code; } + void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; @@ -930,6 +933,7 @@ void streamBackendCleanup(void* arg) { rocksdb_close(pHandle->db); pHandle->db = NULL; } + rocksdb_options_destroy(pHandle->dbOpt); rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); @@ -945,16 +949,16 @@ void streamBackendCleanup(void* arg) { streamMutexDestroy(&pHandle->mutex); streamMutexDestroy(&pHandle->cfMutex); - stDebug("destroy stream backend :%p", pHandle); + stDebug("vgId:%d destroy stream backend:%p", (int32_t) pHandle->vgId, pHandle); taosMemoryFree(pHandle); - return; } + void streamBackendHandleCleanup(void* arg) { SBackendCfWrapper* wrapper = arg; bool remove = wrapper->remove; TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock)); - stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); + stDebug("start to do-close backendWrapper %p, %s", wrapper, wrapper->idstr); if (wrapper->rocksdb == NULL) { TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock)); return; @@ -2613,11 +2617,14 @@ int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* void taskDbDestroy(void* pDb, bool flush) { STaskDbWrapper* wrapper = pDb; - if (wrapper == NULL) return; + if (wrapper == NULL) { + return; + } + int64_t st = taosGetTimestampMs(); streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr); - stDebug("succ to destroy stream backend:%p", wrapper); + stDebug("%s succ to destroy stream backend:%p", wrapper->idstr, wrapper); int8_t nCf = tListLen(ginitDict); if (flush && wrapper->removeAllFiles == 0) { @@ -2674,25 +2681,26 @@ void taskDbDestroy(void* pDb, bool flush) { rocksdb_comparator_destroy(compare); rocksdb_block_based_options_destroy(tblOpt); } + taosMemoryFree(wrapper->pCompares); taosMemoryFree(wrapper->pCfOpts); taosMemoryFree(wrapper->pCfParams); streamMutexDestroy(&wrapper->mutex); - taskDbDestroyChkpOpt(wrapper); - taosMemoryFree(wrapper->idstr); - if (wrapper->removeAllFiles) { char* err = NULL; - stInfo("drop task remove backend dat:%s", wrapper->path); + stInfo("drop task remove backend data:%s", wrapper->path); taosRemoveDir(wrapper->path); } + + int64_t et = taosGetTimestampMs(); + stDebug("%s destroy stream backend:%p completed, elapsed time:%.2fs", 
wrapper->idstr, wrapper, (et - st)/1000.0); + + taosMemoryFree(wrapper->idstr); taosMemoryFree(wrapper->path); taosMemoryFree(wrapper); - - return; } void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index f880526541..a69a1f83e1 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -20,7 +20,7 @@ #define CHECK_NOT_RSP_DURATION 60 * 1000 // 60 sec -static void processDownstreamReadyRsp(SStreamTask* pTask); +static void processDownstreamReadyRsp(SStreamTask* pTask, bool lock); static void rspMonitorFn(void* param, void* tmrId); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); @@ -133,9 +133,11 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) { code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. +// streamTaskSetConsenChkptIdRecv(pTask, 0, taosGetTimestampMs()); +// stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); - processDownstreamReadyRsp(pTask); + processDownstreamReadyRsp(pTask, false); } if (code) { @@ -208,7 +210,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } if (left == 0) { - processDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag + processDownstreamReadyRsp(pTask, true); // all downstream tasks are ready, set the complete check downstream flag streamTaskStopMonitorCheckRsp(pInfo, id); } else { stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, @@ -234,7 +236,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - streamMetaAddFailedTaskSelf(pTask, now); + streamMetaAddFailedTaskSelf(pTask, now, true); } else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); @@ -331,7 +333,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void processDownstreamReadyRsp(SStreamTask* pTask) { +void processDownstreamReadyRsp(SStreamTask* pTask, bool lock) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? 
TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); if (code) { @@ -340,7 +342,12 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { int64_t checkTs = pTask->execInfo.checkTs; int64_t readyTs = pTask->execInfo.readyTs; - code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); + if (lock) { + code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); + } else { + code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); + } + if (code) { stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -351,7 +358,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { pTask->info.fillHistory); } - // halt it self for count window stream task until the related fill history task completed. + // halt itself for count window stream task until the related fill history task completed. stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); @@ -365,7 +372,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { // todo: let's retry if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr); - code = streamLaunchFillHistoryTask(pTask); + code = streamLaunchFillHistoryTask(pTask, lock); if (code) { stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code)); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f07d6f4cc1..64f8ecbbe0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -629,8 +629,9 @@ static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SV code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", - id, vgId, pReq->taskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%" PRIx64 + " dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->hTaskId, numOfTasks); //todo: task may not exist, commit anyway, optimize this later code = streamMetaCommit(pMeta); @@ -1586,18 +1587,27 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs()); streamMutexUnlock(&pTask->lock); + // 1. stop the executo at first + if (pTask->exec.pExecutor != NULL) { + // we need to make sure the underlying operator is stopped right, otherwise, SIGSEG may occur, + // waiting at most for 10min + if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { + int32_t code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 600000); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); + } + } + + qDestroyTask(pTask->exec.pExecutor); + pTask->exec.pExecutor = NULL; + } + + // 2. 
destroy backend after stop executor if (pTask->pBackend != NULL) { streamFreeTaskState(pTask, p); pTask->pBackend = NULL; } - streamMetaWLock(pTask->pMeta); - if (pTask->exec.pExecutor != NULL) { - qDestroyTask(pTask->exec.pExecutor); - pTask->exec.pExecutor = NULL; - } - streamMetaWUnLock(pTask->pMeta); - return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b093f808c0..0e8cbb32bf 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -145,7 +145,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req) { SRetrieveTableRsp* pRetrieve = NULL; - size_t dataEncodeSize = blockGetEncodeSize(pBlock); + size_t dataEncodeSize = blockGetEncodeSize(pBlock); int32_t len = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; pRetrieve = taosMemoryCalloc(1, len); @@ -684,6 +684,9 @@ static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, } if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + stDebug("s-task:%s dst table hashVal:0x%x assign to vgId:%d range[0x%x, 0x%x]", pTask->id.idStr, hashValue, + pVgInfo->vgId, pVgInfo->hashBegin, pVgInfo->hashEnd); + if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) { stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno)); return code; @@ -727,6 +730,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S if (!pDataBlock->info.parTbName[0]) { memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); + stDebug("s-task:%s cached table name:%s, groupId:%" PRId64 " hashVal:0x%x", pTask->id.idStr, pBln->parTbName, + groupId, hashValue); } } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; @@ -752,9 +757,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, + snprintf(ctbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); - /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ + SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); @@ -762,6 +767,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S bln.hashValue = hashValue; memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); + stDebug("s-task:%s dst table:%s hashVal:0x%x groupId:%"PRId64, pTask->id.idStr, ctbName, hashValue, groupId); + // failed to put into name buffer, no need to do anything if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); @@ -890,7 +897,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { streamMutexLock(&pTask->msgInfo.lock); if (pTask->msgInfo.inMonitor == 0) { -// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + // int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, 
DISPATCH_RETRY_INTERVAL_MS, tstrerror(code)); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); @@ -967,8 +974,8 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, - vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); + stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId, + (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; } @@ -1128,8 +1135,7 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { // 1. check status in the first place if (state.state != TASK_STATUS__CK) { streamCleanBeforeQuitTmr(pTmrInfo, param); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId, - state.name); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId, state.name); streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); return; @@ -1258,7 +1264,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { } int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { - size_t dataEncodeSize = blockGetEncodeSize(pBlock); + size_t dataEncodeSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) { diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index a6d0142010..983862507e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -195,6 +195,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SMetaHbInfo* pInfo = pMeta->pHbInfo; int32_t code = 0; + bool setReqCheckpointId = false; // not recv the hb msg rsp yet, send current hb msg again if (pInfo->msgSendTs > 0) { @@ -243,7 +244,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { continue; } - // todo: this lock may blocked by lock in streamMetaStartOneTask function, which may lock a very long time when + // todo: this lock may be blocked by lock in streamMetaStartOneTask function, which may lock a very long time when // trying to load remote checkpoint data streamMutexLock(&pTask->lock); STaskStatusEntry entry = streamTaskGetStatusEntry(pTask); @@ -274,7 +275,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { streamMutexLock(&pTask->lock); entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(pTask, pMsg->ts); if (entry.checkpointInfo.consensusChkptId) { - entry.checkpointInfo.consensusTs = pMsg->ts; + entry.checkpointInfo.consensusTs = pTask->status.consenChkptInfo.statusTs; + setReqCheckpointId = true; } streamMutexUnlock(&pTask->lock); @@ -294,6 +296,20 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { streamMetaReleaseTask(pMeta, pTask); } + if (setReqCheckpointId) { + if (pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) { + stError("vgId:%d internal unknown error, current stage is:%d expected:%d", pMeta->vgId, pMeta->startInfo.curStage, + START_MARK_REQ_CHKPID); + } + + pMeta->startInfo.curStage = START_WAIT_FOR_CHKPTID; + SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = pMsg->ts}; + taosArrayPush(pMeta->startInfo.pStagesList, &info); + + stDebug("vgId:%d 
mark_req stage -> wait_for_chkptId stage, reqTs:%" PRId64 " , numOfStageHist:%d", pMeta->vgId, + info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList)); + } + pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus); if (hasMnodeEpset) { @@ -317,7 +333,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, rid); if (pMeta == NULL) { stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", rid); -// taosMemoryFree(param); return; } @@ -345,7 +360,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } else { stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid); } -// taosMemoryFree(param); return; } @@ -381,7 +395,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } if (!send) { - stError("vgId:%d failed to send hmMsg to mnode, retry again in 5s, code:%s", pMeta->vgId, tstrerror(code)); + stError("vgId:%d failed to send hbMsg to mnode due to acquire lock failure, retry again in 5s", pMeta->vgId); + } + if (code) { + stError("vgId:%d failed to send hbMsg to mnode, retry in 5, code:%s", pMeta->vgId, tstrerror(code)); } streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 404436e5a5..36ecf99b35 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -239,7 +239,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { void* key = taosHashGetKey(pIter, NULL); code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter); if (code != 0) { - stError("failed to cvt data"); + stError("vgId:%d failed to cvt data", pMeta->vgId); goto _EXIT; } @@ -495,6 +495,7 @@ _err: if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt); + if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList); taosMemoryFree(pMeta); stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code)); @@ -526,7 +527,9 @@ void streamMetaInitBackend(SStreamMeta* pMeta) { void streamMetaClear(SStreamMeta* pMeta) { // remove all existed tasks in this vnode - void* pIter = NULL; + int64_t st = taosGetTimestampMs(); + void* pIter = NULL; + while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) { int64_t refId = *(int64_t*)pIter; SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId); @@ -552,6 +555,9 @@ void streamMetaClear(SStreamMeta* pMeta) { } } + int64_t et = taosGetTimestampMs(); + stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (et - st)/1000.0); + if (pMeta->streamBackendRid != 0) { int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid); if (code) { @@ -559,6 +565,9 @@ void streamMetaClear(SStreamMeta* pMeta) { } } + int64_t et1 = taosGetTimestampMs(); + stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (et1 - et)/1000.0); + taosHashClear(pMeta->pTasksMap); taosArrayClear(pMeta->pTaskList); @@ -571,6 +580,8 @@ void streamMetaClear(SStreamMeta* pMeta) { // the willrestart/starting flag can NOT be cleared taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pFailedTaskSet); + + taosArrayClear(pMeta->startInfo.pStagesList); pMeta->startInfo.readyTs = 0; } diff --git a/source/libs/stream/src/streamStartHistory.c 
b/source/libs/stream/src/streamStartHistory.c index f8b1b5ecbc..5c15616ca0 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -25,9 +25,9 @@ #define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) typedef struct SLaunchHTaskInfo { - int64_t metaRid; - STaskId id; - STaskId hTaskId; + int64_t metaRid; + STaskId id; + STaskId hTaskId; } SLaunchHTaskInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -40,7 +40,7 @@ static void doExecScanhistoryInFuture(void* param, void* tmrId); static int32_t doStartScanHistoryTask(SStreamTask* pTask); static int32_t streamTaskStartScanHistory(SStreamTask* pTask); static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask); -static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask); +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock); static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); @@ -122,7 +122,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; - int32_t code = 0; + int32_t code = 0; code = streamTaskSetReady(pTask); if (code) { @@ -192,7 +192,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p } // an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask, bool lock) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; @@ -200,29 +200,44 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int32_t hTaskId = pTask->hTaskInfo.id.taskId; int64_t now = taosGetTimestampMs(); int32_t code = 0; + SStreamTask* pHisTask = NULL; // check stream task status in the first place. - SStreamTaskState pStatus = streamTaskGetStatus(pTask); - if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT && - pStatus.state != TASK_STATUS__PAUSE) { + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__READY && status.state != TASK_STATUS__HALT && status.state != TASK_STATUS__PAUSE) { stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, - pStatus.name); - - return streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + status.name); + if (lock) { + return streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } else { + return streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, + false); + } } stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); // Set the execution conditions, including the query time window and the version range - streamMetaRLock(pMeta); - SStreamTask* pHisTask = NULL; - code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask); - streamMetaRUnLock(pMeta); + if (lock) { + streamMetaRLock(pMeta); + } - if (code == 0) { // it is already added into stream meta store. 
+ code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask); + + if (lock) { + streamMetaRUnLock(pMeta); + } + + if (code == 0) { // it is already added into stream meta store. if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); + if (lock) { + code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); + } else { + code = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, + true); + } + if (code) { stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code)); } @@ -230,7 +245,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { if (pHisTask->pBackend == NULL) { code = pMeta->expandTaskFn(pHisTask); if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHisTask, now); + streamMetaAddFailedTaskSelf(pHisTask, now, lock); stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code)); } } @@ -243,7 +258,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { streamMetaReleaseTask(pMeta, pHisTask); return code; } else { - return launchNotBuiltFillHistoryTask(pTask); + return launchNotBuiltFillHistoryTask(pTask, lock); } } @@ -281,14 +296,14 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code)); } else { - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x", pTask->id.idStr, + MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId); } pHTaskInfo->id.taskId = 0; @@ -300,7 +315,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; if (streamTaskShouldStop(pTask)) { // record the failure -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId, pInfo->hTaskId.taskId); @@ -328,7 +343,7 @@ static void doCleanup(SStreamTask* pTask, int64_t metaRid, SLaunchHTaskInfo* pIn streamMetaReleaseTask(pMeta, pTask); int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid); if (ret) { - stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid); + stError("vgId:%d failed to release meta refId:%" PRId64, vgId, metaRid); } if (pInfo != NULL) { @@ -363,7 +378,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid); if (ret) { - stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid); + stError("vgId:%d failed to release 
meta refId:%" PRId64, vgId, metaRid); } // already dropped, no need to set the failure info into the stream task meta. @@ -416,7 +431,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { if (pHTask->pBackend == NULL) { code = pMeta->expandTaskFn(pHTask); if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHTask, now); + streamMetaAddFailedTaskSelf(pHTask, now, true); stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code)); } } @@ -451,13 +466,14 @@ int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStr return TSDB_CODE_SUCCESS; } -int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { +int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; int64_t hStreamId = pTask->hTaskInfo.id.streamId; int32_t hTaskId = pTask->hTaskInfo.id.taskId; SLaunchHTaskInfo* pInfo = NULL; + int32_t ret = 0; stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); @@ -465,10 +481,16 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo); if (code) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); - int32_t ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + if (lock) { + ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } else { + ret = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } + if (ret) { stError("s-task:%s add task check downstream result failed, code:%s", idStr, tstrerror(ret)); } + return code; } @@ -483,7 +505,13 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { stError("s-task:%s failed to start timer, related fill-history task not launched", idStr); taosMemoryFree(pInfo); - code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + + if (lock) { + code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } else { + code = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } + if (code) { stError("s-task:0x%x failed to record the start task status, code:%s", hTaskId, tstrerror(code)); } @@ -508,8 +536,8 @@ int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) { bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) { SVersionRange* pRange = &pTask->dataRange.range; if (nextProcessVer < pRange->maxVer) { - stError("s-task:%s next processdVer:%"PRId64" is less than range max ver:%"PRId64, pTask->id.idStr, nextProcessVer, - pRange->maxVer); + stError("s-task:%s next processdVer:%" PRId64 " is less than range max ver:%" PRId64, pTask->id.idStr, + nextProcessVer, pRange->maxVer); return true; } @@ -570,7 +598,7 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } void doExecScanhistoryInFuture(void* param, void* tmrId) { - int64_t taskRefId = *(int64_t*) param; + int64_t taskRefId = *(int64_t*)param; SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { @@ -595,8 +623,7 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { stError("s-task:%s async 
start history task failed", pTask->id.idStr); } - stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr, - pTask->info.fillHistory); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr, pTask->info.fillHistory); } else { int64_t* pTaskRefId = NULL; int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 60c1694dda..28df04adc8 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -39,19 +39,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; int64_t now = taosGetTimestampMs(); SArray* pTaskList = NULL; + int32_t numOfConsensusChkptIdTasks = 0; + int32_t numOfTasks = 0; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); - + numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); - - streamMetaWLock(pMeta); streamMetaResetStartInfo(&pMeta->startInfo, vgId); - streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); + code = prepareBeforeStartTasks(pMeta, &pTaskList, now); if (code != TSDB_CODE_SUCCESS) { return TSDB_CODE_SUCCESS; // ignore the error and return directly @@ -65,10 +64,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + + code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if ((pTask == NULL) || (code != 0)) { stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId); - int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false); if (ret) { stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret)); } @@ -79,7 +79,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); + streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs, false); } } @@ -91,10 +91,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if ((pTask == NULL )|| (code != 0)) { + code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if ((pTask == NULL) || (code != 0)) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false); if (ret) { stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret)); } @@ -116,14 
+116,14 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); - code = streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? + code = streamLaunchFillHistoryTask(pTask, false); // todo: how about retry launch fill-history task? if (code) { stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code)); } } - code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, - true); + code = streamMetaAddTaskLaunchResultNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, + pInfo->readyTs, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -136,7 +136,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { // do no added into result hashmap if it is failed due to concurrently starting of this stream task. if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false); } } @@ -146,11 +146,23 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { // negotiate the consensus checkpoint id for current task code = streamTaskSendNegotiateChkptIdMsg(pTask); + if (code == 0) { + numOfConsensusChkptIdTasks += 1; + } - // this task may has no checkpoint, but others tasks may generate checkpoint already? + // this task may have no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); } + if (numOfConsensusChkptIdTasks > 0) { + pMeta->startInfo.curStage = START_MARK_REQ_CHKPID; + SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now}; + + taosArrayPush(pMeta->startInfo.pStagesList, &info); + stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId, numOfConsensusChkptIdTasks, + info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList)); + } + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without // initialization, when the operation of check downstream tasks status is executed far quickly. 
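// Stage progression of the start procedure, as recorded in startInfo.curStage / pStagesList:
//   0 -> START_MARK_REQ_CHKPID       set here, once at least one task has sent a consensus-chkptId request
//     -> START_WAIT_FOR_CHKPTID      set in streamMetaSendHbHelper, when the request flag goes out in the hb msg
//     -> START_CHECK_DOWNSTREAM      set in tqStreamTaskProcessConsenChkptIdReq, when the consensus
//                                    checkpointId comes back from the mnode
// Each transition appends an SStartTaskStageInfo {stage, ts} to pStagesList, which
// streamMetaLogLaunchTasksInfo dumps once all tasks have reported their check-downstream result.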
stInfo("vgId:%d start all task(s) completed", pMeta->vgId); @@ -159,54 +171,76 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) { - streamMetaWLock(pMeta); - + STaskStartInfo* pInfo = &pMeta->startInfo; if (pMeta->closeFlag) { - streamMetaWUnLock(pMeta); stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); return TSDB_CODE_FAILED; } *pList = taosArrayDup(pMeta->pTaskList, NULL); if (*pList == NULL) { + stError("vgId:%d failed to dup tasklist, before restart tasks, code:%s", pMeta->vgId, tstrerror(terrno)); return terrno; } - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = now; + taosHashClear(pInfo->pReadyTaskSet); + taosHashClear(pInfo->pFailedTaskSet); + taosArrayClear(pInfo->pStagesList); + + pInfo->curStage = 0; + pInfo->startTs = now; int32_t code = streamMetaResetTaskStatus(pMeta); - streamMetaWUnLock(pMeta); - return code; } void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pFailedTaskSet); + taosArrayClear(pStartInfo->pStagesList); + pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; pStartInfo->elapsedTime = 0; + pStartInfo->curStage = 0; // reset the sentinel flag value to be 0 pStartInfo->startAllTasks = 0; stDebug("vgId:%d clear start-all-task info", vgId); } -int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready) { +static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal, int32_t taskId, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + + pStartInfo->readyTs = taosGetTimestampMs(); + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? 
pStartInfo->readyTs - pStartInfo->startTs : 0; + + for (int32_t i = 0; i < taosArrayGetSize(pStartInfo->pStagesList); ++i) { + SStartTaskStageInfo* pStageInfo = taosArrayGet(pStartInfo->pStagesList, i); + stDebug("vgId:%d start task procedure, stage:%d, ts:%" PRId64, pMeta->vgId, pStageInfo->stage, pStageInfo->ts); + } + + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); + + // print the initialization elapsed time and info + displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); + displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); +} + +int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, + int64_t startTs, int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; int32_t vgId = pMeta->vgId; bool allRsp = true; SStreamTask* p = NULL; - streamMetaWLock(pMeta); int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &p); if (code != 0) { // task does not exist in current vnode, not record the complete info stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); - streamMetaWUnLock(pMeta); return 0; } @@ -218,7 +252,6 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " "time:%" PRId64 "ms", vgId, taskId, ready, el); - streamMetaWUnLock(pMeta); return 0; } @@ -230,35 +263,24 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 " already exist start results in meta start task result hashmap", vgId, id.taskId); + code = 0; } else { - stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); + stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed, code:%s", vgId, + id.taskId, tstrerror(code)); } - streamMetaWUnLock(pMeta); - return code; } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); + int32_t numOfSucc = taosHashGetSize(pStartInfo->pReadyTaskSet); + int32_t numOfRecv = numOfSucc + taosHashGetSize(pStartInfo->pFailedTaskSet); allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal); if (allRsp) { - pStartInfo->readyTs = taosGetTimestampMs(); - pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? 
pStartInfo->readyTs - pStartInfo->startTs : 0; - - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - - // print the initialization elapsed time and info - displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); - displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); + streamMetaLogLaunchTasksInfo(pMeta, numOfTotal, taskId, ready); streamMetaResetStartInfo(pStartInfo, vgId); - streamMetaWUnLock(pMeta); code = pStartInfo->completeFn(pMeta); } else { - streamMetaWUnLock(pMeta); stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, numOfRecv, numOfTotal); } @@ -266,6 +288,17 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 return code; } +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + int32_t code = 0; + + streamMetaWLock(pMeta); + code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, endTs, ready); + streamMetaWUnLock(pMeta); + + return code; +} + // check all existed tasks are received rsp bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) { for (int32_t i = 0; i < numOfTotal; ++i) { @@ -279,6 +312,7 @@ bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32 if (px == NULL) { px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx)); if (px == NULL) { + stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t) idx.taskId); return false; } } @@ -292,7 +326,7 @@ void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { void* pIter = NULL; size_t keyLen = 0; - stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), + stInfo("vgId:%d %d tasks complete check-downstream, %s", vgId, taosHashGetSize(pTaskSet), succ ? "success" : "failed"); while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { @@ -323,12 +357,19 @@ int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) { return terrno; } + pStartInfo->pStagesList = taosArrayInit(4, sizeof(SStartTaskStageInfo)); + if (pStartInfo->pStagesList == NULL) { + return terrno; + } + return 0; } void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) { taosHashCleanup(pStartInfo->pReadyTaskSet); taosHashCleanup(pStartInfo->pFailedTaskSet); + taosArrayDestroy(pStartInfo->pStagesList); + pStartInfo->readyTs = 0; pStartInfo->elapsedTime = 0; pStartInfo->startTs = 0; @@ -348,7 +389,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if ((pTask == NULL) || (code != 0)) { stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); - int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId, true); if (ret) { stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret)); } @@ -365,7 +406,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } // the start all tasks procedure may happen to start the newly deployed stream task, and results in the - // concurrently start this task by two threads. 
+ // concurrent start this task by two threads. streamMutexLock(&pTask->lock); SStreamTaskState status = streamTaskGetStatus(pTask); @@ -382,12 +423,14 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - if(pTask->status.downstreamReady != 0) { + if (pTask->status.downstreamReady != 0) { stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_STREAM_INTERNAL_ERROR; } + streamMetaWLock(pMeta); + // avoid initialization and destroy running concurrently. streamMutexLock(&pTask->lock); if (pTask->pBackend == NULL) { @@ -395,7 +438,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMutexUnlock(&pTask->lock); if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false); } } else { streamMutexUnlock(&pTask->lock); @@ -410,12 +453,14 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas // do no added into result hashmap if it is failed due to concurrently starting of this stream task. if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false); } } } + streamMetaWUnLock(pMeta); streamMetaReleaseTask(pMeta, pTask); + return code; } @@ -470,26 +515,21 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; + int32_t vgId = pTask->pMeta->vgId; - int32_t vgId = pTask->pMeta->vgId; - if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) { - // mark the sending of req consensus checkpoint request. - pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND; - pConChkptInfo->statusTs = ts; - stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, - vgId, pConChkptInfo->statusTs); - return 1; - } else { - int32_t el = (ts - pConChkptInfo->statusTs) / 1000; - - // not recv consensus-checkpoint rsp for 60sec, send it again in hb to mnode - if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) { - pConChkptInfo->statusTs = ts; - - stWarn( - "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64, - pTask->id.idStr, vgId, el, pConChkptInfo->statusTs); + if (pTask->pMeta->startInfo.curStage == START_MARK_REQ_CHKPID) { + if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) { + // mark the sending of req consensus checkpoint request. 
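// The request flag is honored only while the start procedure is still in the START_MARK_REQ_CHKPID
// stage (checked above); the branches below merely log that the flag is not needed (status 0) or that
// the restart procedure has already expired.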
+ pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND; + stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId, + pConChkptInfo->statusTs); return 1; + } else if (pConChkptInfo->status == 0) { + stDebug("vgId:%d s-task:%s not need to set the req checkpointId, current stage:%d", vgId, pTask->id.idStr, + pConChkptInfo->status); + } else { + stWarn("vgId:%d, s-task:%s restart procedure expired, start stage:%d", vgId, pTask->id.idStr, + pConChkptInfo->status); } } @@ -513,10 +553,11 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { pInfo->statusTs = ts; pInfo->consenChkptTransId = 0; - stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); + stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64 ", task created ts:%" PRId64, + pTask->id.idStr, prevTrans, ts, pTask->execInfo.created); } -int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock) { int32_t code = TSDB_CODE_SUCCESS; int64_t now = taosGetTimestampMs(); int64_t startTs = 0; @@ -527,7 +568,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); - streamMetaRLock(pMeta); + if (lock) { + streamMetaRLock(pMeta); + } code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); if (code == 0) { @@ -536,15 +579,26 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta hId = pTask->hTaskInfo.id; streamMetaReleaseTask(pMeta, pTask); - streamMetaRUnLock(pMeta); + if (lock) { + streamMetaRUnLock(pMeta); + } // add the failed task info, along with the related fill-history task info into tasks list. 
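// The lock flag follows the wrapper pattern used by streamMetaAddTaskLaunchResult: with lock=true the
// helper takes the stream-meta lock itself, while lock=false routes through the *NoLock variants for
// call sites such as streamMetaStartOneTask that are already inside a streamMetaWLock section,
// presumably to avoid taking the meta lock recursively.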
- code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); - if (hasFillhistoryTask) { - code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + if (lock) { + code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } + } else { + code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + code = streamMetaAddTaskLaunchResultNoLock(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } } } else { - streamMetaRUnLock(pMeta); + if (lock) { + streamMetaRUnLock(pMeta); + } stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", streamId, taskId, pMeta->vgId); @@ -554,9 +608,17 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta return code; } -void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock) { int32_t startTs = pTask->execInfo.checkTs; - int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + int32_t code = 0; + + if (lock) { + code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + } else { + code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, + false); + } + if (code) { stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -564,7 +626,13 @@ void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { // automatically set the related fill-history task to be failed. 
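// The related fill-history task receives a launch-result entry with the same startTs/failedTs, so the
// start-procedure bookkeeping (allCheckDownstreamRsp over pReadyTaskSet/pFailedTaskSet) presumably does
// not keep waiting for a task that will never report a check-downstream result on its own.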
   if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
     STaskId* pId = &pTask->hTaskInfo.id;
-    code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
+
+    if (lock) {
+      code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
+    } else {
+      code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
+    }
+
     if (code) {
       stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
     }
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 621be05e84..8543c0e39e 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -99,11 +99,12 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
 }
 SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
-  int32_t code = TSDB_CODE_SUCCESS;
-  int32_t lino = 0;
+  int32_t      code = TSDB_CODE_SUCCESS;
+  int32_t      lino = 0;
+  SStreamTask* pStreamTask = pTask;
   SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
-  stDebug("open stream state %p, %s", pState, path);
+  stDebug("s-task:%s open stream state %p, %s", pStreamTask->id.idStr, pState, path);
   if (pState == NULL) {
     code = terrno;
@@ -117,7 +118,6 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
     QUERY_CHECK_CODE(code, lino, _end);
   }
-  SStreamTask* pStreamTask = pTask;
   pState->streamId = streamId;
   pState->taskId = taskId;
   TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x",
@@ -133,8 +133,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
   pState->parNameMap = tSimpleHashInit(1024, hashFn);
   QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
-  stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
-         pState->taskId);
+  stInfo("s-task:%s open state %p on backend %p 0x%" PRIx64 "-%d succ", pStreamTask->id.idStr, pState,
+         pMeta->streamBackend, pState->streamId, pState->taskId);
   return pState;
 _end:
diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c
index 11e291c876..82f70288a2 100644
--- a/source/libs/stream/src/streamUtil.c
+++ b/source/libs/stream/src/streamUtil.c
@@ -15,21 +15,21 @@
 #include "streamInt.h"
-void streamMutexLock(TdThreadMutex *pMutex) {
+void streamMutexLock(TdThreadMutex* pMutex) {
   int32_t code = taosThreadMutexLock(pMutex);
   if (code) {
     stError("%p mutex lock failed, code:%s", pMutex, tstrerror(code));
   }
 }
-void streamMutexUnlock(TdThreadMutex *pMutex) {
+void streamMutexUnlock(TdThreadMutex* pMutex) {
   int32_t code = taosThreadMutexUnlock(pMutex);
   if (code) {
     stError("%p mutex unlock failed, code:%s", pMutex, tstrerror(code));
   }
 }
-void streamMutexDestroy(TdThreadMutex *pMutex) {
+void streamMutexDestroy(TdThreadMutex* pMutex) {
   int32_t code = taosThreadMutexDestroy(pMutex);
   if (code) {
     stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
@@ -37,7 +37,7 @@ void streamMutexDestroy(TdThreadMutex *pMutex) {
 }
 void streamMetaRLock(SStreamMeta* pMeta) {
-  // stTrace("vgId:%d meta-rlock", pMeta->vgId);
+// stTrace("vgId:%d meta-rlock", pMeta->vgId);
   int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
   if (code) {
     stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
@@ -45,7 +45,7 @@ void streamMetaRLock(SStreamMeta* pMeta) {
 }
 void streamMetaRUnLock(SStreamMeta* pMeta) {
-  // stTrace("vgId:%d meta-runlock", pMeta->vgId);
+// stTrace("vgId:%d meta-runlock", pMeta->vgId);
   int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
   if (code != TSDB_CODE_SUCCESS) {
     stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
@@ -57,14 +57,16 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
 int32_t streamMetaTryRlock(SStreamMeta* pMeta) {
   int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock);
   if (code) {
-    stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
+    if (code != TAOS_SYSTEM_ERROR(EBUSY)) {
+      stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
+    }
   }
   return code;
 }
 void streamMetaWLock(SStreamMeta* pMeta) {
-  // stTrace("vgId:%d meta-wlock", pMeta->vgId);
+// stTrace("vgId:%d meta-wlock", pMeta->vgId);
   int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
   if (code) {
     stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
@@ -72,7 +74,7 @@ void streamMetaWLock(SStreamMeta* pMeta) {
 }
 void streamMetaWUnLock(SStreamMeta* pMeta) {
-  // stTrace("vgId:%d meta-wunlock", pMeta->vgId);
+// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
   int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
   if (code) {
     stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
@@ -94,5 +96,5 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName,
 }
 int32_t streamGetFatalError(const SStreamMeta* pMeta) {
-  return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code);
+  return atomic_load_32((volatile int32_t*)&pMeta->fatalInfo.code);
 }
diff --git a/tests/system-test/1-insert/table_param_ttl.py b/tests/system-test/1-insert/table_param_ttl.py
index 371be76b55..34e7e4f103 100644
--- a/tests/system-test/1-insert/table_param_ttl.py
+++ b/tests/system-test/1-insert/table_param_ttl.py
@@ -17,7 +17,7 @@ from util.sql import *
 from util.common import *
 class TDTestCase:
-    updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3}
+    updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3, 'mdebugflag':143}
     def init(self, conn, logSql, replicaVar=1):
         self.replicaVar = int(replicaVar)
         tdLog.debug("start to execute %s" % __file__)
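Editor's note (not part of the patch): the recurring pattern in this change is splitting a locked entry point from a *NoLock core, so that callers which already hold the stream-meta rwlock can record task launch results without re-acquiring a non-recursive lock, together with treating EBUSY from the try-read-lock as an expected outcome rather than an error. The following is a minimal, self-contained sketch of that convention under assumed names; MetaStub, metaAddLaunchResult, metaAddLaunchResultNoLock, and metaTryReadResults are hypothetical illustrations, not TDengine APIs.

/*
 * Sketch only: demonstrates the locked-wrapper / NoLock-core split and the
 * "EBUSY is not an error" try-lock handling. All names are hypothetical.
 */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct {
  pthread_rwlock_t lock;
  int              results;  // stands in for the start-result bookkeeping
} MetaStub;

// Core routine: assumes the caller already holds pMeta->lock.
static int metaAddLaunchResultNoLock(MetaStub* pMeta, int taskId, bool ready) {
  pMeta->results++;
  printf("task:0x%x ready:%d recorded, total:%d\n", taskId, (int)ready, pMeta->results);
  return 0;
}

// Locked wrapper: acquires the lock itself, then delegates to the NoLock core.
// Callers that already hold the lock must call the core directly, otherwise a
// non-recursive rwlock would deadlock on the second acquisition.
static int metaAddLaunchResult(MetaStub* pMeta, int taskId, bool ready) {
  int code = pthread_rwlock_wrlock(&pMeta->lock);
  if (code != 0) {
    return code;
  }
  code = metaAddLaunchResultNoLock(pMeta, taskId, ready);
  (void)pthread_rwlock_unlock(&pMeta->lock);
  return code;
}

// Try-lock helper: EBUSY only means another thread holds the write lock, so it
// is returned to the caller but not logged as an error.
static int metaTryReadResults(MetaStub* pMeta, int* pCount) {
  int code = pthread_rwlock_tryrdlock(&pMeta->lock);
  if (code != 0) {
    if (code != EBUSY) {
      fprintf(stderr, "try rdlock failed unexpectedly, code:%d\n", code);
    }
    return code;
  }
  *pCount = pMeta->results;
  (void)pthread_rwlock_unlock(&pMeta->lock);
  return 0;
}

int main(void) {
  MetaStub meta = {.results = 0};
  int      count = 0;

  if (pthread_rwlock_init(&meta.lock, NULL) != 0) {
    return 1;
  }

  // Caller does not hold the lock: use the locked wrapper.
  (void)metaAddLaunchResult(&meta, 0x1001, false);

  // Caller already inside a larger critical section: use the NoLock core.
  if (pthread_rwlock_wrlock(&meta.lock) == 0) {
    (void)metaAddLaunchResultNoLock(&meta, 0x1002, false);
    (void)pthread_rwlock_unlock(&meta.lock);
  }

  if (metaTryReadResults(&meta, &count) == 0) {
    printf("recorded %d launch results\n", count);
  }

  (void)pthread_rwlock_destroy(&meta.lock);
  return 0;
}

Passing an explicit lock flag, as streamMetaAddFailedTask and streamMetaAddFailedTaskSelf now do, lets a single entry point serve both kinds of callers; exposing the NoLock core directly, as streamMetaAddTaskLaunchResultNoLock does, is the complementary approach used inside those functions.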