From 5bffb0c6754a3fd66fd15ace237821bced3098e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 14:21:23 +0800 Subject: [PATCH 1/9] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 4 +++- source/dnode/vnode/src/tq/tqStreamTask.c | 6 +++++ source/dnode/vnode/src/tqCommon/tqCommon.c | 23 ++++++++---------- source/libs/stream/src/streamCheckpoint.c | 26 ++++++++------------ source/libs/stream/src/streamHb.c | 4 +++- source/libs/stream/src/streamStartTask.c | 28 ++++++++++++++++++---- 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 818d8cfdd4..20f91106a5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -785,7 +785,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); -int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts); +int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); +void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts); +void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4b206fc04f..4c44280311 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -138,6 +138,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not scan wal", vgId); + streamMetaWUnLock(pMeta); + return 0; + } + pMeta->scanInfo.scanCounter += 1; if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cad2ca3eb0..29f050d5e4 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1191,14 +1191,13 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { } int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int64_t now = taosGetTimestampMs(); - + int32_t vgId = pMeta->vgId; + int32_t code = 0; + SStreamTask* pTask = NULL; SRestoreCheckpointInfo req = {0}; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int64_t now = taosGetTimestampMs(); SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1211,7 +1210,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tDecoderClear(&decoder); - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { tqError( @@ -1238,9 +1236,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); - if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { + SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; + if (pConsenInfo->consenChkptTransId >= req.transId) { tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, - pTask->status.consenChkptInfo.consenChkptTransId, req.transId); + pConsenInfo->consenChkptTransId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1256,9 +1255,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->id.idStr, vgId, req.checkpointId, req.transId); } - pTask->status.consenChkptInfo.consenChkptTransId = req.transId; - pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV; - pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs(); + streamTaskSetConsenChkptIdRecv(pTask, req.transId, now); streamMutexUnlock(&pTask->lock); if (pMeta->role == NODE_ROLE_LEADER) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 565a3e35e7..da882505f4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -615,7 +615,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - if (restored) { + if (restored && (pMeta->role == NODE_ROLE_LEADER)) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); } } @@ -1371,29 +1371,23 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; - streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; -// if (pInfo->alreadySendChkptId == true) { -// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); -// streamMutexUnlock(&pTask->lock); -// return TSDB_CODE_SUCCESS; -// } else { -// pInfo->alreadySendChkptId = true; -// } -// + // if (pInfo->alreadySendChkptId == true) { + // stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); + // streamMutexUnlock(&pTask->lock); + // return TSDB_CODE_SUCCESS; + // } else { + // pInfo->alreadySendChkptId = true; + // } + // + streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs()); streamMutexUnlock(&pTask->lock); if (pTask->pBackend != NULL) { streamFreeTaskState(pTask, p); pTask->pBackend = NULL; } - - pInfo->status = TASK_CONSEN_CHKPT_REQ; - pInfo->statusTs = taosGetTimestampMs(); - stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs); return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index ec65c274cf..7a703ae30c 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -197,10 +197,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } - entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts); + streamMutexLock(&(*pTask)->lock); + entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts); if (entry.checkpointInfo.consensusChkptId) { entry.checkpointInfo.consensusTs = pMsg->ts; } + streamMutexUnlock(&(*pTask)->lock); if ((*pTask)->exec.pWalReader != NULL) { entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 90987e3fba..d5d6163009 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -444,7 +444,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { return 0; } -int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { +int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; int32_t vgId = pTask->pMeta->vgId; @@ -455,11 +455,13 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { vgId, pConChkptInfo->statusTs); return 1; } else { - if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) { + int64_t el = (ts - pConChkptInfo->statusTs) / 1000; + if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) { pConChkptInfo->statusTs = ts; - stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64, - pTask->id.idStr, vgId, pConChkptInfo->statusTs); + stWarn( + "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64, + pTask->id.idStr, vgId, el, pConChkptInfo->statusTs); return 1; } } @@ -467,4 +469,22 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { return 0; } +void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) { + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; + pInfo->consenChkptTransId = transId; + pInfo->status = TASK_CONSEN_CHKPT_RECV; + pInfo->statusTs = ts; + stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId); +} + +void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { + SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo; + int32_t prevTrans = pInfo->consenChkptTransId; + + pInfo->status = TASK_CONSEN_CHKPT_REQ; + pInfo->statusTs = ts; + pInfo->consenChkptTransId = 0; + + stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); +} From 9e527632979bdb7bee7a1819993652d696a00736 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 29 Aug 2024 14:36:53 +0800 Subject: [PATCH 2/9] fix:[TD-31700] fix memory leak when error occurs in sclExecOperator. --- source/libs/scalar/src/scalar.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index cff73067b1..0fc4a046f5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -879,7 +879,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp SScalarParam *params = NULL; int32_t rowNum = 0; int32_t code = 0; - int32_t paramNum = 0; + int32_t paramNum = scalarGetOperatorParamNum(node->opType); // json not support in in operator if (nodeType(node->pLeft) == QUERY_NODE_VALUE) { @@ -890,7 +890,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp } } - SCL_ERR_RET(sclInitOperatorParams(¶ms, node, ctx, &rowNum)); + SCL_ERR_JRET(sclInitOperatorParams(¶ms, node, ctx, &rowNum)); if (output->columnData == NULL) { code = sclCreateColumnInfoData(&node->node.resType, rowNum, output); if (code != TSDB_CODE_SUCCESS) { @@ -900,7 +900,6 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType); - paramNum = scalarGetOperatorParamNum(node->opType); SScalarParam *pLeft = ¶ms[0]; SScalarParam *pRight = paramNum > 1 ? ¶ms[1] : NULL; From b8f56aa1b1869609c665e3255ba9239902f6f59a Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 29 Aug 2024 14:40:36 +0800 Subject: [PATCH 3/9] fix:[TD-31792] fix memory leak when error occurs in scalarGenerateSetFromList. --- source/libs/scalar/src/scalar.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index cff73067b1..9e8de33547 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -131,7 +131,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { SListCell *cell = nodeList->pNodeList->pHead; SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; if (out.columnData == NULL) { - SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + SCL_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } int32_t len = 0; void *buf = NULL; From 90bec3cd3f416128020d22d10f0888968d628cd9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 16:00:22 +0800 Subject: [PATCH 4/9] fix(stream): remove wait for quit in meta hb timer. --- source/libs/stream/src/streamHb.c | 54 +++++++++++++++---------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 7a703ae30c..1ef938494e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -25,7 +25,6 @@ int32_t streamMetaId = 0; struct SMetaHbInfo { tmr_h hbTmr; - int32_t stopFlag; int32_t tickCounter; int32_t hbCount; int64_t hbStart; @@ -242,6 +241,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { void streamMetaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; int32_t code = 0; + int32_t vgId = 0; + int32_t role = 0; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { @@ -249,29 +250,41 @@ void streamMetaHbToMnode(void* param, void* tmrId) { return; } + vgId = pMeta->vgId; + role = pMeta->role; + // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta - pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; + if (pMeta->closeFlag) { + pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaId, rid); if (code == TSDB_CODE_SUCCESS) { - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); + stDebug("vgId:%d jump out of meta timer", vgId); } else { - stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); + stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } return; } // not leader not send msg if (pMeta->role != NODE_ROLE_LEADER) { + pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaId, rid); if (code == TSDB_CODE_SUCCESS) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); + stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); } else { - stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, pMeta->vgId, - pMeta->role, rid); + stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); } + return; + } - pMeta->pHbInfo->hbStart = 0; + if (!waitForEnoughDuration(pMeta->pHbInfo)) { + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, + "meta-hb-tmr"); + + code = taosReleaseRef(streamMetaId, rid); + if (code) { + stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); + } return; } @@ -280,17 +293,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = taosGetTimestampMs(); } - if (!waitForEnoughDuration(pMeta->pHbInfo)) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, - "meta-hb-tmr"); - - code = taosReleaseRef(streamMetaId, rid); - if (code) { - stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); - } - return; - } - streamMetaRLock(pMeta); code = streamMetaSendHbHelper(pMeta); if (code) { @@ -300,10 +302,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) { streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaId, rid); if (code) { - stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); + stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } } @@ -316,7 +318,6 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pInfo->tickCounter = 0; - pInfo->stopFlag = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; @@ -340,11 +341,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { - pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; - while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } + taosMsleep(2 * META_HB_CHECK_INTERVAL); + stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } } From 01e8fc583d19fb116f8c595f4db5399d9b43aa02 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Aug 2024 14:52:42 +0800 Subject: [PATCH 5/9] fix(query):adj error code for aggretate operator --- source/libs/executor/src/aggregateoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index b5a3f2f484..fe82e0eb62 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -193,8 +193,9 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); - QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); + QUERY_CHECK_CODE(code, lino, _end); + code = doAggregateImpl(pOperator, pSup->pCtx); QUERY_CHECK_CODE(code, lino, _end); } From 925b28894515360025a0d6b0bee3e4bd01f2f69b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Aug 2024 15:55:11 +0800 Subject: [PATCH 6/9] limit the number of stream results --- .../executor/src/streamtimewindowoperator.c | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0651e2dbf6..6a1a5942d6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -49,6 +49,8 @@ #define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint" #define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint" +#define MAX_STREAM_HISTORY_RESULT 100000000 + typedef struct SStateWindowInfo { SResultWindowInfo winInfo; SStateKeys* pStateKey; @@ -161,11 +163,19 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { } int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { + if (tSimpleHashGetSize(pStUpdated) > MAX_STREAM_HISTORY_RESULT) { + qError("%s failed at line %d since too many history result. ", __func__, __LINE__); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey; return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { + if (tSimpleHashGetSize(pUpdatedMap) > MAX_STREAM_HISTORY_RESULT) { + qError("%s failed at line %d since too many history result. ", __func__, __LINE__); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); } @@ -481,6 +491,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos); + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + if (pInfo->stateStore.streamFileStateDestroy != NULL) { pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); } @@ -495,11 +511,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupExprSupp(&pInfo->scalarSupp); - if (pInfo->pUpdatedMap != NULL) { - tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos); - tSimpleHashCleanup(pInfo->pUpdatedMap); - pInfo->pUpdatedMap = NULL; - } tSimpleHashCleanup(pInfo->pDeletedMap); blockDataDestroy(pInfo->pCheckpointRes); @@ -994,7 +1005,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } -static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, +static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1166,6 +1177,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } + return code; } static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { @@ -1718,7 +1730,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); QUERY_CHECK_CODE(code, lino, _end); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) { + code = TSDB_CODE_SUCCESS; + pOperator->status = OP_RES_TO_RETURN; + break; + } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); @@ -5184,7 +5201,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p } #endif - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) { + pOperator->status = OP_RES_TO_RETURN; + code = TSDB_CODE_SUCCESS; + break; + } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); } From 251bdf51c18d50f4a72434d5693019293c44f792 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 16:02:33 +0800 Subject: [PATCH 7/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamStartTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index d5d6163009..c2e8a523e5 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -455,7 +455,7 @@ int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { vgId, pConChkptInfo->statusTs); return 1; } else { - int64_t el = (ts - pConChkptInfo->statusTs) / 1000; + int32_t el = (ts - pConChkptInfo->statusTs) / 1000; if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) { pConChkptInfo->statusTs = ts; From 460394038defb8cc0331896d4c6a7b133bf56fe5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Aug 2024 16:10:06 +0800 Subject: [PATCH 8/9] fix(stream):forbid create stream on Windows --- source/dnode/mnode/impl/src/mndStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e940948fae..43433243af 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -775,7 +775,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } #ifdef WINDOWS - terrno = TSDB_CODE_MND_INVALID_PLATFORM; + code = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; #endif From 40cc67719d08aef1e26d2023104e6e68d88f60ca Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Thu, 29 Aug 2024 19:19:34 +0800 Subject: [PATCH 9/9] Update 11-compress.md --- docs/zh/26-tdinternal/11-compress.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/zh/26-tdinternal/11-compress.md b/docs/zh/26-tdinternal/11-compress.md index 05ad2d60f6..10625a3606 100644 --- a/docs/zh/26-tdinternal/11-compress.md +++ b/docs/zh/26-tdinternal/11-compress.md @@ -4,8 +4,6 @@ title: 数据压缩 toc_max_heading_level: 4 --- -## 概述 - 数据压缩是一种在不损失数据有效信息的前提下,利用特定算法对数据进行重新组织和处理,以减少数据占用的存储空间和提高数据传输效率的技术。TDengine 在数据的存储和传输过程中均采用了这一技术,旨在优化存储资源的使用并加快数据交换的速度。 @@ -58,4 +56,4 @@ TDengine 在数据传输过程中提供了压缩功能,以减少网络带宽 下图展示了 TDengine 引擎在时序数据的整个传输及存储过程中的压缩及解压过程,以更好地理解整个处理过程。 -![TDengine 针对时序数据的压缩及解压过程](./compression.png) \ No newline at end of file +![TDengine 针对时序数据的压缩及解压过程](./compression.png)