Merge pull request #22131 from taosdata/mark/tmq
fix:optimize log & change return value for async interface
This commit is contained in:
commit
c0e35df927
|
@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||||
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
||||||
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
|
||||||
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
|
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
|
||||||
int32_t *numOfAssignment);
|
int32_t *numOfAssignment);
|
||||||
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
|
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
|
||||||
|
|
|
@ -312,7 +312,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
|
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
|
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
|
||||||
|
@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
|
||||||
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
|
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
|
||||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||||
if (pParamSet == NULL) {
|
if (pParamSet == NULL) {
|
||||||
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosMemoryFree(pParamSet);
|
taosMemoryFree(pParamSet);
|
||||||
|
if(pCommitFp != NULL) {
|
||||||
pCommitFp(tmq, code, userParam);
|
pCommitFp(tmq, code, userParam);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
||||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
|
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
|
||||||
if (tmq == NULL) {
|
if (tmq == NULL) {
|
||||||
tscError("invalid tmq handle, null");
|
tscError("invalid tmq handle, null");
|
||||||
|
if(cb != NULL) {
|
||||||
|
cb(tmq, TSDB_CODE_INVALID_PARA, param);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (pRes == NULL) { // here needs to commit all offsets.
|
if (pRes == NULL) { // here needs to commit all offsets.
|
||||||
|
@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
|
||||||
tsem_destroy(&pInfo->sem);
|
tsem_destroy(&pInfo->sem);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
|
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
|
||||||
|
int32_t code = 0;
|
||||||
if (tmq == NULL || pTopicName == NULL) {
|
if (tmq == NULL || pTopicName == NULL) {
|
||||||
tscError("invalid tmq handle, null");
|
tscError("invalid tmq handle, null");
|
||||||
return TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t accId = tmq->pTscObj->acctId;
|
int32_t accId = tmq->pTscObj->acctId;
|
||||||
|
@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
|
||||||
|
|
||||||
taosWLockLatch(&tmq->lock);
|
taosWLockLatch(&tmq->lock);
|
||||||
SMqClientVg* pVg = NULL;
|
SMqClientVg* pVg = NULL;
|
||||||
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
|
code = getClientVg(tmq, tname, vgId, &pVg);
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return code;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
|
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
|
||||||
code = checkWalRange(pOffsetInfo, offset);
|
code = checkWalRange(pOffsetInfo, offset);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return code;
|
goto end;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
|
||||||
|
@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
|
||||||
|
|
||||||
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
|
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||||
|
|
||||||
return code;
|
end:
|
||||||
|
if(code != 0 && cb != NULL){
|
||||||
|
cb(tmq, code, param);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
||||||
|
@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
|
||||||
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
|
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
|
||||||
return position;
|
return position;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
|
||||||
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
|
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
|
||||||
committed = pOffsetInfo->committedOffset.version;
|
committed = pOffsetInfo->committedOffset.version;
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return committed;
|
goto end;
|
||||||
}
|
}
|
||||||
SEpSet epSet = pVg->epSet;
|
SEpSet epSet = pVg->epSet;
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
|
||||||
return getCommittedFromServer(tmq, tname, vgId, &epSet);
|
committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
|
||||||
|
|
||||||
|
end:
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
|
||||||
|
return committed;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
|
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
|
||||||
|
@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
taosWLockLatch(&tmq->lock);
|
taosWLockLatch(&tmq->lock);
|
||||||
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
|
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_TMQ_INVALID_TOPIC;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
||||||
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
|
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);
|
||||||
|
|
||||||
pOffsetInfo->walVerBegin = p->begin;
|
pOffsetInfo->walVerBegin = p->begin;
|
||||||
pOffsetInfo->walVerEnd = p->end;
|
pOffsetInfo->walVerEnd = p->end;
|
||||||
|
@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll
|
||||||
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
|
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
|
||||||
if (tmq == NULL || pTopicName == NULL) {
|
if (tmq == NULL || pTopicName == NULL) {
|
||||||
tscError("invalid tmq handle, null");
|
tscError("invalid tmq handle, null");
|
||||||
|
@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
|
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
|
|
||||||
tmq->consumerId, tname, vgId, tmq->epoch);
|
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
|
|
|
@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
|
||||||
|
|
||||||
bool mndRebTryStart() {
|
bool mndRebTryStart() {
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||||
mDebug("tq timer, rebalance counter old val:%d", old);
|
mInfo("tq timer, rebalance counter old val:%d", old);
|
||||||
return old == 0;
|
return old == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ void mndRebCntDec() {
|
||||||
int32_t newVal = val - 1;
|
int32_t newVal = val - 1;
|
||||||
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
|
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
|
||||||
if (oldVal == val) {
|
if (oldVal == val) {
|
||||||
mDebug("rebalance trans end, rebalance counter:%d", newVal);
|
mInfo("rebalance trans end, rebalance counter:%d", newVal);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
// rebalance cannot be parallel
|
// rebalance cannot be parallel
|
||||||
if (!mndRebTryStart()) {
|
if (!mndRebTryStart()) {
|
||||||
mDebug("mq rebalance already in progress, do nothing");
|
mInfo("mq rebalance already in progress, do nothing");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
||||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
||||||
hbStatus);
|
hbStatus);
|
||||||
|
|
||||||
|
@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
|
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
|
||||||
TopicOffsetRows* data = taosArrayGet(req.topics, i);
|
TopicOffsetRows* data = taosArrayGet(req.topics, i);
|
||||||
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||||
|
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||||
if(pSub == NULL){
|
if(pSub == NULL){
|
||||||
|
@ -1109,13 +1109,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
||||||
mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
|
mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
|
mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
|
||||||
|
|
||||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
bool hasTopic = true;
|
bool hasTopic = true;
|
||||||
|
|
|
@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SMqSubscribeObj *pSub = NULL;
|
SMqSubscribeObj *pSub = NULL;
|
||||||
|
|
||||||
mDebug("mnd show subscriptions begin");
|
mInfo("mnd show subscriptions begin");
|
||||||
|
|
||||||
while (numOfRows < rowsCapacity) {
|
while (numOfRows < rowsCapacity) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
|
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
|
||||||
|
@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("mnd end show subscriptions");
|
mInfo("mnd end show subscriptions");
|
||||||
|
|
||||||
pShow->numOfRows += numOfRows;
|
pShow->numOfRows += numOfRows;
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
|
|
@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
|
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
|
@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = NULL;
|
STqHandle* pHandle = NULL;
|
||||||
|
|
|
@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t fetchVer = pReader->curVersion;
|
int64_t fetchVer = pReader->curVersion;
|
||||||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
|
|
||||||
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||||
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||||
}
|
// }
|
||||||
|
|
||||||
int64_t endVer = TMIN(appliedVer, committedVer);
|
// int64_t endVer = TMIN(appliedVer, committedVer);
|
||||||
|
int64_t endVer = committedVer;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||||
", applied index:%" PRId64", end index:%" PRId64,
|
", end index:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
|
||||||
|
|
||||||
if (fetchVer > endVer){
|
if (fetchVer > endVer){
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
pRead->pWal->vers.appliedVer);
|
pRead->pWal->vers.appliedVer);
|
||||||
|
|
||||||
// TODO: valid ver
|
// TODO: valid ver
|
||||||
if (ver > pRead->pWal->vers.appliedVer) {
|
// if (ver > pRead->pWal->vers.appliedVer) {
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReaderSeekVer(pRead, ver);
|
code = walReaderSeekVer(pRead, ver);
|
||||||
|
|
Loading…
Reference in New Issue