From 3c4631ef1f1bad565f1fa9bf7d1eb3866f479ec9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 10:06:24 +0000 Subject: [PATCH 1/2] add checkpoint --- include/libs/stream/tstream.h | 45 ++-- source/dnode/mnode/impl/src/mndStream.c | 259 ++++++++++++------------ source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 62 +++--- source/libs/stream/src/streamMeta.c | 43 +++- 5 files changed, 227 insertions(+), 184 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 08e4b55ffe..daee53afe2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -47,7 +47,7 @@ enum { TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__PAUSE, - TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore + TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK_READY, }; @@ -103,7 +103,7 @@ typedef struct { } SStreamQueueItem; typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver, int64_t checkpointId); typedef struct { int8_t type; @@ -120,7 +120,7 @@ typedef struct { } SStreamMergedSubmit; typedef struct { - int8_t type; + int8_t type; int32_t srcVgId; int32_t srcTaskId; @@ -249,7 +249,7 @@ typedef struct SStreamChildEpInfo { int32_t childId; int32_t taskId; SEpSet epSet; - bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it + bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it } SStreamChildEpInfo; typedef struct SStreamId { @@ -260,17 +260,17 @@ typedef struct SStreamId { typedef struct SCheckpointInfo { int64_t checkpointId; - int64_t checkpointVer; // latest checkpointId version - int64_t currentVer; // current offset in WAL, not serialize it + int64_t checkpointVer; // latest checkpointId version + int64_t currentVer; // current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t keepTaskStatus; - bool transferState; - int8_t timerActive; // timer is active + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t keepTaskStatus; + bool transferState; + int8_t timerActive; // timer is active } SStreamStatus; typedef struct SHistDataRange { @@ -311,8 +311,8 @@ struct SStreamTask { SHistDataRange dataRange; SStreamId historyTaskId; SStreamId streamTaskId; - SArray* pUpstreamInfoList; // SArray, // children info - SArray* pReadyMsgList; // SArray + SArray* pUpstreamInfoList; // SArray, // children info + SArray* pReadyMsgList; // SArray // output union { @@ -533,7 +533,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); -void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); +void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); @@ -541,10 +541,10 @@ int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); -SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); @@ -556,7 +556,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history void streamPrepareNdoCheckDownstream(SStreamTask* pTask); @@ -600,7 +600,7 @@ void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it +int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); @@ -617,8 +617,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, - SStreamTask* pTask); +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index adf7d85aeb..80f657a9a9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -836,48 +836,48 @@ _OVER: return code; } -static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) { - void *buf = NULL; - int32_t tlen = 0; - int32_t checkpointId = tGenIdPI64(); +// static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) { +// void *buf = NULL; +// int32_t tlen = 0; +// int32_t checkpointId = tGenIdPI64(); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - SArray *stream = taosArrayInit(64, sizeof(void *)); +// SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); +// SArray *stream = taosArrayInit(64, sizeof(void *)); - SListIter iter = {0}; - tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD); - SListNode *pNode = NULL; - while ((pNode = tdListNext(&iter)) != NULL) { - char streamName[TSDB_STREAM_FNAME_LEN] = {0}; - tdListNodeGetData(pStreamList, pNode, streamName); - SStreamObj *pStream = mndAcquireStream(pMnode, streamName); - taosArrayPush(stream, &pStream); - } +// SListIter iter = {0}; +// tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD); +// SListNode *pNode = NULL; +// while ((pNode = tdListNext(&iter)) != NULL) { +// char streamName[TSDB_STREAM_FNAME_LEN] = {0}; +// tdListNodeGetData(pStreamList, pNode, streamName); +// SStreamObj *pStream = mndAcquireStream(pMnode, streamName); +// taosArrayPush(stream, &pStream); +// } - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) { - mndReleaseVgroup(pMnode, pVgObj); - for (int i = 0; i < taosArrayGetSize(stream); i++) { - SStreamObj *p = taosArrayGetP(stream, i); - mndReleaseStream(pMnode, p); - } - taosArrayDestroy(stream); - return -1; +// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) { +// mndReleaseVgroup(pMnode, pVgObj); +// for (int i = 0; i < taosArrayGetSize(stream); i++) { +// SStreamObj *p = taosArrayGetP(stream, i); +// mndReleaseStream(pMnode, p); +// } +// taosArrayDestroy(stream); +// return -1; - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; - } - mndReleaseVgroup(pMnode, pVgObj); +// STransAction action = {0}; +// action.epSet = mndGetVgroupEpset(pMnode, pVgObj); +// action.pCont = buf; +// action.contLen = tlen; +// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; +// } +// mndReleaseVgroup(pMnode, pVgObj); - for (int i = 0; i < taosArrayGetSize(stream); i++) { - SStreamObj *p = taosArrayGetP(stream, i); - mndReleaseStream(pMnode, p); - } - taosArrayDestroy(stream); - return 0; -} +// for (int i = 0; i < taosArrayGetSize(stream); i++) { +// SStreamObj *p = taosArrayGetP(stream, i); +// mndReleaseStream(pMnode, p); +// } +// taosArrayDestroy(stream); +// return 0; +// } static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -979,106 +979,107 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return 0; } -static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { - int64_t timestampMs = taosGetTimestampMs(); - if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { - return -1; - } +// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { +// int64_t timestampMs = taosGetTimestampMs(); +// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { +// return -1; +// } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); - if (pTrans == NULL) return -1; - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) { - mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, - tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); - mndTransDrop(pTrans); - return -1; - } - mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); - atomic_store_64(&pStream->currentTick, 1); - taosWLockLatch(&pStream->lock); - // 1. redo action: broadcast checkpoint source msg for all source vg - int32_t totLevel = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < totLevel; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - SStreamTask *pTask = taosArrayGetP(pLevel, 0); - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t sz = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - /*A(pTask->info.nodeId > 0);*/ - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - if (pVgObj == NULL) { - taosWUnLockLatch(&pStream->lock); - mndTransDrop(pTrans); - return -1; - } +// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); +// if (pTrans == NULL) return -1; +// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); +// if (mndTrancCheckConflict(pMnode, pTrans) != 0) { +// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, +// checkpointId, +// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); +// mndTransDrop(pTrans); +// return -1; +// } +// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); +// atomic_store_64(&pStream->currentTick, 1); +// taosWLockLatch(&pStream->lock); +// // 1. redo action: broadcast checkpoint source msg for all source vg +// int32_t totLevel = taosArrayGetSize(pStream->tasks); +// for (int32_t i = 0; i < totLevel; i++) { +// SArray *pLevel = taosArrayGetP(pStream->tasks, i); +// SStreamTask *pTask = taosArrayGetP(pLevel, 0); +// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { +// int32_t sz = taosArrayGetSize(pLevel); +// for (int32_t j = 0; j < sz; j++) { +// SStreamTask *pTask = taosArrayGetP(pLevel, j); +// /*A(pTask->info.nodeId > 0);*/ +// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); +// if (pVgObj == NULL) { +// taosWUnLockLatch(&pStream->lock); +// mndTransDrop(pTrans); +// return -1; +// } - void *buf; - int32_t tlen; - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId) < 0) { - mndReleaseVgroup(pMnode, pVgObj); - taosWUnLockLatch(&pStream->lock); - mndTransDrop(pTrans); - return -1; - } +// void *buf; +// int32_t tlen; +// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, +// pTask->id.taskId) < 0) { +// mndReleaseVgroup(pMnode, pVgObj); +// taosWUnLockLatch(&pStream->lock); +// mndTransDrop(pTrans); +// return -1; +// } - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; +// STransAction action = {0}; +// action.epSet = mndGetVgroupEpset(pMnode, pVgObj); +// action.pCont = buf; +// action.contLen = tlen; +// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; - mndReleaseVgroup(pMnode, pVgObj); +// mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - taosWUnLockLatch(&pStream->lock); - mndReleaseStream(pMnode, pStream); - mndTransDrop(pTrans); - return -1; - } - } - } - } - // 2. reset tick - pStream->checkpointFreq = checkpointId; - pStream->checkpointId = checkpointId; - pStream->checkpointFreq = taosGetTimestampMs(); - atomic_store_64(&pStream->currentTick, 0); - // 3. commit log: stream checkpoint info - pStream->version = pStream->version + 1; - taosWUnLockLatch(&pStream->lock); +// if (mndTransAppendRedoAction(pTrans, &action) != 0) { +// taosMemoryFree(buf); +// taosWUnLockLatch(&pStream->lock); +// mndReleaseStream(pMnode, pStream); +// mndTransDrop(pTrans); +// return -1; +// } +// } +// } +// } +// // 2. reset tick +// pStream->checkpointFreq = checkpointId; +// pStream->checkpointId = checkpointId; +// pStream->checkpointFreq = taosGetTimestampMs(); +// atomic_store_64(&pStream->currentTick, 0); +// // 3. commit log: stream checkpoint info +// pStream->version = pStream->version + 1; +// taosWUnLockLatch(&pStream->lock); - // // code condtion +// // // code condtion - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL) { - mError("failed to prepare trans rebalance since %s", terrstr()); - goto _ERR; - } - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - sdbFreeRaw(pCommitRaw); - mError("failed to prepare trans rebalance since %s", terrstr()); - goto _ERR; - } - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { - sdbFreeRaw(pCommitRaw); - mError("failed to prepare trans rebalance since %s", terrstr()); - goto _ERR; - } +// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); +// if (pCommitRaw == NULL) { +// mError("failed to prepare trans rebalance since %s", terrstr()); +// goto _ERR; +// } +// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { +// sdbFreeRaw(pCommitRaw); +// mError("failed to prepare trans rebalance since %s", terrstr()); +// goto _ERR; +// } +// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { +// sdbFreeRaw(pCommitRaw); +// mError("failed to prepare trans rebalance since %s", terrstr()); +// goto _ERR; +// } - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("failed to prepare trans rebalance since %s", terrstr()); - goto _ERR; - } - mndTransDrop(pTrans); - return 0; -_ERR: - mndTransDrop(pTrans); - return -1; -} +// if (mndTransPrepare(pMnode, pTrans) != 0) { +// mError("failed to prepare trans rebalance since %s", terrstr()); +// goto _ERR; +// } +// mndTransDrop(pTrans); +// return 0; +// _ERR: +// mndTransDrop(pTrans); +// return -1; +// } static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t checkpointId) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 584b238d1b..5d99929c69 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId); int32_t tqStreamTasksScanWal(STQ* pTq); int32_t tqStreamTasksStatusCheck(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 354e210ec1..848468d3e0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -162,7 +162,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { taosWLockLatch(&pMeta->lock); void* pIter = NULL; - while(1) { + while (1) { pIter = taosHashIterate(pMeta->pTasks, pIter); if (pIter == NULL) { break; @@ -207,13 +207,14 @@ void tqNotifyClose(STQ* pTq) { int64_t st = taosGetTimestampMs(); - while(hasStreamTaskInTimer(pTq->pStreamMeta)) { + while (hasStreamTaskInTimer(pTq->pStreamMeta)) { tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId); taosMsleep(100); } int64_t el = taosGetTimestampMs() - st; - tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, el); + tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", + pTq->pStreamMeta->vgId, el); } } @@ -249,8 +250,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset); tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset); - tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, - pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, + vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -419,8 +420,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); break; - }else{ - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + } else { + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, + .pCont = pHandle->msg->pCont, + .contLen = pHandle->msg->contLen, + .info = pHandle->msg->info}; tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); taosMemoryFree(pHandle->msg); pHandle->msg = NULL; @@ -679,9 +683,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = NULL; - while(1){ + while (1) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){ + if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { break; } } @@ -697,7 +701,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } STqHandle handle = {0}; ret = tqCreateHandle(pTq, &req, &handle); - if(ret < 0){ + if (ret < 0) { tqDestroyTqHandle(&handle); goto end; } @@ -739,7 +743,7 @@ end: void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); @@ -758,16 +762,16 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMeta = pTq->pStreamMeta; // checkpoint exists, restore from the last checkpoint - if (pTask->chkInfo.checkpointId != 0) { - ASSERT(pTask->chkInfo.checkpointVer > 0); - pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer; - pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer; - pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer; - } else { - pTask->chkInfo.currentVer = ver; - pTask->dataRange.range.maxVer = ver; - pTask->dataRange.range.minVer = ver; - } + // if (pTask->chkInfo.checkpointId != 0) { + // ASSERT(pTask->chkInfo.checkpointVer > 0); + // pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer; + // pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer; + // pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer; + // } else { + pTask->chkInfo.currentVer = ver; + pTask->dataRange.range.maxVer = ver; + pTask->dataRange.range.minVer = ver; + //} if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pSateTask = pTask; @@ -915,7 +919,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", + tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -1092,15 +1097,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, - pStreamTask->info.taskLevel, pId); + tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, pStreamTask->info.taskLevel, + pId); // if it's an source task, extract the last version in wal. streamHistoryTaskSetVerRangeStep2(pTask); } if (!streamTaskRecoverScanStep1Finished(pTask)) { - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 + " do secondary scan-history-data after halt the related stream task:%s", pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); @@ -1356,7 +1362,7 @@ int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); - int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); + int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); if (code != 0) { return code; } @@ -1403,8 +1409,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); if (code != 0) { return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 311b8d3d91..e52486b85e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -241,9 +241,11 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { // add to the ready tasks hash map, not the restored tasks hash map int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { + int64_t checkpointId = 0; + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver, checkpointId) < 0) { tFreeStreamTask(pTask); return -1; } @@ -404,7 +406,43 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { return 0; } +int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { + int64_t chkpId = 0; + + TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + return chkpId; + } + void* pKey = NULL; + int32_t kLen = 0; + void* pVal = NULL; + int32_t vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + goto _err; + } + + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeStreamTask(&decoder, pTask); + tDecoderClear(&decoder); + + chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId); + } + +_err: + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + + return chkpId; +} int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { + int64_t checkpointId = streamGetLatestCheckpointId(pMeta); + TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return -1; @@ -417,7 +455,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { SDecoder decoder; tdbTbcMoveToFirst(pCur); - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { @@ -434,7 +471,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { // remove duplicate void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer, checkpointId) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); From 3020e27114eaf05f541460f8b59f18fcee7226f9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 10:20:37 +0000 Subject: [PATCH 2/2] fix mem leak --- source/libs/stream/src/streamMeta.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e52486b85e..bd8227a962 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -431,6 +431,8 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { tDecoderClear(&decoder); chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId); + + taosMemoryFree(pTask); // fix mem leak later } _err: