tmq optimize offset handling
This commit is contained in:
parent
8b00e0ed06
commit
459ec6ed71
|
@ -1,16 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "consumer.h"
|
|
@ -241,6 +241,10 @@ void tmq_list_destroy(tmq_list_t* list) {
|
|||
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
|
||||
}
|
||||
|
||||
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
|
||||
return sprintf(dst, "%s:%d", topicName, vg);
|
||||
}
|
||||
|
||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||
tmq_message_t* msg = NULL;
|
||||
while (1) {
|
||||
|
@ -827,7 +831,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
SMqClientVg* pVg = pParam->pVg;
|
||||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("msg discard, code:%x\n", code);
|
||||
tscWarn("msg discard, code:%x", code);
|
||||
goto WRITE_QUEUE_FAIL;
|
||||
}
|
||||
|
||||
|
@ -835,12 +839,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
|
||||
if (msgEpoch < tmqEpoch) {
|
||||
tsem_post(&tmq->rspSem);
|
||||
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
|
||||
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (msgEpoch != tmqEpoch) {
|
||||
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
|
||||
tscWarn("mismatch rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
|
||||
} else {
|
||||
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
|
||||
}
|
||||
|
@ -899,19 +903,54 @@ WRITE_QUEUE_FAIL:
|
|||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||
/*printf("call update ep %d\n", epoch);*/
|
||||
bool set = false;
|
||||
int32_t sz = taosArrayGetSize(pRsp->topics);
|
||||
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
||||
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
|
||||
if (newTopics == NULL) {
|
||||
return false;
|
||||
}
|
||||
SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
||||
if (pHash == NULL) {
|
||||
taosArrayDestroy(newTopics);
|
||||
return false;
|
||||
}
|
||||
|
||||
// find topic, build hash
|
||||
for (int32_t i = 0; i < topicNumGet; i++) {
|
||||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||
taosHashClear(pHash);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
|
||||
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
|
||||
for (int32_t j = 0; j < vgSz; j++) {
|
||||
|
||||
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||
for (int32_t j = 0; j < topicNumCur; j++) {
|
||||
// find old topic
|
||||
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
|
||||
if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
|
||||
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
||||
if (vgNumCur == 0) break;
|
||||
for (int32_t k = 0; k < vgNumCur; k++) {
|
||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
|
||||
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
|
||||
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
|
||||
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
|
||||
for (int32_t j = 0; j < vgNumGet; j++) {
|
||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
|
||||
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
|
||||
int64_t offset = pVgEp->offset;
|
||||
if (pOffset != NULL) {
|
||||
offset = *pOffset;
|
||||
}
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
.currentOffset = pVgEp->offset,
|
||||
.currentOffset = offset,
|
||||
.vgId = pVgEp->vgId,
|
||||
.epSet = pVgEp->epSet,
|
||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||
|
@ -922,6 +961,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
|||
taosArrayPush(newTopics, &topic);
|
||||
}
|
||||
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
|
||||
taosHashCleanup(pHash);
|
||||
tmq->clientTopics = newTopics;
|
||||
atomic_store_32(&tmq->epoch, epoch);
|
||||
return set;
|
||||
|
@ -931,7 +971,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
||||
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
||||
goto END;
|
||||
}
|
||||
|
||||
|
@ -1302,6 +1342,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
|
||||
while (1) {
|
||||
/*printf("cycle\n");*/
|
||||
tmqAskEp(tmq, false);
|
||||
tmqPollImpl(tmq, blocking_time);
|
||||
|
||||
tsem_wait(&tmq->rspSem);
|
||||
|
|
|
@ -293,7 +293,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
|||
topicObj.dbUid = pDb->uid;
|
||||
topicObj.version = 1;
|
||||
topicObj.sql = pCreate->sql;
|
||||
topicObj.physicalPlan = "";
|
||||
topicObj.physicalPlan = NULL;
|
||||
topicObj.logicalPlan = "";
|
||||
topicObj.sqlLen = strlen(pCreate->sql);
|
||||
|
||||
|
@ -302,9 +302,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
|||
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
if (NULL != pPlanStr) {
|
||||
topicObj.physicalPlan = pPlanStr;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
|
||||
|
|
|
@ -387,11 +387,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
|
|||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
/*taosSsleep(3);*/
|
||||
int32_t batchCnt = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
int64_t startTime = taosGetTimestampUs();
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000);
|
||||
if (tmqmessage) {
|
||||
batchCnt++;
|
||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
||||
|
|
Loading…
Reference in New Issue