From 99c3ebc2825c755224b9c2d11e66e900763de131 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 20 Jul 2023 18:28:21 +0800 Subject: [PATCH 01/10] fix:optimize log & change return value for async interface --- include/client/taos.h | 2 +- include/common/tmsgdef.h | 2 +- source/client/src/clientTmq.c | 49 +++++++++++++++++++++-------------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 5ea1510e44..3cc2d907ab 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); -DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index aa23442291..7d12c2a1d6 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,7 +312,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3576df434b..96ec88e2e9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); - - return TSDB_CODE_SUCCESS; + return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); } static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { @@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); return NULL; } @@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us end: taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); + if(pParamSet->callbackFn != NULL) { + pCommitFp(tmq, code, userParam); + } return; } @@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { if (tmq == NULL) { tscError("invalid tmq handle, null"); + if(cb != NULL) { + cb(tmq, TSDB_CODE_INVALID_PARA, param); + } return; } if (pRes == NULL) { // here needs to commit all offsets. @@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); return code; } -int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ +void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ + int32_t code = 0; if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto end; } int32_t accId = tmq->pTscObj->acctId; @@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; - int32_t code = getClientVg(tmq, tname, vgId, &pVg); + code = getClientVg(tmq, tname, vgId, &pVg); if(code != 0){ taosWUnLockLatch(&tmq->lock); - return code; + goto end; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; code = checkWalRange(pOffsetInfo, offset); if (code != 0) { taosWUnLockLatch(&tmq->lock); - return code; + goto end; } taosWUnLockLatch(&tmq->lock); @@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); - return code; +end: + if(code != 0 && cb != NULL){ + cb(tmq, code, param); + } } void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { @@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } + tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position); return position; } @@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){ if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){ committed = pOffsetInfo->committedOffset.version; taosWUnLockLatch(&tmq->lock); - return committed; + goto end; } SEpSet epSet = pVg->epSet; taosWUnLockLatch(&tmq->lock); - return getCommittedFromServer(tmq, tname, vgId, &epSet); + committed = getCommittedFromServer(tmq, tname, vgId, &epSet); + +end: + tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); + return committed; } int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, @@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - code = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_TMQ_INVALID_TOPIC; goto end; } @@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; @@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { return 0; } +// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); @@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, - tmq->consumerId, tname, vgId, tmq->epoch); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pParam->sem); From bb86f5c580a8a7d6eafec36185232a823358162f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Jul 2023 11:13:05 +0800 Subject: [PATCH 02/10] fix:commit offset should not plus 1 --- source/dnode/vnode/src/tq/tq.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 03d6932578..8c9eead414 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -307,9 +307,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } else if (pOffset->val.type == TMQ_OFFSET__LOG) { tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, pOffset->val.version); - if (pOffset->val.version + 1 == sversion) { - pOffset->val.version += 1; - } } else { tqError("invalid commit offset type:%d", pOffset->val.type); return -1; From 95a1db6e89a77fc3c152567bd4d9167d90256e0d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jul 2023 07:09:03 +0000 Subject: [PATCH 03/10] fix err while connect invalid fqdn --- source/client/src/clientImpl.c | 23 +++++++++++++++++++---- source/client/src/clientMsgHandler.c | 15 +++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 14d6394fc4..f5dc627dd8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1297,13 +1297,20 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe return -1; } - int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]); + int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return terrno; } - - mgmtEpSet->numOfEps++; + uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn); + if (addr == 0xffffffff) { + int32_t code = TAOS_SYSTEM_ERROR(errno); + tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, + tstrerror(errno)); + memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps])); + } else { + mgmtEpSet->numOfEps++; + } } if (secondEp && secondEp[0] != 0) { @@ -1313,7 +1320,15 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe } taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); - mgmtEpSet->numOfEps++; + uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn); + if (addr == 0xffffffff) { + int32_t code = TAOS_SYSTEM_ERROR(errno); + tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, + tstrerror(errno)); + memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps])); + } else { + mgmtEpSet->numOfEps++; + } } if (mgmtEpSet->numOfEps == 0) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 9ab618cf3a..7455a2c1c8 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -99,13 +99,20 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { goto End; } + int updateEpSet = 1; if (connectRsp.dnodeNum == 1) { SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet dstEpSet = connectRsp.epSet; - rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, - dstEpSet.eps[dstEpSet.inUse].fqdn); - } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { - SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; + if (srcEpSet.numOfEps == 1) { + rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, + dstEpSet.eps[dstEpSet.inUse].fqdn); + updateEpSet = 0; + } + } + if (updateEpSet == 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + SEpSet* pOrig = &corEpSet; SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse]; tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps, From 9628a9f74ebb40d4808b56d5545c1967da859006 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Jul 2023 15:53:02 +0800 Subject: [PATCH 04/10] fix:open info log in tmq & ignore wal apply ver when read wal --- source/client/src/clientTmq.c | 2 +- source/dnode/mnode/impl/src/mndConsumer.c | 16 ++++++++-------- source/dnode/mnode/impl/src/mndSubscribe.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/wal/src/walRead.c | 21 +++++++++++---------- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 96ec88e2e9..fa2e250b2b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -712,7 +712,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us end: taosMemoryFree(pParamSet); - if(pParamSet->callbackFn != NULL) { + if(pCommitFp != NULL) { pCommitFp(tmq, code, userParam); } return; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2b538eccc9..50115db81c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mDebug("tq timer, rebalance counter old val:%d", old); + mInfo("tq timer, rebalance counter old val:%d", old); return old == 0; } @@ -116,7 +116,7 @@ void mndRebCntDec() { int32_t newVal = val - 1; int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); if (oldVal == val) { - mDebug("rebalance trans end, rebalance counter:%d", newVal); + mInfo("rebalance trans end, rebalance counter:%d", newVal); break; } } @@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { // rebalance cannot be parallel if (!mndRebTryStart()) { - mDebug("mq rebalance already in progress, do nothing"); + mInfo("mq rebalance already in progress, do nothing"); return 0; } @@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); @@ -362,7 +362,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { - mInfo("mq rebalance will be triggered"); + mInfo("mq rebalance will be triggered"); SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_DO_REBALANCE, .pCont = pRebMsg, @@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { for(int i = 0; i < taosArrayGetSize(req.topics); i++){ TopicOffsetRows* data = taosArrayGet(req.topics, i); - mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); + mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ @@ -1104,13 +1104,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * } if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { - mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId); + mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId); sdbRelease(pSdb, pConsumer); continue; } taosRLockLatch(&pConsumer->lock); - mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId); + mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); bool hasTopic = true; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f51a61eda3..6bd23c3b90 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock int32_t numOfRows = 0; SMqSubscribeObj *pSub = NULL; - mDebug("mnd show subscriptions begin"); + mInfo("mnd show subscriptions begin"); while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); @@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock sdbRelease(pSdb, pSub); } - mDebug("mnd end show subscriptions"); + mInfo("mnd end show subscriptions"); pShow->numOfRows += numOfRows; return numOfRows; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8c9eead414..89ed3ca1c7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); + tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; taosWLockLatch(&pTq->lock); @@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return -1; } - tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, + tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index a839d6cbd8..7ff7fe748e 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t fetchVer = pReader->curVersion; int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); - int64_t appliedVer = walGetAppliedVer(pReader->pWal); +// int64_t appliedVer = walGetAppliedVer(pReader->pWal); - if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); - } +// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] +// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// } - int64_t endVer = TMIN(appliedVer, committedVer); +// int64_t endVer = TMIN(appliedVer, committedVer); + int64_t endVer = committedVer; wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64", end index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + ", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer); if (fetchVer > endVer){ terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; @@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { pRead->pWal->vers.appliedVer); // TODO: valid ver - if (ver > pRead->pWal->vers.appliedVer) { - return -1; - } +// if (ver > pRead->pWal->vers.appliedVer) { +// return -1; +// } if (pRead->curVersion != ver) { code = walReaderSeekVer(pRead, ver); From 9e3421e63e08c97f4f0440e9e9e5f50a4a7385ed Mon Sep 17 00:00:00 2001 From: sunpeng Date: Mon, 24 Jul 2023 17:03:57 +0800 Subject: [PATCH 05/10] docs: fix python connector docs --- docs/en/14-reference/03-connector/07-python.mdx | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index f0a59842fe..831e79eeb7 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -1007,13 +1007,12 @@ consumer.close() ### Other sample programs | Example program links | Example program content | -| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- | -| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, -bind multiple rows at once | -| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py +|-----------------------|-------------------------| +| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once | +| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | parameter binding, bind one row at once | | [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing | | [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags | -| [tmq.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq.py) | TMQ subscription | +| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | TMQ subscription | ## Other notes From 101a3c23b1ab1df9d7023e1d77c3963d7dfd3f48 Mon Sep 17 00:00:00 2001 From: danielclow <106956386+danielclow@users.noreply.github.com> Date: Mon, 24 Jul 2023 17:50:28 +0800 Subject: [PATCH 06/10] docs: fix links in rust connector doc (#22168) --- docs/en/14-reference/03-connector/06-rust.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index 986b5cd104..56f5e20cb4 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -648,12 +648,12 @@ stmt.execute()?; //stmt.execute()?; ``` -For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs). +For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/taos/examples/bind.rs). For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos). -[taos]: https://github.com/taosdata/rust-connector-taos +[taos]: https://github.com/taosdata/taos-connector-rust [r2d2]: https://crates.io/crates/r2d2 [TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html [TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html From 0b707a108e1f989290caeb558f4fa3867cb0c452 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jul 2023 10:28:52 +0000 Subject: [PATCH 07/10] fix err while connect invalid fqdn --- source/client/src/clientImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f5dc627dd8..6b89739547 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -106,7 +106,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas SCorEpSet epSet = {0}; if (ip) { - if (initEpSetFromCfg(ip, NULL, &epSet) < 0) { + if (initEpSetFromCfg(ip, tsSecond, &epSet) < 0) { return NULL; } } else { From 842a0bc8c3d87f85a1ad03449d5aa95a6751d0f0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jul 2023 18:48:09 +0800 Subject: [PATCH 08/10] fix err while connect invalid fqdn --- source/client/src/clientImpl.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f5dc627dd8..d448dd1edf 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1304,9 +1304,8 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe } uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn); if (addr == 0xffffffff) { - int32_t code = TAOS_SYSTEM_ERROR(errno); tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, - tstrerror(errno)); + tstrerror(TSDB_CODE_TSC_INVALID_FQDN)); memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps])); } else { mgmtEpSet->numOfEps++; @@ -1322,9 +1321,8 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn); if (addr == 0xffffffff) { - int32_t code = TAOS_SYSTEM_ERROR(errno); tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, - tstrerror(errno)); + tstrerror(TSDB_CODE_TSC_INVALID_FQDN)); memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps])); } else { mgmtEpSet->numOfEps++; @@ -1332,8 +1330,8 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe } if (mgmtEpSet->numOfEps == 0) { - terrno = TSDB_CODE_TSC_INVALID_FQDN; - return -1; + terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; + return TSDB_CODE_RPC_NETWORK_UNAVAIL; } return 0; From 6532afa7b84febf668080db1b458691aa49c942d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jul 2023 18:49:13 +0800 Subject: [PATCH 09/10] fix err while connect invalid fqdn --- source/client/src/clientImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index fdce502078..d448dd1edf 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -106,7 +106,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas SCorEpSet epSet = {0}; if (ip) { - if (initEpSetFromCfg(ip, tsSecond, &epSet) < 0) { + if (initEpSetFromCfg(ip, NULL, &epSet) < 0) { return NULL; } } else { From 29460db6f5cfdb640e8447a1f263d970277d0d29 Mon Sep 17 00:00:00 2001 From: danielclow <106956386+danielclow@users.noreply.github.com> Date: Tue, 25 Jul 2023 00:11:09 +0800 Subject: [PATCH 10/10] docs: change to new product names (#22175) * change to new product names * change product names --- docs/en/05-get-started/03-package.md | 2 +- docs/en/12-taos-sql/02-database.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/05-get-started/03-package.md b/docs/en/05-get-started/03-package.md index 91bf94034c..3e3c04682f 100644 --- a/docs/en/05-get-started/03-package.md +++ b/docs/en/05-get-started/03-package.md @@ -18,7 +18,7 @@ The full package of TDengine includes the TDengine Server (`taosd`), TDengine Cl The standard server installation package includes `taos`, `taosd`, `taosAdapter`, `taosBenchmark`, and sample code. You can also download the Lite package that includes only `taosd` and the C/C++ connector. -The TDengine Community Edition is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine over `apt-get` as well. The .tar.tz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/m1 macOS. +TDengine OSS is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine over `apt-get` as well. The .tar.tz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/m1 macOS. ## Operating environment requirements In the Linux system, the minimum requirements for the operating environment are as follows: diff --git a/docs/en/12-taos-sql/02-database.md b/docs/en/12-taos-sql/02-database.md index 68dba3fc56..e783d61497 100644 --- a/docs/en/12-taos-sql/02-database.md +++ b/docs/en/12-taos-sql/02-database.md @@ -58,7 +58,7 @@ database_option: { - WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk. - MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096. - MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100. -- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. The Enterprise Edition supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d) are supported; the Community Edition does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP). +- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. TDengine Enterprise supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d) are supported; TDengine OSS does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP). - PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB. - PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384. - PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.