Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-3724

wangmm0220 2023-07-25 09:19:03 +08:00
commit b6667732bf
13 changed files with 93 additions and 62 deletions

View File

@ -18,7 +18,7 @@ The full package of TDengine includes the TDengine Server (`taosd`), TDengine Cl
The standard server installation package includes `taos`, `taosd`, `taosAdapter`, `taosBenchmark`, and sample code. You can also download the Lite package that includes only `taosd` and the C/C++ connector.
The TDengine Community Edition is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine via `apt-get` as well. The .tar.gz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/m1 macOS.
TDengine OSS is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine via `apt-get` as well. The .tar.gz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/m1 macOS.
## Operating environment requirements
On Linux, the minimum requirements for the operating environment are as follows:

View File

@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk.
- MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096.
- MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. The Enterprise Edition supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage), so multiple KEEP values are supported (comma separated, up to 3 values, satisfying keep0 <= keep1 <= keep2, e.g. KEEP 100h,100d,3650d); the Community Edition does not support Tiered Storage, so even if multiple KEEP values are configured they do not take effect and only the maximum value is used as KEEP.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. TDengine Enterprise supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage), so multiple KEEP values are supported (comma separated, up to 3 values, satisfying keep0 <= keep1 <= keep2, e.g. KEEP 100h,100d,3650d); TDengine OSS does not support Tiered Storage, so even if multiple KEEP values are configured they do not take effect and only the maximum value is used as KEEP.
- PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB.
- PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384.
- PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.
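These options are ordinary clauses of the CREATE DATABASE statement, so they can be set from any connector. As a point of reference, here is a minimal sketch using the C client; the endpoint, credentials, database name `power`, and the option values themselves are placeholders, not recommendations.

```c
#include <stdio.h>
#include <stdlib.h>
#include "taos.h"

int main(void) {
  // Placeholder connection parameters; adjust to your deployment.
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (conn == NULL) {
    fprintf(stderr, "failed to connect: %s\n", taos_errstr(NULL));
    return EXIT_FAILURE;
  }

  // Illustrative values only: millisecond precision, fsync every 3 s,
  // and a single ~10-year retention period (no tiered storage).
  const char *sql =
      "CREATE DATABASE IF NOT EXISTS power "
      "PRECISION 'ms' WAL_FSYNC_PERIOD 3000 KEEP 3650 PAGES 256 PAGESIZE 4";

  TAOS_RES *res = taos_query(conn, sql);
  if (taos_errno(res) != 0) {
    fprintf(stderr, "create database failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);

  taos_close(conn);
  taos_cleanup();
  return 0;
}
```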

View File

@ -648,12 +648,12 @@ stmt.execute()?;
//stmt.execute()?;
```
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/taos/examples/bind.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
[taos]: https://github.com/taosdata/rust-connector-taos
[taos]: https://github.com/taosdata/taos-connector-rust
[r2d2]: https://crates.io/crates/r2d2
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html

View File

@ -1007,13 +1007,12 @@ consumer.close()
### Other sample programs
| Example program links | Example program content |
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
|-----------------------|-------------------------|
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | parameter binding, bind one row at once |
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
| [tmq.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq.py) | TMQ subscription |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | TMQ subscription |
## Other notes

View File

@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
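The commit-offset pair above is the API affected by this change: `tmq_commit_offset_async` now returns `void`, so its outcome is reported only through the `tmq_commit_cb` callback, while the sync variant still returns a code. Below is a minimal sketch of the two call styles, assuming an already-subscribed consumer; the helper name `commit_one_offset` and the callback are illustrative only.

```c
#include <stdint.h>
#include <stdio.h>
#include "taos.h"

// Matches the tmq_commit_cb signature; the commit result arrives here.
static void on_commit(tmq_t *tmq, int32_t code, void *param) {
  (void)tmq;
  printf("commit of %s finished: %s\n", (const char *)param, tmq_err2str(code));
}

// Illustrative helper: commit one offset of one vgroup, both ways.
static void commit_one_offset(tmq_t *consumer, const char *topic, int32_t vgId, int64_t offset) {
  // Synchronous form: the result is the return code.
  int32_t code = tmq_commit_offset_sync(consumer, topic, vgId, offset);
  if (code != 0) {
    fprintf(stderr, "sync commit failed: %s\n", tmq_err2str(code));
  }

  // Asynchronous form: returns void, so success or failure (including
  // invalid arguments) is reported only through the callback.
  tmq_commit_offset_async(consumer, topic, vgId, offset, on_commit, (void *)topic);
}
```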

View File

@ -312,7 +312,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)

View File

@ -1297,13 +1297,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn);
if (addr == 0xffffffff) {
tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
} else {
mgmtEpSet->numOfEps++;
}
}
if (secondEp && secondEp[0] != 0) {
@ -1313,12 +1319,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn);
if (addr == 0xffffffff) {
tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
} else {
mgmtEpSet->numOfEps++;
}
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
}
return 0;

View File

@ -99,13 +99,20 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
goto End;
}
int updateEpSet = 1;
if (connectRsp.dnodeNum == 1) {
SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet dstEpSet = connectRsp.epSet;
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
if (srcEpSet.numOfEps == 1) {
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
updateEpSet = 0;
}
}
if (updateEpSet == 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet* pOrig = &corEpSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,

View File

@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS;
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
}
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return NULL;
}
@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
end:
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
if(pCommitFp != NULL) {
pCommitFp(tmq, code, userParam);
}
return;
}
@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
if(cb != NULL) {
cb(tmq, TSDB_CODE_INVALID_PARA, param);
}
return;
}
if (pRes == NULL) { // here needs to commit all offsets.
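As the added guard above suggests, a NULL `TAOS_RES*` asks the client to commit all offsets consumed so far, and the callback is now invoked even when the call bails out early (for example on a NULL consumer). A minimal sketch, with the helper name `commit_all_async` being hypothetical:

```c
#include <stdint.h>
#include <stdio.h>
#include "taos.h"

static void on_commit_all(tmq_t *tmq, int32_t code, void *param) {
  (void)tmq;
  (void)param;
  if (code != 0) {
    fprintf(stderr, "commit-all failed: %s\n", tmq_err2str(code));
  }
}

// Hypothetical helper: commit everything consumed so far without blocking.
static void commit_all_async(tmq_t *consumer) {
  // A NULL message means "commit all offsets"; errors are delivered
  // through the callback instead of a return value.
  tmq_commit_async(consumer, NULL, on_commit_all, NULL);
}
```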
@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
int32_t code = 0;
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto end;
}
int32_t accId = tmq->pTscObj->acctId;
@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
taosWUnLockLatch(&tmq->lock);
@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
end:
if(code != 0 && cb != NULL){
cb(tmq, code, param);
}
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
}
tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
return position;
}
@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
committed = pOffsetInfo->committedOffset.version;
taosWUnLockLatch(&tmq->lock);
return committed;
goto end;
}
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
return getCommittedFromServer(tmq, tname, vgId, &epSet);
committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
end:
tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
return committed;
}
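`tmq_position` and `tmq_committed` now log their results before returning, which makes them convenient for progress monitoring. A hedged sketch follows; the helper name `report_progress` is made up, and the consumer, topic name, and vgId are assumed to come from elsewhere (for example a prior assignment query).

```c
#include <inttypes.h>
#include <stdio.h>
#include "taos.h"

// Hypothetical helper: show how far one vgroup has been consumed vs. committed.
static void report_progress(tmq_t *consumer, const char *topic, int32_t vgId) {
  int64_t position  = tmq_position(consumer, topic, vgId);   // next offset to be polled
  int64_t committed = tmq_committed(consumer, topic, vgId);  // last committed offset

  // Values below 0 carry an error code rather than an offset.
  printf("vgId:%d position:%" PRId64 " committed:%" PRId64 "\n", vgId, position, committed);
}
```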
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
code = TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_TMQ_INVALID_TOPIC;
goto end;
}
@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
return 0;
}
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, tname, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
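Because `tmq_offset_seek` now also messages the server to cancel a pending push handle, the usual client-side pattern is to fetch the topic assignment first and then seek each vgroup. A minimal sketch under that assumption; `rewind_topic` is a hypothetical helper and the consumer is assumed to be already subscribed.

```c
#include <inttypes.h>
#include <stdio.h>
#include "taos.h"

// Hypothetical helper: rewind every assigned vgroup of one topic to its earliest offset.
static void rewind_topic(tmq_t *consumer, const char *topic) {
  tmq_topic_assignment *assignment = NULL;
  int32_t numOfAssignment = 0;

  int32_t code = tmq_get_topic_assignment(consumer, topic, &assignment, &numOfAssignment);
  if (code != 0) {
    fprintf(stderr, "get assignment failed: %s\n", tmq_err2str(code));
    return;
  }

  for (int32_t i = 0; i < numOfAssignment; ++i) {
    tmq_topic_assignment *p = &assignment[i];
    // begin/end describe the WAL offset range currently available for this vgroup.
    code = tmq_offset_seek(consumer, topic, p->vgId, p->begin);
    printf("seek vgId:%d to %" PRId64 ": %s\n", p->vgId, p->begin, tmq_err2str(code));
  }

  tmq_free_assignment(assignment);
}
```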

View File

@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mDebug("tq timer, rebalance counter old val:%d", old);
mInfo("tq timer, rebalance counter old val:%d", old);
return old == 0;
}
@ -116,7 +116,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mDebug("rebalance trans end, rebalance counter:%d", newVal);
mInfo("rebalance trans end, rebalance counter:%d", newVal);
break;
}
}
@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// rebalance cannot be parallel
if (!mndRebTryStart()) {
mDebug("mq rebalance already in progress, do nothing");
mInfo("mq rebalance already in progress, do nothing");
return 0;
}
@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);
@ -362,7 +362,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered");
mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_DO_REBALANCE,
.pCont = pRebMsg,
@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
TopicOffsetRows* data = taosArrayGet(req.topics, i);
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
@ -1109,13 +1109,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
}
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
sdbRelease(pSdb, pConsumer);
continue;
}
taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true;

View File

@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
int32_t numOfRows = 0;
SMqSubscribeObj *pSub = NULL;
mDebug("mnd show subscriptions begin");
mInfo("mnd show subscriptions begin");
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
sdbRelease(pSdb, pSub);
}
mDebug("mnd end show subscriptions");
mInfo("mnd end show subscriptions");
pShow->numOfRows += numOfRows;
return numOfRows;

View File

@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0;
taosWLockLatch(&pTq->lock);
@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return -1;
}
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = NULL;

View File

@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
}
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
int64_t endVer = TMIN(appliedVer, committedVer);
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t endVer = committedVer;
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
if (fetchVer > endVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
pRead->pWal->vers.appliedVer);
// TODO: valid ver
if (ver > pRead->pWal->vers.appliedVer) {
return -1;
}
// if (ver > pRead->pWal->vers.appliedVer) {
// return -1;
// }
if (pRead->curVersion != ver) {
code = walReaderSeekVer(pRead, ver);