diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1aa08ff802..47230bc95c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); void walRefFirstVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *); -SWalRef *walRefCommittedVer(SWal *); +void walRefCommitVer(SWal *, SWalRef *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); diff --git a/include/util/talgo.h b/include/util/talgo.h index f9d51c4b5b..7c92c0fe87 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -31,7 +31,7 @@ typedef void *(*__array_item_dup_fn_t)(void *); typedef void (*FDelete)(void *); typedef int32_t (*FEncode)(void **buf, const void *dst); -typedef void *(*FDecode)(const void *buf, void *dst); +typedef void *(*FDecode)(const void *buf, void *dst, int8_t sver); #define TD_EQ 0x1 #define TD_GT 0x2 diff --git a/include/util/tarray.h b/include/util/tarray.h index 12d0bcdcda..9638a2a035 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -244,7 +244,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param); int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode); -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz); +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver); #ifdef __cplusplus } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index bea237d09e..efa955ec84 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1756,9 +1756,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, request->code = code; info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; - if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING || - code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT || - code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (NEED_CLIENT_HANDLE_ERROR(code) || code == TSDB_CODE_SDB_OBJ_CREATING || + code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) { if (cnt++ >= 10) { uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); break; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9ee3f11c37..62beee0303 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -566,14 +566,14 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - char* qmsg; // SubPlanToString +// char* qmsg; // SubPlanToString SEpSet epSet; } SMqVgEp; SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); -void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); +void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver); typedef struct { int64_t consumerId; // -1 for unassigned @@ -598,6 +598,7 @@ typedef struct { SArray* unassignedVgs; // SArray SArray* offsetRows; char dbName[TSDB_DB_FNAME_LEN]; + char* qmsg; // SubPlanToString } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); @@ -696,12 +697,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -typedef struct { - char streamName[TSDB_STREAM_FNAME_LEN]; - int64_t uid; - int64_t streamUid; - SArray* childInfo; // SArray -} SStreamCheckpointObj; +//typedef struct { +// char streamName[TSDB_STREAM_FNAME_LEN]; +// int64_t uid; +// int64_t streamUid; +// SArray* childInfo; // SArray +//} SStreamCheckpointObj; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index b5254dbb88..c8ee35f402 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -665,7 +665,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); - uint64_t consumerId = subscribe.consumerId; + int64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 7f79408e8c..f0fa40cacf 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -187,14 +187,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; pVgEpNew->vgId = pVgEp->vgId; - pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); +// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); pVgEpNew->epSet = pVgEp->epSet; return pVgEpNew; } void tDeleteSMqVgEp(SMqVgEp *pVgEp) { if (pVgEp) { - taosMemoryFreeClear(pVgEp->qmsg); +// taosMemoryFreeClear(pVgEp->qmsg); taosMemoryFree(pVgEp); } } @@ -202,14 +202,18 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeString(buf, pVgEp->qmsg); +// tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } -void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { +void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeString(buf, &pVgEp->qmsg); + if(sver == 1){ + uint64_t size = 0; + buf = taosDecodeVariantU64(buf, &size); + buf = POINTER_SHIFT(buf, size); + } buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } @@ -387,7 +391,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); } - return (void *)buf; } @@ -395,13 +398,13 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s // SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); // if (pConsumerEpNew == NULL) return NULL; // pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; -// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp); +// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); // return pConsumerEpNew; //} // //void tDeleteSMqConsumerEp(void *data) { // SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; -// taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); +// taosArrayDestroy(pConsumerEp->vgs); //} int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { @@ -437,7 +440,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); if (sver > 1){ int32_t szVgs = 0; buf = taosDecodeFixedI32(buf, &szVgs); @@ -521,6 +524,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); + pSubNew->qmsg = taosStrdup(pSub->qmsg); return pSubNew; } @@ -535,6 +539,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { } taosHashCleanup(pSub->consumerHash); taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); + taosMemoryFreeClear(pSub->qmsg); taosArrayDestroy(pSub->offsetRows); } @@ -579,6 +584,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { // do nothing } } + tlen += taosEncodeString(buf, pSub->qmsg); return tlen; } @@ -601,7 +607,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)); } - buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeStringTo(buf, pSub->dbName); if (sver > 1){ @@ -625,6 +631,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { } } } + buf = taosDecodeString(buf, &pSub->qmsg); } return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 64082536da..9a611fe46a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -570,25 +570,21 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); - if (pSubplan) { - int32_t msgLen; - - pSubplan->execNode.epSet = pVgEp->epSet; - pSubplan->execNode.nodeId = pVgEp->vgId; - - if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - } else { - pVgEp->qmsg = taosStrdup(""); - } - sdbRelease(pSdb, pVgroup); } + if (pSubplan) { + int32_t msgLen; + + if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) { + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + } else { + pSub->qmsg = taosStrdup(""); + } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7c5ba6eae5..61691a30d5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; - req.qmsg = pRebVg->pVgEp->qmsg; + if(pPlan){ + pPlan->execNode.epSet = pRebVg->pVgEp->epSet; + pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; + int32_t msgLen; + if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + }else{ + req.qmsg = taosStrdup(""); + } req.subType = pSub->subType; req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; @@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri int32_t ret = 0; tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); if (ret < 0) { + taosMemoryFree(req.qmsg); return -1; } @@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(req.qmsg); return -1; } @@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { taosMemoryFreeClear(buf); tEncoderClear(&encoder); + taosMemoryFree(req.qmsg); return -1; } tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; + taosMemoryFree(req.qmsg); return 0; } -static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // return -1; @@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { + if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { return -1; } @@ -519,14 +533,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { + struct SSubplan* pPlan = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ + int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { + nodesDestroyNode((SNode*)pPlan); return -1; } mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } @@ -536,11 +561,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { + if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } } + nodesDestroyNode((SNode*)pPlan); // 2. redo log: subscribe and vg assignment // subscribe diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 4ba8d6d69f..b35dc71ed9 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -139,6 +139,7 @@ static STqMgmt tqMgmt = {0}; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); +void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); @@ -161,6 +162,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); +int32_t tqMetaGetHandle(STQ* pTq, const char* key); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index edefd3ba90..f4d3407aa2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -62,7 +62,7 @@ void tqCleanUp() { } } -static void destroyTqHandle(void* data) { +void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); @@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); + taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle); taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); @@ -661,13 +661,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return -1; } - SVnode* pVnode = pTq->pVnode; - int32_t vgId = TD_VID(pVnode); - - tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, + tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = NULL; + while(1){ + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){ + break; + } + } + if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -678,86 +682,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); goto end; } - - STqHandle tqHandle = {0}; - pHandle = &tqHandle; - - memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); - pHandle->consumerId = req.newConsumerId; - pHandle->epoch = -1; - - pHandle->execHandle.subType = req.subType; - pHandle->fetchMeta = req.withMeta; - - // TODO version should be assigned and refed during preprocess - SWalRef* pRef = walRefCommittedVer(pVnode->pWal); - if (pRef == NULL) { - ret = -1; + STqHandle handle = {0}; + ret = tqCreateHandle(pTq, &req, &handle); + if(ret < 0){ + tqDestroyTqHandle(&handle); goto end; } - - int64_t ver = pRef->refVer; - pHandle->pRef = pRef; - - SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; - initStorageAPI(&handle.api); - - pHandle->snapshotVer = ver; - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, - &pHandle->execHandle.numOfCols, req.newConsumerId); - void* scanner = NULL; - qExtractStreamScanner(pHandle->execHandle.task, &scanner); - pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - - pHandle->execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - - if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), - pVnode->config.vgId, req.subKey, pHandle->consumerId); - return -1; - } - } - - buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - - SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if (ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, - pHandle->consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, - pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - - taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - goto end; + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { taosWLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index ba6d7cb501..3b0e6749c2 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,9 +88,9 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } - if (tqMetaRestoreHandle(pTq) < 0) { - return -1; - } +// if (tqMetaRestoreHandle(pTq) < 0) { +// return -1; +// } if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; @@ -274,6 +274,120 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { return 0; } +static int buildHandle(STQ* pTq, STqHandle* handle){ + SVnode* pVnode = pTq->pVnode; + int32_t vgId = TD_VID(pVnode); + + handle->pRef = walOpenRef(pVnode->pWal); + if (handle->pRef == NULL) { + return -1; + } + walSetRefVer(handle->pRef, handle->snapshotVer); + + SReadHandle reader = { + .vnode = pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle->snapshotVer, + }; + + initStorageAPI(&reader.api); + + if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + handle->execHandle.task = + qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); + if (handle->execHandle.task == NULL) { + tqError("cannot create exec task for %s", handle->subKey); + return -1; + } + void* scanner = NULL; + qExtractStreamScanner(handle->execHandle.task, &scanner); + if (scanner == NULL) { + tqError("cannot extract stream scanner for %s", handle->subKey); + return -1; + } + handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); + if (handle->execHandle.pTqReader == NULL) { + tqError("cannot extract exec reader for %s", handle->subKey); + return -1; + } + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + + buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, + (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + + if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { + if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; + } + } + buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, + handle->fetchMeta, (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + + SArray* tbUidList = NULL; + int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId); + taosArrayDestroy(tbUidList); + return -1; + } + tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); + taosArrayDestroy(tbUidList); + } + return 0; +} + +static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, handle); + tDecoderClear(&decoder); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + + memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); + handle->consumerId = req->newConsumerId; + handle->epoch = -1; + + handle->execHandle.subType = req->subType; + handle->fetchMeta = req->withMeta; + if(req->subType == TOPIC_SUB_TYPE__COLUMN){ + handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg); + }else if(req->subType == TOPIC_SUB_TYPE__DB){ + handle->execHandle.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + }else if(req->subType == TOPIC_SUB_TYPE__TABLE){ + handle->execHandle.execTb.suid = req->suid; + handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); + } + + handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + int32_t tqMetaRestoreHandle(STQ* pTq) { int code = 0; TBC* pCur = NULL; @@ -281,97 +395,40 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } - int32_t vgId = TD_VID(pTq->pVnode); void* pKey = NULL; int kLen = 0; void* pVal = NULL; int vLen = 0; - SDecoder decoder; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqHandle handle = {0}; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - tDecoderClear(&decoder); - - handle.pRef = walOpenRef(pTq->pVnode->pWal); - if (handle.pRef == NULL) { - code = -1; - goto end; + code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0){ + tqDestroyTqHandle(&handle); + break; } - walSetRefVer(handle.pRef, handle.snapshotVer); - - SReadHandle reader = { - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer - }; - - initStorageAPI(&reader.api); - - if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); - if (handle.execHandle.task == NULL) { - tqError("cannot create exec task for %s", handle.subKey); - code = -1; - goto end; - } - void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.task, &scanner); - if (scanner == NULL) { - tqError("cannot extract stream scanner for %s", handle.subKey); - code = -1; - goto end; - } - handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - if (handle.execHandle.pTqReader == NULL) { - tqError("cannot extract exec reader for %s", handle.subKey); - code = -1; - goto end; - } - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - - buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, - (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; - } - } - buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, - handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - - SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); - taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } -end: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return code; } + +int32_t tqMetaGetHandle(STQ* pTq, const char* key) { + void* pVal = NULL; + int vLen = 0; + + if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) { + return -1; + } + STqHandle handle = {0}; + int code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0){ + tqDestroyTqHandle(&handle); + } + tdbFree(pVal); + return code; +} diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index eb36389f1d..2f1bcfee83 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -81,26 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) { wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } -SWalRef *walRefCommittedVer(SWal *pWal) { - SWalRef *pRef = walOpenRef(pWal); - if (pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } +void walRefCommitVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetCommittedVer(pWal); - - wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); - pRef->refVer = ver; - // bsearch in fileSet - SWalFileInfo tmpInfo; - tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - // pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); - return pRef; + wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 3cbd440278..f5e15e7436 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -476,13 +476,13 @@ int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) { return tlen; } -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) { +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver) { int32_t sz; buf = taosDecodeFixedI32(buf, &sz); *pArray = taosArrayInit(sz, sizeof(void*)); for (int32_t i = 0; i < sz; i++) { void* data = taosMemoryCalloc(1, dataSz); - buf = decode(buf, data); + buf = decode(buf, data, sver); taosArrayPush(*pArray, &data); } return (void*)buf;