diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 78fd9bed5d..48c15e9117 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,7 +50,6 @@ enum { TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, - TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint }; enum { @@ -346,7 +345,7 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; - int32_t walScan; + int32_t walScanCounter; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -545,8 +544,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); // recover and fill history int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version); -int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq); +int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); + // common int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index aa38b94fd7..96401511d2 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -23,13 +23,12 @@ extern "C" { #endif enum { - MQ_CONSUMER_STATUS__MODIFY = 1, + MQ_CONSUMER_STATUS_REBALANCE = 1, // MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS__READY, MQ_CONSUMER_STATUS__LOST, // MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS__LOST_REBD, - MQ_CONSUMER_STATUS__REMOVED, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fcd314d2ae..2579ff5231 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -142,7 +142,7 @@ typedef enum { CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__RECOVER, - CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic + CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic } ECsmUpdateType; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index ca71e17d7e..d144747e81 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -192,15 +192,18 @@ FAIL: return -1; } +// todo check the clear process static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); + + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); if (pConsumer == NULL) { + mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId); return 0; } - mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId, + mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { @@ -215,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; + + // this is the drop action, not the update action if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -299,28 +304,36 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (status == MQ_CONSUMER_STATUS__READY) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); + if (pLostMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d", + pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg)); + continue; + } pLostMsg->consumerId = pConsumer->consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_LOST, - .pCont = pLostMsg, - .contLen = sizeof(SMqConsumerLostMsg), - }; + .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)}; + mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId, + MND_CONSUMER_LOST_HB_CNT); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); + if (pClearMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", + pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); + continue; + } pClearMsg->consumerId = pConsumer->consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, - .pCont = pClearMsg, - .contLen = sizeof(SMqConsumerClearMsg), - }; + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId, + MND_CONSUMER_LOST_CLEAR_THRESHOLD); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST) { @@ -334,7 +347,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); - } else { + } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); @@ -658,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -671,7 +684,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } else { - /*taosRLockLatch(&pExistedConsumer->lock);*/ int32_t status = atomic_load_32(&pExistedConsumer->status); mInfo("receive subscribe request from existed consumer:0x%" PRIx64 @@ -689,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -868,9 +880,10 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { int32_t status = pConsumer->status; if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS__MODIFY) { + if (status == MQ_CONSUMER_STATUS_REBALANCE) { pConsumer->status = MQ_CONSUMER_STATUS__READY; } else if (status == MQ_CONSUMER_STATUS__LOST) { + ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0); pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; } } @@ -879,7 +892,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) { + for (int32_t i = 0; i < size; i++) { char *p = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebNewTopics, i); @@ -900,32 +913,57 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebRemovedTopics, i); taosMemoryFree(p); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); break; } } } +static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { + int32_t sz = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < sz; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + if (strcmp(pTopic, topic) == 0) { + taosArrayRemove(pConsumer->currentTopics, i); + taosMemoryFree(topic); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); + break; + } + } +} + +static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) { + bool existing = false; + int32_t size = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < size; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + + if (strcmp(topic, pTopic) == 0) { + existing = true; + break; + } + } + + return existing; +} + static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64, pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); taosWLockLatch(&pOldConsumer->lock); - if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; - - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; - - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; + if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) { + TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics); + TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics); + TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics); pOldConsumer->subscribeTime = pNewConsumer->upTime; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); for (int32_t i = 0; i < sz; i++) { @@ -935,10 +973,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; - int32_t status = pOldConsumer->status; + int32_t prevStatus = pOldConsumer->status; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", - pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), + pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); @@ -948,8 +986,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } pOldConsumer->rebalanceTime = pNewConsumer->upTime; - - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -958,24 +995,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); - // not exist in current topic - bool existing = false; - int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics); - for (int32_t i = 0; i < numOfExistedTopics; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(topic, pNewTopic) == 0) { - existing = true; - } - } - + // check if exist in current topic removeFromNewTopicList(pOldConsumer, pNewTopic); // add to current topic - if (!existing) { + bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); + if (existing) { + taosMemoryFree(pNewTopic); + } else { // added into current topic list taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); - } else { - taosMemoryFree(pNewTopic); } // set status @@ -1000,16 +1029,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromRemoveTopicList(pOldConsumer, removedTopic); // remove from current topic - int32_t i = 0; - int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - for (i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(removedTopic, topic) == 0) { - taosArrayRemove(pOldConsumer->currentTopics, i); - taosMemoryFree(topic); - break; - } - } + removeFromCurrentTopicList(pOldConsumer, removedTopic); // set status int32_t status = pOldConsumer->status; @@ -1158,7 +1178,7 @@ static const char *mndConsumerStatusName(int status) { case MQ_CONSUMER_STATUS__LOST: case MQ_CONSUMER_STATUS__LOST_REBD: return "lost"; - case MQ_CONSUMER_STATUS__MODIFY: + case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; default: return "unknown"; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index c69f08eb6b..6dab018236 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -225,7 +225,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN); pConsumer->epoch = 0; - pConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->hbStatus = 0; taosInitRWLatch(&pConsumer->lock); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 015c497de1..573c60549e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -213,13 +213,9 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SMqRebOutputVg outputVg = { - .oldConsumerId = consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); } @@ -484,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } } // 2. redo log: subscribe and vg assignment // subscribe if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } // 3. commit log: consumer to update status and epoch @@ -506,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } + // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { @@ -527,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } @@ -549,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } @@ -563,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) { mError("failed to prepare trans rebalance since %s", terrstr()); - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } mndTransDrop(pTrans); return 0; - -REB_FAIL: - mndTransDrop(pTrans); - return -1; } static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { @@ -584,16 +589,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. while (1) { -// if (rebalanceOnce) { -// break; -// } - pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) { break; } - // todo handle the malloc failure SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); @@ -601,6 +601,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL || + rebOutput.rebVgs == NULL) { + taosArrayDestroy(rebOutput.newConsumers); + taosArrayDestroy(rebOutput.removedConsumers); + taosArrayDestroy(rebOutput.modifyConsumers); + taosArrayDestroy(rebOutput.rebVgs); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + mInfo("mq re-balance failed, due to out of memory"); + taosHashCleanup(pReq->rebSubHash); + mndRebEnd(); + return -1; + } + SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); @@ -640,6 +654,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); + mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } @@ -661,9 +676,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosArrayDestroy(rebOutput.rebVgs); tDeleteSubscribeObj(rebOutput.pSub); taosMemoryFree(rebOutput.pSub); - -// taosSsleep(100); -// rebalanceOnce = true; } // reset flag diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 14c9178365..12b81b6c3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -103,8 +103,13 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); - tqInitialize(pTq); - return pTq; + int32_t code = tqInitialize(pTq); + if (code != TSDB_CODE_SUCCESS) { + tqClose(pTq); + return NULL; + } else { + return pTq; + } } int32_t tqInitialize(STQ* pTq) { @@ -594,11 +599,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; - } else { - pTask->status.taskStatus = TASK_STATUS__RESTORE; - } + pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -657,6 +658,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); @@ -686,8 +688,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + if (pTask) { - rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0; + rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqDebug("tq recv task check req(reqId:0x%" PRIx64 @@ -1168,9 +1171,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); - streamProcessRunReq(pTask); - } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) { tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1334,10 +1334,10 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } - pMeta->walScan += 1; + pMeta->walScanCounter += 1; - if (pMeta->walScan > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan); + if (pMeta->walScanCounter > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); taosWUnLockLatch(&pTq->pStreamMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e8051a1406..34e93cec2d 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -31,57 +31,67 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - if (pFile != NULL) { - STqOffsetHead head = {0}; - int64_t code; + if (pFile == NULL) { + return TSDB_CODE_SUCCESS; + } - while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { - if (code == 0) { - break; - } else { - return -1; - } - } - int32_t size = htonl(head.size); - void* memBuf = taosMemoryCalloc(1, size); - if (memBuf == NULL) { + int32_t vgId = TD_VID(pStore->pTq->pVnode); + int64_t code = 0; + + STqOffsetHead head = {0}; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code == 0) { + break; + } else { return -1; } - if ((code = taosReadFile(pFile, memBuf, size)) != size) { - taosMemoryFree(memBuf); - return -1; - } - STqOffset offset; - SDecoder decoder; - tDecoderInit(&decoder, memBuf, size); - if (tDecodeSTqOffset(&decoder, &offset) < 0) { - taosMemoryFree(memBuf); - tDecoderClear(&decoder); - return -1; - } - - tDecoderClear(&decoder); - if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - return -1; - } - - if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle) { - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, - pHandle->subKey, offset.val.version); - } - } - } - - taosMemoryFree(memBuf); } - taosCloseFile(&pFile); + int32_t size = htonl(head.size); + void* pMemBuf = taosMemoryCalloc(1, size); + if (pMemBuf == NULL) { + tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if ((code = taosReadFile(pFile, pMemBuf, size)) != size) { + taosMemoryFree(pMemBuf); + return -1; + } + + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, pMemBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + taosMemoryFree(pMemBuf); + tDecoderClear(&decoder); + return code; + } + + tDecoderClear(&decoder); + if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { + return -1; + } + + // todo remove this + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); + if (pHandle) { + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { +// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey, +// offset.val.version); + } + } + } + + taosMemoryFree(pMemBuf); } - return 0; + + taosCloseFile(&pFile); + return TSDB_CODE_SUCCESS; } STqOffsetStore* tqOffsetOpen(STQ* pTq) { @@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { if (pStore == NULL) { return NULL; } + pStore->pTq = pTq; pStore->needCommit = 0; pTq->pOffsetStore = pStore; @@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { taosMemoryFree(pStore); return NULL; } + char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { taosMemoryFree(fname); taosMemoryFree(pStore); return NULL; } + taosMemoryFree(fname); return pStore; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 68240195d7..0575b7299d 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -274,21 +274,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { -#if 0 - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId); - return -1; - } - - memcpy(data, pReq, len); - SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; - - tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq); - tqProcessSubmitReq(pTq, submit); -#endif SPackedData submit = {0}; tqProcessSubmitReq(pTq, submit); } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 22903b95d9..58cb7b9e63 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -18,16 +18,15 @@ static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. -// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure -// will not stop eventually. +// extract submit block from WAL, and add them into the input queue for the sources tasks. int32_t tqStreamTasksScanWal(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - int64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { - int32_t scan = pMeta->walScan; - tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); + int32_t scan = pMeta->walScanCounter; + tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); // check all restore tasks bool shouldIdle = true; @@ -37,12 +36,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - pMeta->walScan -= 1; - times = pMeta->walScan; + pMeta->walScanCounter -= 1; + times = pMeta->walScanCounter; - ASSERT(pMeta->walScan >= 0); + ASSERT(pMeta->walScanCounter >= 0); - if (pMeta->walScan <= 0) { + if (pMeta->walScanCounter <= 0) { taosWUnLockLatch(&pMeta->lock); break; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index eb15400d05..1ef86f5b30 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3395,6 +3395,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { STableUidList* pUidList = &pStatus->uidList; while (1) { + if (pReader->flag == READER_STATUS_SHOULD_STOP) { + tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); + return TSDB_CODE_SUCCESS; + } + STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; initMemDataIterator(*pBlockScanInfo, pReader); @@ -3474,45 +3479,68 @@ static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) { ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc))); } +typedef enum { + TSDB_READ_RETURN = 0x1, + TSDB_READ_CONTINUE = 0x2, +} ERetrieveType; + +static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + + while(1) { + terrno = 0; + + code = doLoadLastBlockSequentially(pReader); + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + terrno = code; + return TSDB_READ_RETURN; + } + + if (pResBlock->info.rows > 0) { + return TSDB_READ_RETURN; + } + + // all data blocks are checked in this last block file, now let's try the next file + ASSERT(pReader->status.pTableIter == NULL); + code = initForFirstBlockInFile(pReader, pBlockIter); + + // error happens or all the data files are completely checked + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || + pReader->flag == READER_STATUS_SHOULD_STOP) { + terrno = code; + return TSDB_READ_RETURN; + } + + if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. + return TSDB_READ_CONTINUE; + } else { // all blocks in data file are checked, let's check the data in last files + resetTableListIndex(&pReader->status); + } + } +} + static int32_t buildBlockFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; bool asc = ASCENDING_TRAVERSE(pReader->order); SDataBlockIter* pBlockIter = &pReader->status.blockIter; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; if (pBlockIter->numOfBlocks == 0) { - _begin: - code = doLoadLastBlockSequentially(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } - - // all data blocks are checked in this last block file, now let's try the next file - if (pReader->status.pTableIter == NULL) { - code = initForFirstBlockInFile(pReader, pBlockIter); - - // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } - - // this file does not have data files, let's start check the last block file if exists - if (pBlockIter->numOfBlocks == 0) { - resetTableListIndex(&pReader->status); - goto _begin; - } + // let's try to extract data from stt files. + ERetrieveType type = doReadDataFromLastFiles(pReader); + if (type == TSDB_READ_RETURN) { + return terrno; } code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + if (pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } @@ -3530,30 +3558,22 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); } else { - if (pReader->status.pCurrentFileset->nSttF > 0) { - // data blocks in current file are exhausted, let's try the next file now - SBlockData* pBlockData = &pReader->status.fileBlockData; - if (pBlockData->uid != 0) { - tBlockDataClear(pBlockData); - } + // all data blocks in files are checked, let's check the data in last files. + ASSERT(pReader->status.pCurrentFileset->nSttF > 0); - tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->order); - resetTableListIndex(&pReader->status); - goto _begin; - } else { - code = initForFirstBlockInFile(pReader, pBlockIter); + // data blocks in current file are exhausted, let's try the next file now + SBlockData* pBlockData = &pReader->status.fileBlockData; + if (pBlockData->uid != 0) { + tBlockDataClear(pBlockData); + } - // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } + tBlockDataReset(pBlockData); + resetDataBlockIterator(pBlockIter, pReader->order); + resetTableListIndex(&pReader->status); - // this file does not have blocks, let's start check the last block file - if (pBlockIter->numOfBlocks == 0) { - resetTableListIndex(&pReader->status); - goto _begin; - } + ERetrieveType type = doReadDataFromLastFiles(pReader); + if (type == TSDB_READ_RETURN) { + return terrno; } } } @@ -3561,11 +3581,11 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = doBuildDataBlock(pReader); } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + if (pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 876b80697a..b17afeec98 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -39,7 +39,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 046dab380e..9ed297bd6b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); + // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); @@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ - /*}*/ } else { streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 549374ed94..d12eca7ce3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -208,7 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } -int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { +int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -240,7 +240,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, + qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e711700ef2..f33e126068 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -28,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { + if (status != TASK_STATUS__NORMAL) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 065e9d280f..822ae2a485 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -287,6 +287,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbTbcClose(pCur); return -1; } + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); @@ -305,7 +306,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - /*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/ if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0d1440fbde..0324580885 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -54,6 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { + qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); + SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, @@ -63,16 +65,18 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); - streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); pTask->recoverTryingDownstream = vgSz; pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); @@ -83,14 +87,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); - streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId); + qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); streamTaskLaunchRecover(pTask, version); } + return 0; } @@ -109,14 +114,14 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); + streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t vgSz = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { - streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); + streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); } } } @@ -124,8 +129,8 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp return 0; } -int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { - return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL; +int32_t streamTaskCheckStatus(SStreamTask* pTask) { + return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { @@ -135,7 +140,9 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* if (pRsp->status == 1) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; - for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) { + + int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); + for (int32_t i = 0; i < numOfReqs; i++) { int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); if (reqId == pRsp->reqId) { found = true; @@ -149,9 +156,12 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); ASSERT(left >= 0); + if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; + + qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -163,7 +173,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } else { ASSERT(0); } - } else { // not ready, it should wait for at least 100ms and then retry + } else { // not ready, wait for 100ms and retry + qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + taosMsleep(100); streamRecheckOneDownstream(pTask, pRsp); }