diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7e1c80c842..d007ea29a9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,7 +304,7 @@ typedef struct SSTaskBasicInfo { int32_t totalLevel; int8_t taskLevel; int8_t fillHistory; // is fill history task or not - int64_t triggerParam; // in msec + int64_t delaySchedParam; // in msec } SSTaskBasicInfo; typedef struct SStreamRetrieveReq SStreamRetrieveReq; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index bb10a9b9ad..2800aecdfa 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -46,18 +46,8 @@ typedef struct SVgroupChangeInfo { SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; -// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard -// to avoid too many checkpoints for a taskk in the waiting list -typedef struct SCheckpointCandEntry { - char *pName; - int64_t streamId; - int64_t checkpointTs; - int64_t checkpointId; -} SCheckpointCandEntry; - typedef struct SStreamTransMgmt { SHashObj *pDBTrans; - SHashObj *pWaitingList; // stream id list, of which timed checkpoint failed to be issued due to the trans conflict. } SStreamTransMgmt; typedef struct SStreamExecInfo { @@ -98,7 +88,6 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); -int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 09e0e4e415..b88c4a4665 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -45,7 +45,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq); static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq); -static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -112,7 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); @@ -141,7 +141,6 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); - taosHashCleanup(execInfo.transMgmt.pWaitingList); taosHashCleanup(execInfo.pTransferStateStreams); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); @@ -967,7 +966,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); if (conflict) { - mndAddtoCheckpointWaitingList(pStream, checkpointId); mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, pStream->name, pStream->uid); return -1; @@ -1131,7 +1129,7 @@ static int32_t streamWaitComparFn(const void* p1, const void* p2) { return pInt1->duration > pInt2->duration? -1:1; } -static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { +static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index ac4cb08308..ff31aa0f7d 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -151,28 +151,6 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { return 0; } -int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) { - SCheckpointCandEntry* pEntry = taosHashGet(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid)); - if (pEntry == NULL) { - SCheckpointCandEntry entry = {.streamId = pStream->uid, - .checkpointTs = taosGetTimestampMs(), - .checkpointId = checkpointId, - .pName = taosStrdup(pStream->name)}; - - taosHashPut(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid), &entry, sizeof(entry)); - int32_t size = taosHashGetSize(execInfo.transMgmt.pWaitingList); - - mDebug("stream:%" PRIx64 " add into waiting list due to conflict, ts:%" PRId64 " , checkpointId: %" PRId64 - ", total in waitingList:%d", - pStream->uid, entry.checkpointTs, checkpointId, size); - } else { - mDebug("stream:%" PRIx64 " ts:%" PRId64 ", checkpointId:%" PRId64 " already in waiting list, no need to add into", - pStream->uid, pEntry->checkpointTs, checkpointId); - } - - return TSDB_CODE_SUCCESS; -} - STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d5bc12f9df..e53908eeed 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -558,11 +558,6 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * return 0; } -static void freeCheckpointCandEntry(void *param) { - SCheckpointCandEntry *pEntry = param; - taosMemoryFreeClear(pEntry->pName); -} - static void freeTaskList(void* param) { SArray** pList = (SArray **)param; taosArrayDestroy(*pList); @@ -575,9 +570,7 @@ void mndInitExecInfo() { execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); - taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 89ab6d52c3..c61988574c 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -62,14 +62,14 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); + (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam); } else { sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam); } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7138ecbeaa..3cc7c6ec66 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -244,7 +244,7 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask && *ppTask) { pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer; - pItem->fetchResultVer = (*ppTask)->info.triggerParam; + pItem->fetchResultVer = (*ppTask)->info.delaySchedParam; } streamMetaRUnLock(pMeta); } @@ -1289,7 +1289,7 @@ _checkpoint: pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId; pTask->chkInfo.checkpointVer = pItem->submitReqVer; - pTask->info.triggerParam = pItem->fetchResultVer; + pTask->info.delaySchedParam = pItem->fetchResultVer; pTask->info.taskLevel = TASK_LEVEL_SMA; if (!checkpointBuilt) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d8460d2cb5..1e564ba467 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -770,19 +770,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x " - "trigger:%" PRId64 " ms, inputVer:%" PRId64, + "delaySched:%" PRId64 " ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, - (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer); + (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); } else { tqInfo( "vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 + " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64 " ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 10db53ea38..d6cd5b528d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -145,9 +145,6 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); int32_t getNumOfDispatchBranch(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask); -int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId, - int32_t vgId, SEpSet* pEpset); - int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 522ac9a910..a6cfbb063b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -227,28 +227,6 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { pMsgInfo->dispatchMsgType = 0; } -int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId, - int32_t vgId, SEpSet* pEpset) { - SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); - - int32_t numOfBlocks = taosArrayGetSize(pData->blocks); - int32_t code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, dstTaskId, pData->type); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - for (int32_t i = 0; i < numOfBlocks; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); - if (code != TSDB_CODE_SUCCESS) { - destroyDispatchMsg(pReq, 1); - return code; - } - } - - return doSendDispatchMsg(pTask, pReq, vgId, pEpset); -} - static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e0822f60e7..ae1d86e317 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -458,10 +458,10 @@ void streamMetaClear(SStreamMeta* pMeta) { SStreamTask* p = *(SStreamTask**)pIter; // release the ref by timer - if (p->info.triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer + if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); taosTmrStop(p->schedInfo.pDelayTimer); - p->info.triggerParam = 0; + p->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, p); } @@ -752,10 +752,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); - if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { + if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); taosTmrStop(pTask->schedInfo.pDelayTimer); - pTask->info.triggerParam = 0; + pTask->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, pTask); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 461d53d5a9..9c5c230a3d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -330,7 +330,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER && - (pTask->info.triggerParam != 0)) { + (pTask->info.delaySchedParam != 0)) { atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 9bd12a4fd8..9c817d565b 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,13 +20,13 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { + if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); - stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); + stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam); - pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer); + pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } @@ -119,7 +119,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { void streamTaskSchedHelper(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; const char* id = pTask->id.idStr; - int32_t nextTrigger = (int32_t)pTask->info.triggerParam; + int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam; int8_t status = atomic_load_8(&pTask->schedInfo.status); stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d30cc6759..3daadda687 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -107,7 +107,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; - pTask->info.triggerParam = triggerParam; + pTask->info.delaySchedParam = triggerParam; pTask->subtableWithoutMd5 = subtableWithoutMd5; pTask->status.pSM = streamCreateStateMachine(pTask); diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index f8228a8f5f..705406f044 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -505,7 +505,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } - if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->info.delaySchedParam) < 0) return -1; if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; @@ -588,7 +588,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } - if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->info.delaySchedParam) < 0) return -1; if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; }