From 575f64197df124e57806b778a4c01fc7cbded9a4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 24 Apr 2022 16:51:32 +0800 Subject: [PATCH] enh(wal): skip read for specific msg --- include/common/tmsg.h | 6 - include/libs/wal/wal.h | 8 +- source/client/src/tmq.c | 3 - source/common/src/tmsg.c | 5 +- source/dnode/mnode/impl/inc/mndDef.h | 2 - source/dnode/mnode/impl/src/mndDef.c | 3 - source/dnode/mnode/impl/src/mndSubscribe.c | 179 +-------------------- source/dnode/mnode/impl/src/mndTopic.c | 2 - source/dnode/vnode/src/inc/tq.h | 1 - source/dnode/vnode/src/tq/tq.c | 36 ++++- source/libs/wal/src/walRead.c | 87 ++++++++++ source/libs/wal/src/walWrite.c | 8 +- 12 files changed, 134 insertions(+), 206 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 757fa8e74b..75a59a7f75 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1969,7 +1969,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; char* qmsg; } SMqRebVgReq; @@ -1984,7 +1983,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeFixedI8(buf, pReq->withTbName); tlen += taosEncodeFixedI8(buf, pReq->withSchema); tlen += taosEncodeFixedI8(buf, pReq->withTag); - tlen += taosEncodeFixedI8(buf, pReq->withTagSchema); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { tlen += taosEncodeString(buf, pReq->qmsg); } @@ -2001,7 +1999,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) buf = taosDecodeFixedI8(buf, &pReq->withTbName); buf = taosDecodeFixedI8(buf, &pReq->withSchema); buf = taosDecodeFixedI8(buf, &pReq->withTag); - buf = taosDecodeFixedI8(buf, &pReq->withTagSchema); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { buf = taosDecodeString(buf, &pReq->qmsg); } @@ -2590,7 +2587,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SArray* blockDataLen; // SArray SArray* blockData; // SArray SArray* blockTbName; // SArray @@ -2609,7 +2605,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withTag); - tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); @@ -2632,7 +2627,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withTag); - buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = 0; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index d41e797536..7b0d70b769 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *); SWalReadHandle *walOpenReadHandle(SWal *); void walCloseReadHandle(SWalReadHandle *); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); -int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead); + +// only for tq usage +// int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead); +void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead); +int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead); +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead); // deprecated #if 0 diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 699fa7ecf5..2ccd88eea8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) return NULL; - /*pTmq->inWaiting = 0;*/ pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - /*pTmq->waitingRequest = 0;*/ - /*pTmq->readyRequest = 0;*/ pTmq->epStatus = 0; pTmq->epSkipCnt = 0; // set conf diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index eb7c9a763f..0d3009fc65 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) { return 0; } - int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pEp->inUse); @@ -2747,11 +2746,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; - if (0 == astLen && tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; tEndEncode(&encoder); @@ -2773,6 +2772,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; @@ -2787,7 +2787,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (pReq->ast == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; } else { - if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 59a00f0eaf..d2bc8c9282 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -449,7 +449,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SRWLatch lock; int32_t sqlLen; int32_t astLen; @@ -516,7 +515,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub // TODO put -1 into unassignVgs // SArray* unassignedVgs; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 78d54d273d..800a013c4f 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->withTbName = pSub->withTbName; pSubNew->withSchema = pSub->withSchema; pSubNew->withTag = pSub->withTag; - pSubNew->withTagSchema = pSub->withTagSchema; pSubNew->vgNum = pSub->vgNum; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { tlen += taosEncodeFixedI8(buf, pSub->withTbName); tlen += taosEncodeFixedI8(buf, pSub->withSchema); tlen += taosEncodeFixedI8(buf, pSub->withTag); - tlen += taosEncodeFixedI8(buf, pSub->withTagSchema); void *pIter = NULL; int32_t sz = taosHashGetSize(pSub->consumerHash); @@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { buf = taosDecodeFixedI8(buf, &pSub->withTbName); buf = taosDecodeFixedI8(buf, &pSub->withSchema); buf = taosDecodeFixedI8(buf, &pSub->withTag); - buf = taosDecodeFixedI8(buf, &pSub->withTagSchema); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e37bd60e12..6a1994d7b8 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -35,11 +35,6 @@ #define MND_SUBSCRIBE_REBALANCE_CNT 3 -enum { - MQ_SUBSCRIBE_STATUS__ACTIVE = 1, - MQ_SUBSCRIBE_STATUS__DELETED, -}; - static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); @@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->withTbName = pTopic->withTbName; pSub->withSchema = pTopic->withSchema; pSub->withTag = pTopic->withTag; - pSub->withTagSchema = pTopic->withTagSchema; ASSERT(taosHashGetSize(pSub->consumerHash) == 1); @@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.withTbName = pSub->withTbName; req.withSchema = pSub->withSchema; req.withTag = pSub->withTag; - req.withTagSchema = pSub->withTagSchema; strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); @@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // TODO replace assert with error check ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); + // if add more consumer to balanced subscribe, // possibly no vg is changed /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ + ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (rebInput.pTopic) { @@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pSdb, pSub); } -#if 0 -static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { - SMnode *pMnode = pMsg->pNode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq subscribe; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); - int64_t consumerId = subscribe.consumerId; - char *cgroup = subscribe.consumerGroup; - - SArray *newSub = subscribe.topicNames; - int32_t newTopicNum = subscribe.topicNum; - - taosArraySortString(newSub, taosArrayCompareString); - - SArray *oldSub = NULL; - int32_t oldTopicNum = 0; - bool createConsumer = false; - // create consumer if not exist - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); - if (pConsumer == NULL) { - // create consumer - pConsumer = mndCreateConsumer(consumerId, cgroup); - createConsumer = true; - } else { - pConsumer->epoch++; - oldSub = pConsumer->currentTopics; - } - pConsumer->currentTopics = newSub; - - if (oldSub != NULL) { - oldTopicNum = taosArrayGetSize(oldSub); - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); - if (pTrans == NULL) { - // TODO: free memory - return -1; - } - - int32_t i = 0, j = 0; - while (i < newTopicNum || j < oldTopicNum) { - char *newTopicName = NULL; - char *oldTopicName = NULL; - if (i >= newTopicNum) { - // encode unset topic msg to all vnodes related to that topic - oldTopicName = taosArrayGetP(oldSub, j); - j++; - } else if (j >= oldTopicNum) { - newTopicName = taosArrayGetP(newSub, i); - i++; - } else { - newTopicName = taosArrayGetP(newSub, i); - oldTopicName = taosArrayGetP(oldSub, j); - - int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName); - if (comp == 0) { - // do nothing - oldTopicName = newTopicName = NULL; - i++; - j++; - continue; - } else if (comp < 0) { - oldTopicName = NULL; - i++; - } else { - newTopicName = NULL; - j++; - } - } - - if (oldTopicName != NULL) { - ASSERT(newTopicName == NULL); - - // cancel subscribe of old topic - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); - ASSERT(pSub); - int32_t csz = taosArrayGetSize(pSub->consumers); - for (int32_t ci = 0; ci < csz; ci++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); - if (pSubConsumer->consumerId == consumerId) { - int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - for (int32_t vgi = 0; vgi < vgsz; vgi++) { - SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); - mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName); - taosArrayPush(pSub->unassignedVg, pConsumerEp); - } - taosArrayRemove(pSub->consumers, ci); - break; - } - } - char *oldTopicNameDup = strdup(oldTopicName); - taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup); - atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); - /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ - } else if (newTopicName != NULL) { - ASSERT(oldTopicName == NULL); - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); - if (pTopic == NULL) { - mError("topic being subscribed not exist: %s", newTopicName); - continue; - } - - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); - bool createSub = false; - if (pSub == NULL) { - mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, - newTopicName); - pSub = mndCreateSubscription(pMnode, pTopic, cgroup); - createSub = true; - - mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg); - } - - SMqSubConsumer mqSubConsumer; - mqSubConsumer.consumerId = consumerId; - mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp)); - taosArrayPush(pSub->consumers, &mqSubConsumer); - - // if have un assigned vg, assign one to the consumer - if (taosArrayGetSize(pSub->unassignedVg) > 0) { - SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); - pConsumerEp->oldConsumerId = pConsumerEp->consumerId; - pConsumerEp->consumerId = consumerId; - taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); - if (pConsumerEp->oldConsumerId == -1) { - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName, - pConsumerEp->consumerId); - mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); - } else { - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName); - } - // to trigger rebalance at once, do not set status active - /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ - } - - SSdbRaw *pRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pRaw); - - if (!createSub) mndReleaseSubscribe(pMnode, pSub); - mndReleaseTopic(pMnode, pTopic); - } - } - - /*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/ - - // persist consumerObj - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pConsumerRaw); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - mndTransDrop(pTrans); - if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} -#endif - static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } - -static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { - SSdb *pSdb = pMnode->pSdb; - sdbCancelFetch(pSdb, pIter); -} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a4b98ba01a..1d9b55f7e5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); - SDB_SET_INT8(pRaw, dataPos, pTopic->withTagSchema, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); @@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); - SDB_GET_INT8(pRaw, dataPos, &pTopic->withTagSchema, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6e004a89fc..347d28f1ea 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,6 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - int8_t withTagSchema; char* qmsg; STqPushHandle pushHandle; // SRWLatch lock; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0b3feb5010..434c5bc5bb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; + #if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); @@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch); } + SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); + if (pHeadWithCkSum == NULL) { + return -1; + } + + walSetReaderCapacity(pExec->pWalReader, 2048); + SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.blockData = taosArrayInit(0, sizeof(void*)); @@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { break; } + taosThreadMutexLock(&pExec->pWalReader->mutex); + + if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) { + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), fetchOffset); + taosThreadMutexUnlock(&pExec->pWalReader->mutex); + break; + } + + if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) { + walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum); + } else { + walFetchBody(pExec->pWalReader, &pHeadWithCkSum); + } + + SWalReadHead* pHead = &pHeadWithCkSum->head; + + taosThreadMutexUnlock(&pExec->pWalReader->mutex); + +#if 0 SWalReadHead* pHead; if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time @@ -443,14 +471,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; #endif - break; - } + break; + } +#endif vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + // table subscribe if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); @@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockData, &buf); rsp.blockNum++; } + // db subscribe } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); @@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pExec->withTbName = req.withTbName; pExec->withSchema = req.withSchema; pExec->withTag = req.withTag; - pExec->withTagSchema = req.withTagSchema; pExec->qmsg = req.qmsg; req.qmsg = NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6d598699c5..70a3559cd9 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { return 0; } +void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } + +int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { + int32_t code; + // TODO: valid ver + + if (pRead->curVersion != ver) { + code = walReadSeekVer(pRead, ver); + if (code < 0) return -1; + } + + if (!taosValidFile(pRead->pReadLogTFile)) { + return -1; + } + + code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); + if (code != sizeof(SWalHead)) { + return -1; + } + + code = walValidHeadCksum(pHead); + + if (code != 0) { + wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { + int32_t code; + + ASSERT(pRead->curVersion == pHead->head.version); + + code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR); + if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + pRead->curVersion = -1; + return -1; + } + + pRead->curVersion++; + + return 0; +} + +int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { + SWalReadHead *pReadHead = &((*ppHead)->head); + int64_t ver = pReadHead->version; + + if (pRead->capacity < pReadHead->bodyLen) { + void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen); + if (ptr == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + return -1; + } + *ppHead = ptr; + pRead->capacity = pReadHead->bodyLen; + } + + if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) { + return -1; + } + + if (pReadHead->version != ver) { + wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, + ver); + pRead->curVersion = -1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + if (walValidBodyCksum(*ppHead) != 0) { + wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver); + pRead->curVersion = -1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + + pRead->curVersion = ver + 1; + return 0; +} + int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) { taosThreadMutexLock(&pRead->mutex); if (walReadWithHandle(pRead, ver) < 0) { @@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { if (code != sizeof(SWalHead)) { return -1; } + code = walValidHeadCksum(pRead->pHead); if (code != 0) { wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } + if (pRead->capacity < pRead->pHead->head.bodyLen) { void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); if (ptr == NULL) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 530930c261..207b7d8421 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE - #include "os.h" #include "taoserror.h" #include "tchecksum.h" @@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - // sync info + // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { - // ftruncate + // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog } if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { - // ftruncate + // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));