Merge pull request #24477 from taosdata/fix/fixResumeBug

fix pause/resume error
This commit is contained in:
Haojun Liao 2024-01-16 14:06:20 +08:00 committed by GitHub
commit d1da8ef986
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 95 additions and 94 deletions

View File

@ -40,7 +40,7 @@ typedef struct SNodeEntry {
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
@ -89,8 +89,8 @@ static void freeCheckpointCandEntry(void *);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw);
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
@ -219,9 +219,9 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow * pRow = NULL;
SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL;
void * buf = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
@ -301,7 +301,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
}
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
@ -356,7 +356,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode * pAst = NULL;
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
mInfo("stream:%s to create", pCreate->name);
@ -595,7 +595,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj * pDb = NULL;
SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -684,7 +684,7 @@ _OVER:
return -1;
}
static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool* hasEpset, int32_t taskId, int32_t nodeId) {
static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
*hasEpset = false;
pEpSet->numOfEps = 0;
@ -773,7 +773,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void * pIter = NULL;
void *pIter = NULL;
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
@ -804,7 +804,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SStreamObj streamObj = {0};
char * sql = NULL;
char *sql = NULL;
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
@ -931,7 +931,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
_OVER:
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
}
@ -947,8 +947,8 @@ _OVER:
int64_t mndStreamGenChkpId(SMnode *pMnode) {
SStreamObj *pStream = NULL;
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
int64_t maxChkpId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -966,7 +966,7 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -982,7 +982,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -1023,7 +1023,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return -1;
}
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamCheckpointSourceReq(&encoder, &req);
@ -1077,7 +1077,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *p = taosArrayGetP(pLevel, 0);
if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1091,7 +1091,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
goto _ERR;
}
void * buf;
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id) < 0) {
@ -1143,7 +1143,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1160,7 +1160,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return -1;
}
void * buf;
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId, pTrans->id) < 0) {
@ -1279,7 +1279,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
@ -1298,9 +1298,9 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
@ -1322,7 +1322,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void * pIter = NULL;
void *pIter = NULL;
int32_t code = 0;
taosThreadMutexLock(&execInfo.lock);
@ -1368,7 +1368,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMDropStreamReq dropReq = {0};
@ -1525,7 +1525,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
@ -1533,7 +1533,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
int32_t numOfStreams = 0;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1552,8 +1552,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1728,7 +1728,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1764,8 +1764,8 @@ static int32_t getNumOfTasks(SArray *pTaskList) {
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1790,7 +1790,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
@ -1824,7 +1824,7 @@ static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *p
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
SEpSet epset = {0};
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
@ -1871,12 +1871,14 @@ int32_t mndPauseAllStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStre
return 0;
}
static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
streamObj.status = status;
static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) {
// SStreamObj streamObj = {0};
// memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
taosWLockLatch(&pStream->lock);
pStream->status = status;
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj);
taosWUnLockLatch(&pStream->lock);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
@ -1887,7 +1889,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMPauseStreamReq pauseReq = {0};
@ -2029,7 +2031,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStr
}
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMResumeStreamReq pauseReq = {0};
@ -2147,7 +2149,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return -1;
}
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
@ -2214,7 +2216,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void * pBuf = NULL;
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
@ -2293,8 +2295,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
}
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
@ -2362,7 +2364,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
STrans *pTrans = NULL;
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
while(1) {
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
@ -2378,7 +2380,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
}
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -2440,9 +2441,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void * pIter = NULL;
void *pIter = NULL;
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
@ -2489,9 +2490,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2543,7 +2544,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId * pId = taosArrayGet(execInfo.pTaskList, i);
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) continue;
@ -2591,7 +2592,7 @@ static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInf
char *pDb = (char *)pIter;
size_t len = 0;
void * pKey = taosHashGetKey(pDb, &len);
void *pKey = taosHashGetKey(pDb, &len);
doKillCheckpointTrans(pMnode, pKey, len);
}
}
@ -2671,7 +2672,7 @@ typedef struct SMStreamNodeCheckMsg {
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -2695,7 +2696,7 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
@ -2719,7 +2720,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
@ -2792,8 +2793,8 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
@ -2837,7 +2838,7 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
@ -2859,7 +2860,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
return TSDB_CODE_SUCCESS;
}
char* pDupDBName = strndup(pDBName, len);
char *pDupDBName = strndup(pDBName, len);
killTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
@ -2914,38 +2915,38 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
return NULL;
}
//static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
// int32_t numOfReady = 0;
// int32_t numOfTotal = 0;
// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
// if (pTaskEntry->id.streamId == pId->streamId) {
// numOfTotal++;
// static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
// int32_t numOfReady = 0;
// int32_t numOfTotal = 0;
// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
// if (pTaskEntry->id.streamId == pId->streamId) {
// numOfTotal++;
//
// if (pTaskEntry->id.taskId != pId->taskId) {
// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
// if (pEntry->status == TASK_STATUS__READY) {
// numOfReady++;
// }
// }
// }
// }
// if (pTaskEntry->id.taskId != pId->taskId) {
// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
// if (pEntry->status == TASK_STATUS__READY) {
// numOfReady++;
// }
// }
// }
// }
//
// if (numOfReady > 0) {
// mDebug("stream:0x%" PRIx64
// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
// pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
// return true;
// } else {
// return false;
// }
// }
// }
// if (numOfReady > 0) {
// mDebug("stream:0x%" PRIx64
// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history
// task", pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
// return true;
// } else {
// return false;
// }
// }
// }
//
// return false;
//}
// return false;
// }
// currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing
@ -3028,7 +3029,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
bool checkpointFailed = false;
@ -3148,8 +3149,8 @@ void freeCheckpointCandEntry(void *param) {
taosMemoryFreeClear(pEntry->pName);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {