diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1c255cb7df..a9e2277264 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -40,7 +40,7 @@ typedef struct SNodeEntry { typedef struct SVgroupChangeInfo { SHashObj *pDBMap; - SArray * pUpdateNodeList; // SArray + SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; static int32_t mndNodeCheckSentinel = 0; @@ -89,8 +89,8 @@ static void freeCheckpointCandEntry(void *); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream); -SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw); +SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); +SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw); static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream); @@ -219,9 +219,9 @@ STREAM_ENCODE_OVER: SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRow * pRow = NULL; + SSdbRow *pRow = NULL; SStreamObj *pStream = NULL; - void * buf = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) { @@ -301,7 +301,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream } SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName); if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; @@ -356,7 +356,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { } static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { - SNode * pAst = NULL; + SNode *pAst = NULL; SQueryPlan *pPlan = NULL; mInfo("stream:%s to create", pCreate->name); @@ -595,7 +595,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; - SDbObj * pDb = NULL; + SDbObj *pDb = NULL; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); @@ -684,7 +684,7 @@ _OVER: return -1; } -static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool* hasEpset, int32_t taskId, int32_t nodeId) { +static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) { *hasEpset = false; pEpSet->numOfEps = 0; @@ -773,7 +773,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; - void * pIter = NULL; + void *pIter = NULL; while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { if (pStream->sourceDbUid == pStreamObj->sourceDbUid) { @@ -804,7 +804,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SStreamObj streamObj = {0}; - char * sql = NULL; + char *sql = NULL; int32_t sqlLen = 0; terrno = TSDB_CODE_SUCCESS; @@ -931,7 +931,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } _OVER: - if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { + if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); } @@ -947,8 +947,8 @@ _OVER: int64_t mndStreamGenChkpId(SMnode *pMnode) { SStreamObj *pStream = NULL; - void * pIter = NULL; - SSdb * pSdb = pMnode->pSdb; + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; int64_t maxChkpId = 0; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -966,7 +966,7 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -982,7 +982,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -1023,7 +1023,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return -1; } - void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamCheckpointSourceReq(&encoder, &req); @@ -1077,7 +1077,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre // 1. redo action: broadcast checkpoint source msg for all source vg int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { - SArray * pLevel = taosArrayGetP(pStream->tasks, i); + SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *p = taosArrayGetP(pLevel, 0); if (p->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -1091,7 +1091,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre goto _ERR; } - void * buf; + void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, pTask->id.taskId, pTrans->id) < 0) { @@ -1143,7 +1143,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { - SArray * pLevel = taosArrayGetP(pStream->tasks, i); + SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -1160,7 +1160,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream return -1; } - void * buf; + void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, pTask->id.taskId, pTrans->id) < 0) { @@ -1279,7 +1279,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId * p = taosArrayGet(execInfo.pTaskList, i); + STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; @@ -1298,9 +1298,9 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { } static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; - void * pIter = NULL; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; SStreamObj *pStream = NULL; int32_t code = 0; @@ -1322,7 +1322,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - void * pIter = NULL; + void *pIter = NULL; int32_t code = 0; taosThreadMutexLock(&execInfo.lock); @@ -1368,7 +1368,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) { } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMDropStreamReq dropReq = {0}; @@ -1525,7 +1525,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -1533,7 +1533,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } int32_t numOfStreams = 0; - void * pIter = NULL; + void *pIter = NULL; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -1552,8 +1552,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode * pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1728,7 +1728,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); // output queue -// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); + // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); // STR_TO_VARSTR(vbuf, buf); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1764,8 +1764,8 @@ static int32_t getNumOfTasks(SArray *pTaskList) { } static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { - SMnode * pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1790,7 +1790,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfLevels = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfLevels; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); + int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); if (code == TSDB_CODE_SUCCESS) { numOfRows++; } @@ -1824,7 +1824,7 @@ static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *p pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SEpSet epset = {0}; + SEpSet epset = {0}; mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); @@ -1871,12 +1871,14 @@ int32_t mndPauseAllStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStre return 0; } -static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) { - SStreamObj streamObj = {0}; - memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); - streamObj.status = status; +static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) { + // SStreamObj streamObj = {0}; + // memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); + taosWLockLatch(&pStream->lock); + pStream->status = status; + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj); + taosWUnLockLatch(&pStream->lock); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); @@ -1887,7 +1889,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in } static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMPauseStreamReq pauseReq = {0}; @@ -2029,7 +2031,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStr } static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMResumeStreamReq pauseReq = {0}; @@ -2147,7 +2149,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return -1; } - void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamTaskUpdateMsg(&encoder, &req); @@ -2214,7 +2216,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - void * pBuf = NULL; + void *pBuf = NULL; int32_t len = 0; streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); @@ -2293,8 +2295,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP } static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { - SSdb * pSdb = pMnode->pSdb; - void * pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; SVgObj *pVgroup = NULL; *allReady = true; @@ -2362,7 +2364,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange STrans *pTrans = NULL; // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool - while(1) { + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { break; @@ -2378,7 +2380,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } } - while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -2440,9 +2441,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } static SArray *extractNodeListFromStream(SMnode *pMnode) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void * pIter = NULL; + void *pIter = NULL; SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { @@ -2489,9 +2490,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } static void doExtractTasksFromStream(SMnode *pMnode) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void * pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2543,7 +2544,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); for (int32_t i = 0; i < numOfTask; ++i) { - STaskId * pId = taosArrayGet(execInfo.pTaskList, i); + STaskId *pId = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); if (pEntry->nodeId == SNODE_HANDLE) continue; @@ -2591,7 +2592,7 @@ static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInf char *pDb = (char *)pIter; size_t len = 0; - void * pKey = taosHashGetKey(pDb, &len); + void *pKey = taosHashGetKey(pDb, &len); doKillCheckpointTrans(pMnode, pKey, len); } } @@ -2671,7 +2672,7 @@ typedef struct SMStreamNodeCheckMsg { static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -2695,7 +2696,7 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { STaskStatusEntry entry = {0}; streamTaskStatusInit(&entry, pTask); @@ -2719,7 +2720,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); @@ -2792,8 +2793,8 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SEpSet epset = {0}; - bool hasEpset = false; + SEpSet epset = {0}; + bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); @@ -2837,7 +2838,7 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) { +void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill active transId:%d in Db:%s", transId, pDbName); @@ -2859,7 +2860,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { return TSDB_CODE_SUCCESS; } - char* pDupDBName = strndup(pDBName, len); + char *pDupDBName = strndup(pDBName, len); killTransImpl(pMnode, pTransInfo->transId, pDupDBName); taosMemoryFree(pDupDBName); @@ -2914,38 +2915,38 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { return NULL; } -//static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { -// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { -// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { -// int32_t numOfReady = 0; -// int32_t numOfTotal = 0; -// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { -// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); -// if (pTaskEntry->id.streamId == pId->streamId) { -// numOfTotal++; +// static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { +// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { +// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { +// int32_t numOfReady = 0; +// int32_t numOfTotal = 0; +// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { +// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); +// if (pTaskEntry->id.streamId == pId->streamId) { +// numOfTotal++; // -// if (pTaskEntry->id.taskId != pId->taskId) { -// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); -// if (pEntry->status == TASK_STATUS__READY) { -// numOfReady++; -// } -// } -// } -// } +// if (pTaskEntry->id.taskId != pId->taskId) { +// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); +// if (pEntry->status == TASK_STATUS__READY) { +// numOfReady++; +// } +// } +// } +// } // -// if (numOfReady > 0) { -// mDebug("stream:0x%" PRIx64 -// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", -// pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); -// return true; -// } else { -// return false; -// } -// } -// } +// if (numOfReady > 0) { +// mDebug("stream:0x%" PRIx64 +// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history +// task", pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); +// return true; +// } else { +// return false; +// } +// } +// } // -// return false; -//} +// return false; +// } // currently only handle the sink task // 1. sink task, drop related fill-history task msg is missing @@ -3028,7 +3029,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; bool checkpointFailed = false; @@ -3148,8 +3149,8 @@ void freeCheckpointCandEntry(void *param) { taosMemoryFreeClear(pEntry->pName); } SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { - void * pIter = NULL; - SSdb * pSdb = pMnode->pSdb; + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {