From 738cfd5d8d1914b590d7d94f04c41132d6f27a50 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 1 Jun 2023 23:14:30 +0800 Subject: [PATCH 1/5] fix:open compatibility test cases --- tests/parallel_test/cases.task | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6a9a2b609a..0b235d0dc8 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -564,7 +564,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py -#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py +,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py From fb59ec4591d144185b367675210b3409e4240091 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 13 Jun 2023 09:32:54 +0800 Subject: [PATCH 2/5] feat:move plan msg from vginfo to subscription info --- source/dnode/mnode/impl/inc/mndDef.h | 19 ++++----- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/mnode/impl/src/mndDef.c | 46 +++++++++++----------- source/dnode/mnode/impl/src/mndScheduler.c | 28 ++++++------- source/dnode/mnode/impl/src/mndSubscribe.c | 41 +++++++++++++++---- 5 files changed, 81 insertions(+), 55 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 82b714e6eb..38380dee95 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -559,12 +559,12 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - char* qmsg; // SubPlanToString +// char* qmsg; // SubPlanToString SEpSet epSet; } SMqVgEp; -SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); -void tDeleteSMqVgEp(SMqVgEp* pVgEp); +//SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); +//void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); @@ -589,6 +589,7 @@ typedef struct { SHashObj* consumerHash; // consumerId -> SMqConsumerEp SArray* unassignedVgs; // SArray char dbName[TSDB_DB_FNAME_LEN]; + char* qmsg; // SubPlanToString } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); @@ -687,12 +688,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -typedef struct { - char streamName[TSDB_STREAM_FNAME_LEN]; - int64_t uid; - int64_t streamUid; - SArray* childInfo; // SArray -} SStreamCheckpointObj; +//typedef struct { +// char streamName[TSDB_STREAM_FNAME_LEN]; +// int64_t uid; +// int64_t streamUid; +// SArray* childInfo; // SArray +//} SStreamCheckpointObj; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 117c1082a5..f530128572 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -644,7 +644,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); - uint64_t consumerId = subscribe.consumerId; + int64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6dab018236..83e8967c14 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -183,33 +183,33 @@ void tFreeStreamObj(SStreamObj *pStream) { } } -SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { - SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); - if (pVgEpNew == NULL) return NULL; - pVgEpNew->vgId = pVgEp->vgId; - pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); - pVgEpNew->epSet = pVgEp->epSet; - return pVgEpNew; -} +//SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { +// SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); +// if (pVgEpNew == NULL) return NULL; +// pVgEpNew->vgId = pVgEp->vgId; +//// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); +// pVgEpNew->epSet = pVgEp->epSet; +// return pVgEpNew; +//} -void tDeleteSMqVgEp(SMqVgEp *pVgEp) { - if (pVgEp) { - taosMemoryFreeClear(pVgEp->qmsg); - taosMemoryFree(pVgEp); - } -} +//void tDeleteSMqVgEp(SMqVgEp *pVgEp) { +// if (pVgEp) { +//// taosMemoryFreeClear(pVgEp->qmsg); +// taosMemoryFree(pVgEp); +// } +//} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeString(buf, pVgEp->qmsg); +// tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeString(buf, &pVgEp->qmsg); +// buf = taosDecodeString(buf, &pVgEp->qmsg); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } @@ -382,13 +382,13 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); if (pConsumerEpNew == NULL) return NULL; pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; - pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); return pConsumerEpNew; } void tDeleteSMqConsumerEp(void *data) { SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; - taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pConsumerEp->vgs); } int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { @@ -463,12 +463,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp newEp = { .consumerId = pConsumerEp->consumerId, - .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), + .vgs = taosArrayDup(pConsumerEp->vgs, NULL), }; taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } - pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); + pSubNew->qmsg = taosStrdup(pSub->qmsg); return pSubNew; } @@ -478,10 +479,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pConsumerEp->vgs); } taosHashCleanup(pSub->consumerHash); - taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pSub->unassignedVgs); + taosMemoryFreeClear(pSub->qmsg); } int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 64082536da..9a611fe46a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -570,25 +570,21 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); - if (pSubplan) { - int32_t msgLen; - - pSubplan->execNode.epSet = pVgEp->epSet; - pSubplan->execNode.nodeId = pVgEp->vgId; - - if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - } else { - pVgEp->qmsg = taosStrdup(""); - } - sdbRelease(pSdb, pVgroup); } + if (pSubplan) { + int32_t msgLen; + + if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) { + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + } else { + pSub->qmsg = taosStrdup(""); + } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 74421afa33..42af4944a5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; - req.qmsg = pRebVg->pVgEp->qmsg; + if(pPlan){ + pPlan->execNode.epSet = pRebVg->pVgEp->epSet; + pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; + int32_t msgLen; + if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + }else{ + req.qmsg = taosStrdup(""); + } req.subType = pSub->subType; req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; @@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri int32_t ret = 0; tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); if (ret < 0) { + taosMemoryFree(req.qmsg); return -1; } @@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(req.qmsg); return -1; } @@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { taosMemoryFreeClear(buf); tEncoderClear(&encoder); + taosMemoryFree(req.qmsg); return -1; } tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; + taosMemoryFree(req.qmsg); return 0; } -static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // return -1; @@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { + if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { return -1; } @@ -483,14 +497,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { + struct SSubplan* pPlan = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ + int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { + nodesDestroyNode((SNode*)pPlan); return -1; } mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } @@ -500,11 +525,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { + if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } } + nodesDestroyNode((SNode*)pPlan); // 2. redo log: subscribe and vg assignment // subscribe From 8a81278024a05a664ec07c0e09c4ee56f7cbe8e9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 13 Jun 2023 15:14:11 +0800 Subject: [PATCH 3/5] fix:load pHandle if necessary wher tq restore --- include/libs/wal/wal.h | 2 +- include/util/talgo.h | 2 +- include/util/tarray.h | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndDef.c | 38 +++-- source/dnode/vnode/src/inc/tq.h | 3 + source/dnode/vnode/src/tq/tq.c | 101 ++----------- source/dnode/vnode/src/tq/tqMeta.c | 217 +++++++++++++++++---------- source/libs/wal/src/walRef.c | 19 +-- source/util/src/tarray.c | 4 +- 10 files changed, 186 insertions(+), 204 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1aa08ff802..47230bc95c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); void walRefFirstVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *); -SWalRef *walRefCommittedVer(SWal *); +void walRefCommitVer(SWal *, SWalRef *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); diff --git a/include/util/talgo.h b/include/util/talgo.h index f9d51c4b5b..7c92c0fe87 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -31,7 +31,7 @@ typedef void *(*__array_item_dup_fn_t)(void *); typedef void (*FDelete)(void *); typedef int32_t (*FEncode)(void **buf, const void *dst); -typedef void *(*FDecode)(const void *buf, void *dst); +typedef void *(*FDecode)(const void *buf, void *dst, int8_t sver); #define TD_EQ 0x1 #define TD_GT 0x2 diff --git a/include/util/tarray.h b/include/util/tarray.h index 4bf24b46b9..a93c695370 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -244,7 +244,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param); int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode); -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz); +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3a3050839b..4a90f3939f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -572,7 +572,7 @@ typedef struct { //SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); //void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); -void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); +void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver); typedef struct { int64_t consumerId; // -1 for unassigned diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index b6cc962d16..15f0cc9b71 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -207,9 +207,13 @@ int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { return tlen; } -void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { +void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); -// buf = taosDecodeString(buf, &pVgEp->qmsg); + if(sver == 1){ + uint64_t size = 0; + buf = taosDecodeVariantU64(buf, &size); + buf = POINTER_SHIFT(buf, size); + } buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } @@ -390,18 +394,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s return (void *)buf; } -SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { - SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); - if (pConsumerEpNew == NULL) return NULL; - pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; - pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); - return pConsumerEpNew; -} - -void tDeleteSMqConsumerEp(void *data) { - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; - taosArrayDestroy(pConsumerEp->vgs); -} +//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { +// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); +// if (pConsumerEpNew == NULL) return NULL; +// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; +// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); +// return pConsumerEpNew; +//} +// +//void tDeleteSMqConsumerEp(void *data) { +// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; +// taosArrayDestroy(pConsumerEp->vgs); +//} int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tlen = 0; @@ -436,7 +440,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); if (sver > 1){ int32_t szVgs = 0; buf = taosDecodeFixedI32(buf, &szVgs); @@ -580,6 +584,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { // do nothing } } + tlen += taosEncodeString(buf, pSub->qmsg); return tlen; } @@ -602,7 +607,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)); } - buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeStringTo(buf, pSub->dbName); if (sver > 1){ @@ -626,6 +631,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { } } } + buf = taosDecodeString(buf, &pSub->qmsg); } return (void *)buf; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 4ba8d6d69f..b35dc71ed9 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -139,6 +139,7 @@ static STqMgmt tqMgmt = {0}; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); +void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); @@ -161,6 +162,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); +int32_t tqMetaGetHandle(STQ* pTq, const char* key); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index edefd3ba90..f4d3407aa2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -62,7 +62,7 @@ void tqCleanUp() { } } -static void destroyTqHandle(void* data) { +void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); @@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); + taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle); taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); @@ -661,13 +661,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return -1; } - SVnode* pVnode = pTq->pVnode; - int32_t vgId = TD_VID(pVnode); - - tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, + tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = NULL; + while(1){ + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){ + break; + } + } + if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -678,86 +682,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); goto end; } - - STqHandle tqHandle = {0}; - pHandle = &tqHandle; - - memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); - pHandle->consumerId = req.newConsumerId; - pHandle->epoch = -1; - - pHandle->execHandle.subType = req.subType; - pHandle->fetchMeta = req.withMeta; - - // TODO version should be assigned and refed during preprocess - SWalRef* pRef = walRefCommittedVer(pVnode->pWal); - if (pRef == NULL) { - ret = -1; + STqHandle handle = {0}; + ret = tqCreateHandle(pTq, &req, &handle); + if(ret < 0){ + tqDestroyTqHandle(&handle); goto end; } - - int64_t ver = pRef->refVer; - pHandle->pRef = pRef; - - SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; - initStorageAPI(&handle.api); - - pHandle->snapshotVer = ver; - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, - &pHandle->execHandle.numOfCols, req.newConsumerId); - void* scanner = NULL; - qExtractStreamScanner(pHandle->execHandle.task, &scanner); - pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - - pHandle->execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - - if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), - pVnode->config.vgId, req.subKey, pHandle->consumerId); - return -1; - } - } - - buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - - SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if (ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, - pHandle->consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, - pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - - taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - goto end; + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { taosWLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index ba6d7cb501..3b0e6749c2 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,9 +88,9 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } - if (tqMetaRestoreHandle(pTq) < 0) { - return -1; - } +// if (tqMetaRestoreHandle(pTq) < 0) { +// return -1; +// } if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; @@ -274,6 +274,120 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { return 0; } +static int buildHandle(STQ* pTq, STqHandle* handle){ + SVnode* pVnode = pTq->pVnode; + int32_t vgId = TD_VID(pVnode); + + handle->pRef = walOpenRef(pVnode->pWal); + if (handle->pRef == NULL) { + return -1; + } + walSetRefVer(handle->pRef, handle->snapshotVer); + + SReadHandle reader = { + .vnode = pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle->snapshotVer, + }; + + initStorageAPI(&reader.api); + + if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + handle->execHandle.task = + qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); + if (handle->execHandle.task == NULL) { + tqError("cannot create exec task for %s", handle->subKey); + return -1; + } + void* scanner = NULL; + qExtractStreamScanner(handle->execHandle.task, &scanner); + if (scanner == NULL) { + tqError("cannot extract stream scanner for %s", handle->subKey); + return -1; + } + handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); + if (handle->execHandle.pTqReader == NULL) { + tqError("cannot extract exec reader for %s", handle->subKey); + return -1; + } + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + + buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, + (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + + if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { + if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; + } + } + buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, + handle->fetchMeta, (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + + SArray* tbUidList = NULL; + int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId); + taosArrayDestroy(tbUidList); + return -1; + } + tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); + taosArrayDestroy(tbUidList); + } + return 0; +} + +static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, handle); + tDecoderClear(&decoder); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + + memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); + handle->consumerId = req->newConsumerId; + handle->epoch = -1; + + handle->execHandle.subType = req->subType; + handle->fetchMeta = req->withMeta; + if(req->subType == TOPIC_SUB_TYPE__COLUMN){ + handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg); + }else if(req->subType == TOPIC_SUB_TYPE__DB){ + handle->execHandle.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + }else if(req->subType == TOPIC_SUB_TYPE__TABLE){ + handle->execHandle.execTb.suid = req->suid; + handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); + } + + handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + int32_t tqMetaRestoreHandle(STQ* pTq) { int code = 0; TBC* pCur = NULL; @@ -281,97 +395,40 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } - int32_t vgId = TD_VID(pTq->pVnode); void* pKey = NULL; int kLen = 0; void* pVal = NULL; int vLen = 0; - SDecoder decoder; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqHandle handle = {0}; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - tDecoderClear(&decoder); - - handle.pRef = walOpenRef(pTq->pVnode->pWal); - if (handle.pRef == NULL) { - code = -1; - goto end; + code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0){ + tqDestroyTqHandle(&handle); + break; } - walSetRefVer(handle.pRef, handle.snapshotVer); - - SReadHandle reader = { - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer - }; - - initStorageAPI(&reader.api); - - if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); - if (handle.execHandle.task == NULL) { - tqError("cannot create exec task for %s", handle.subKey); - code = -1; - goto end; - } - void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.task, &scanner); - if (scanner == NULL) { - tqError("cannot extract stream scanner for %s", handle.subKey); - code = -1; - goto end; - } - handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - if (handle.execHandle.pTqReader == NULL) { - tqError("cannot extract exec reader for %s", handle.subKey); - code = -1; - goto end; - } - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - - buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, - (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; - } - } - buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, - handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - - SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); - taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } -end: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return code; } + +int32_t tqMetaGetHandle(STQ* pTq, const char* key) { + void* pVal = NULL; + int vLen = 0; + + if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) { + return -1; + } + STqHandle handle = {0}; + int code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0){ + tqDestroyTqHandle(&handle); + } + tdbFree(pVal); + return code; +} diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index eb36389f1d..2f1bcfee83 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -81,26 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) { wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } -SWalRef *walRefCommittedVer(SWal *pWal) { - SWalRef *pRef = walOpenRef(pWal); - if (pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } +void walRefCommitVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetCommittedVer(pWal); - - wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); - pRef->refVer = ver; - // bsearch in fileSet - SWalFileInfo tmpInfo; - tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - // pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); - return pRef; + wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6c7c5ddb0d..8906391a9a 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -476,13 +476,13 @@ int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) { return tlen; } -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) { +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver) { int32_t sz; buf = taosDecodeFixedI32(buf, &sz); *pArray = taosArrayInit(sz, sizeof(void*)); for (int32_t i = 0; i < sz; i++) { void* data = taosMemoryCalloc(1, dataSz); - buf = decode(buf, data); + buf = decode(buf, data, sver); taosArrayPush(*pArray, &data); } return (void*)buf; From 827e84b1a5523f40c5f7f092aa34cf58362a85e9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 13 Jun 2023 17:10:23 +0800 Subject: [PATCH 4/5] fix:memory leak --- source/dnode/mnode/impl/inc/mndDef.h | 4 ++-- source/dnode/mnode/impl/src/mndDef.c | 36 ++++++++++++++-------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index af66f8d149..62beee0303 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -570,8 +570,8 @@ typedef struct { SEpSet epSet; } SMqVgEp; -//SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); -//void tDeleteSMqVgEp(SMqVgEp* pVgEp); +SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); +void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 15f0cc9b71..f0fa40cacf 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -183,21 +183,21 @@ void tFreeStreamObj(SStreamObj *pStream) { } } -//SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { -// SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); -// if (pVgEpNew == NULL) return NULL; -// pVgEpNew->vgId = pVgEp->vgId; -//// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); -// pVgEpNew->epSet = pVgEp->epSet; -// return pVgEpNew; -//} +SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { + SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); + if (pVgEpNew == NULL) return NULL; + pVgEpNew->vgId = pVgEp->vgId; +// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); + pVgEpNew->epSet = pVgEp->epSet; + return pVgEpNew; +} -//void tDeleteSMqVgEp(SMqVgEp *pVgEp) { -// if (pVgEp) { -//// taosMemoryFreeClear(pVgEp->qmsg); -// taosMemoryFree(pVgEp); -// } -//} +void tDeleteSMqVgEp(SMqVgEp *pVgEp) { + if (pVgEp) { +// taosMemoryFreeClear(pVgEp->qmsg); + taosMemoryFree(pVgEp); + } +} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; @@ -517,11 +517,11 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp newEp = { .consumerId = pConsumerEp->consumerId, - .vgs = taosArrayDup(pConsumerEp->vgs, NULL), + .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), }; taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } - pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL); + pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); pSubNew->qmsg = taosStrdup(pSub->qmsg); @@ -534,11 +534,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - taosArrayDestroy(pConsumerEp->vgs); + taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroy(pConsumerEp->offsetRows); } taosHashCleanup(pSub->consumerHash); - taosArrayDestroy(pSub->unassignedVgs); + taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); taosMemoryFreeClear(pSub->qmsg); taosArrayDestroy(pSub->offsetRows); } From 7e6c7e767ade93921ddfaca04dd52c34c080cdef Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 13 Jun 2023 19:10:45 +0800 Subject: [PATCH 5/5] fix:Vnode is closed or removed if drop database --- source/client/src/clientSml.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index bea237d09e..efa955ec84 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1756,9 +1756,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, request->code = code; info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; - if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING || - code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT || - code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (NEED_CLIENT_HANDLE_ERROR(code) || code == TSDB_CODE_SDB_OBJ_CREATING || + code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) { if (cnt++ >= 10) { uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); break;