From d6fca036e518c9102113f6988b16047a0b2f3e2d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Feb 2023 11:02:42 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/ttime.h | 8 +-- source/common/src/ttime.c | 10 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 59 ++++++++++++++--------- source/dnode/vnode/src/tq/tq.c | 29 ++++++----- source/dnode/vnode/src/tq/tqMeta.c | 4 +- source/dnode/vnode/src/tq/tqPush.c | 9 ++-- 6 files changed, 68 insertions(+), 51 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index eaf44c2771..4a7c47d172 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); +int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); char getPrecisionUnit(int32_t precision); -int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); -int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); +int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision); +int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); -void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision); +void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision); #ifdef __cplusplus } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 559ffd2aaf..7996498d45 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; -void deltaToUtcInitOnce() { - struct tm tm = {0}; - (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); - m_deltaUtc = (int64_t)taosMktime(&tm); - // printf("====delta:%lld\n\n", seconds); +void deltaToUtcInitOnce() { + struct tm tm = {0}; + (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); + m_deltaUtc = (int64_t)taosMktime(&tm); + // printf("====delta:%lld\n\n", seconds); } static int64_t parseFraction(char* str, char** end, int32_t timePrec); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7a1aed903b..e7d75312e4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -557,6 +557,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } +static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + + for (int32_t i = 0; i < numOfTopics; i++) { + char *pOneTopic = taosArrayGetP(pTopicList, i); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + if (pTopic == NULL) { // terrno has been set by callee function + return -1; + } + + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + mndReleaseTopic(pMnode, pTopic); + } + + return 0; +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -570,11 +591,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; int32_t code = -1; - SArray *newSub = subscribe.topicNames; - taosArraySort(newSub, taosArrayCompareString); - taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); + SArray *pTopicList = subscribe.topicNames; + taosArraySort(pTopicList, taosArrayCompareString); + taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree); - int32_t newTopicNum = taosArrayGetSize(newSub); + int32_t newTopicNum = taosArrayGetSize(pTopicList); // check topic existence STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); @@ -582,34 +603,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - for (int32_t i = 0; i < newTopicNum; i++) { - char *topic = taosArrayGetP(newSub, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { // terrno has been set by callee function - goto _over; - } - - if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - goto _over; - } - - mndReleaseTopic(pMnode, pTopic); + code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user); + if (code != TSDB_CODE_SUCCESS) { + goto _over; } pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { - mInfo("receive subscribe request from new consumer:%" PRId64, consumerId); + mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = newSub; + pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. subscribe.topicNames = NULL; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } @@ -621,7 +632,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumerOld->status); - mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d", + mInfo("receive subscribe request from existing consumer:0x%" PRIx64 ", current status: %s, subscribe topic num: %d", consumerId, mndConsumerStatusName(status), newTopicNum); if (status != MQ_CONSUMER_STATUS__READY) { @@ -637,7 +648,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } @@ -649,7 +660,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { if (i >= oldTopicNum) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, j)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, j)); taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); j++; continue; @@ -660,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { continue; } else { char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i); - char *newTopic = taosArrayGetP(newSub, j); + char *newTopic = taosArrayGetP(pTopicList, j); int comp = compareLenPrefixedStr(oldTopic, newTopic); if (comp == 0) { i++; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b311babb0..f31cf97cf9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -334,7 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -495,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // update epoch if need - int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); - while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + while (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); } char buf[80]; tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); // 2.reset offset if needed @@ -538,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; @@ -573,6 +574,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 + // till now, all data has been rsp to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); @@ -585,8 +587,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode)); + + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr", + consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode)); // unlock taosWUnLockLatch(&pTq->pushLock); return 0; @@ -599,7 +602,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -658,11 +661,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { walSetReaderCapacity(pHandle->pWalReader, 2048); while (1) { - consumerEpoch = atomic_load_32(&pHandle->epoch); - if (consumerEpoch > reqEpoch) { + savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > reqEpoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", - consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch); break; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 34f57bc697..095251ab73 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -195,8 +195,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t vlen; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), - pHandle->consumerId, TD_VID(pTq->pVnode)); + tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, + (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index b9df3e5826..7a356238a0 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -199,7 +199,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); - tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, + tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -213,13 +213,14 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); + tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); - tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); if (taosHashGetSize(pTq->pPushMgr) != 0) { + + tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); void* data = taosMemoryMalloc(msgLen); @@ -245,11 +246,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } + if (pPushEntry->dataRsp.reqOffset.version >= ver) { tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); continue; } + STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task;