Merge pull request #20040 from taosdata/fix/liaohj

refactor: do some internal refactor and fix race condition.
This commit is contained in:
Haojun Liao 2023-02-18 15:28:11 +08:00 committed by GitHub
commit 8405999fc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 280 additions and 152 deletions

View File

@ -32,15 +32,15 @@
sem_post(x)
#endif
int32_t tmqAskEp(tmq_t* tmq, bool async);
typedef struct {
struct SMqMgmt{
int8_t inited;
tmr_h timer;
int32_t rsetId;
} SMqMgmt;
};
static SMqMgmt tmqMgmt = {0};
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
typedef struct {
int8_t tmqRspType;
@ -65,8 +65,7 @@ struct tmq_conf_t {
int8_t withTbName;
int8_t snapEnable;
int32_t snapBatchSize;
bool hbBgEnable;
bool hbBgEnable;
uint16_t port;
int32_t autoCommitInterval;
@ -78,18 +77,17 @@ struct tmq_conf_t {
};
struct tmq_t {
int64_t refId;
int64_t refId;
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
int64_t consumerId;
bool hbBgEnable;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -155,11 +153,9 @@ typedef struct {
typedef struct {
// subscribe info
char topicName[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
char topicName[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
SSchemaWrapper schema;
} SMqClientTopic;
@ -221,13 +217,21 @@ typedef struct {
/*int32_t vgId;*/
} SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
if (conf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return conf;
}
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
conf->hbBgEnable = true;
return conf;
}
@ -505,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
.handle = NULL,
};
tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version);
// TODO: put into cb
@ -636,17 +640,15 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
(int32_t)taosArrayGetSize(pTopic->vgs));
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
pVg->vgId);
tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName,
pVg->vgId, j + 1, numOfVgroups);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue;
@ -786,34 +788,46 @@ OVER:
taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
}
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(tmq->delayedTask, qall);
while (1) {
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break;
taosReadAllQitems(pTmq->delayedTask, qall);
if (qall->numOfItems == 0) {
taosFreeQall(qall);
return TSDB_CODE_SUCCESS;
}
tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tmqAskEp(tmq, true);
tmqAskEp(pTmq, true);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = tmq->refId;
*pRefId = pTmq->refId;
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = tmq->refId;
*pRefId = pTmq->refId;
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
// do nothing
} else {
ASSERT(0);
}
taosFreeQitem(pTaskType);
taosGetQitem(qall, (void**)&pTaskType);
}
taosFreeQall(qall);
return 0;
}
@ -932,31 +946,37 @@ void tmqFreeImpl(void* handle) {
taosMemoryFree(tmq);
}
static void tmqMgmtInit(void) {
tmqInitRes = 0;
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) {
tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
}
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
if (tmqMgmt.rsetId < 0) {
tmqInitRes = terrno;
}
}
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init timer
int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
if (inited == 0) {
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) {
atomic_store_8(&tmqMgmt.inited, 0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
return NULL;
}
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
return NULL;
}
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
ASSERT(user);
ASSERT(pass);
ASSERT(conf->groupId[0]);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
@ -966,7 +986,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
}
@ -996,7 +1016,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
}
@ -1004,7 +1024,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init connection
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
goto FAIL;
@ -1022,8 +1042,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
return pTmq;
FAIL:
@ -1032,6 +1051,7 @@ FAIL:
if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
if (pTmq->qall) taosFreeQall(pTmq->qall);
taosMemoryFree(pTmq);
return NULL;
}
@ -1041,44 +1061,52 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
void* buf = NULL;
SMsgSendInfo* sendInfo = NULL;
SCMSubscribeReq req = {0};
int32_t code = -1;
int32_t code = 0;
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL;
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
if (req.topicNames == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i);
SName name = {0};
tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFName == NULL) {
goto FAIL;
}
tNameExtractFullName(&name, topicFName);
tscDebug("subscribe topic: %s", topicFName);
tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName);
}
int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto FAIL;
if (buf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto FAIL;
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
SMqSubscribeCbParam param = {
.rspErr = 0,
@ -1086,7 +1114,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
.epoch = tmq->epoch,
};
if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;
if (tsem_init(&param.rspSem, 0, 0) != 0) {
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
@ -1112,15 +1142,18 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
code = param.rspErr;
if (code != 0) goto FAIL;
if (param.rspErr != 0) {
code = param.rspErr;
goto FAIL;
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
goto FAIL;
}
tscDebug("consumer not ready, retry");
tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500);
}
@ -1138,7 +1171,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
}
code = 0;
FAIL:
taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosMemoryFree(buf);
@ -1233,7 +1265,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType);
@ -1256,7 +1288,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
@ -1273,10 +1305,12 @@ CREATE_MSG_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
topicNumGet);
tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
@ -1288,19 +1322,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
taosArrayDestroy(newTopics);
return false;
}
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if (pTopicCur->vgs) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
}
@ -1316,7 +1350,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
@ -1342,6 +1376,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
}
taosArrayPush(newTopics, &topic);
}
// destroy current buffered existed topics info
if (tmq->clientTopics) {
int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) {
@ -1349,17 +1385,21 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->vgs);
}
taosArrayDestroy(tmq->clientTopics);
}
taosHashCleanup(pHash);
tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0)
if (taosArrayGetSize(tmq->clientTopics) == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else
} else {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
return set;
}
@ -1382,8 +1422,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
pParam->code = code;
if (code != 0) {
tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
pParam->async, code);
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId,
pParam->async, tstrerror(code));
goto END;
}
@ -1392,11 +1432,15 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
// Epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
goto END;
}
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
if (!async) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
@ -1411,6 +1455,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
code = -1;
goto END;
}
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
@ -1434,16 +1479,17 @@ END:
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0;
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
@ -1451,27 +1497,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqAskEpReq failed");
tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("failed to malloc askEpReq msg, size:%d", tlen);
tscError("consumer:0x%"PRIx64", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqAskEpReq %d failed", tlen);
tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
tscError("consumer:0x%"PRIx64", failed to malloc subscribe param", tmq->consumerId);
taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
@ -1492,15 +1542,14 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestId = tmq->consumerId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
@ -1510,6 +1559,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
}
@ -1582,6 +1632,7 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
// broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
@ -1590,7 +1641,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
@ -1598,7 +1649,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if (skipCnt < 30000) {
continue;
} else {
tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
}
#endif
}
@ -1654,6 +1705,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
.len = msgSize,
.handle = NULL,
};
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
@ -1661,18 +1713,19 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0;
/*printf("send poll\n");*/
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
}
}
return 0;
}
@ -1710,7 +1763,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
}
tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper);
tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper);
@ -1718,7 +1771,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
@ -1737,8 +1790,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
}
@ -1756,8 +1809,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
}
@ -1787,8 +1840,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
}
@ -1798,7 +1851,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
taosFreeQitem(rspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout);
}
}
@ -1809,7 +1862,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime);
tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);
#if 0
tmqHandleAllDelayedTask(tmq);
@ -1822,7 +1875,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
return NULL;
}
@ -1839,28 +1892,30 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
/*return NULL;*/
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj);
tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
return NULL;
}
if (timeout != -1) {
int64_t currentTime = taosGetTimestampMs();
int64_t passedTime = currentTime - startTime;
if (passedTime > timeout) {
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
/*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - passedTime));

View File

@ -162,6 +162,11 @@ void *queryThread(void *arg) {
}
static int32_t numOfThreads = 1;
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code);
}
} // namespace
int main(int argc, char** argv) {
@ -176,12 +181,12 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
TEST(clientCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, connect_Test) {
TEST(clientCase, connect_Test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -190,8 +195,8 @@ TEST(testCase, connect_Test) {
}
taos_close(pConn);
}
#if 0
TEST(testCase, create_user_Test) {
TEST(clientCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -204,7 +209,7 @@ TEST(testCase, create_user_Test) {
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TEST(clientCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -217,7 +222,7 @@ TEST(testCase, create_account_Test) {
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TEST(clientCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -230,7 +235,7 @@ TEST(testCase, drop_account_Test) {
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TEST(clientCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -250,7 +255,7 @@ TEST(testCase, show_user_Test) {
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TEST(clientCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -263,7 +268,7 @@ TEST(testCase, drop_user_Test) {
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TEST(clientCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -282,7 +287,7 @@ TEST(testCase, show_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TEST(clientCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -306,7 +311,7 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_dnode_Test) {
TEST(clientCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -325,7 +330,7 @@ TEST(testCase, create_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, drop_dnode_Test) {
TEST(clientCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -349,7 +354,7 @@ TEST(testCase, drop_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TEST(clientCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -367,7 +372,7 @@ TEST(testCase, use_db_test) {
taos_close(pConn);
}
// TEST(testCase, drop_db_test) {
// TEST(clientCase, drop_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
@ -389,7 +394,7 @@ TEST(testCase, use_db_test) {
// taos_close(pConn);
//}
TEST(testCase, create_stable_Test) {
TEST(clientCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -428,7 +433,7 @@ TEST(testCase, create_stable_Test) {
taos_close(pConn);
}
TEST(testCase, create_table_Test) {
TEST(clientCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -447,7 +452,7 @@ TEST(testCase, create_table_Test) {
taos_close(pConn);
}
TEST(testCase, create_ctable_Test) {
TEST(clientCase, create_ctable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -472,7 +477,7 @@ TEST(testCase, create_ctable_Test) {
taos_close(pConn);
}
TEST(testCase, show_stable_Test) {
TEST(clientCase, show_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != nullptr);
@ -497,7 +502,7 @@ TEST(testCase, show_stable_Test) {
taos_close(pConn);
}
TEST(testCase, show_vgroup_Test) {
TEST(clientCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -529,7 +534,7 @@ TEST(testCase, show_vgroup_Test) {
taos_close(pConn);
}
TEST(testCase, create_multiple_tables) {
TEST(clientCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -600,7 +605,7 @@ TEST(testCase, create_multiple_tables) {
taos_close(pConn);
}
TEST(testCase, show_table_Test) {
TEST(clientCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -634,7 +639,7 @@ TEST(testCase, show_table_Test) {
taos_close(pConn);
}
//TEST(testCase, drop_stable_Test) {
//TEST(clientCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != nullptr);
//
@ -659,14 +664,14 @@ TEST(testCase, show_table_Test) {
// taos_close(pConn);
//}
TEST(testCase, generated_request_id_test) {
TEST(clientCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) {
printf("0x%lx, index:%d\n", v, i);
// printf("0x%llx, index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
@ -675,7 +680,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash);
}
TEST(testCase, insert_test) {
TEST(clientCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -692,9 +697,8 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TEST(clientCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -752,8 +756,7 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn);
}
#if 0
TEST(testCase, tsbs_perf_test) {
TEST(clientCase, tsbs_perf_test) {
TdThread qid[20] = {0};
for(int32_t i = 0; i < numOfThreads; ++i) {
@ -762,7 +765,7 @@ TEST(testCase, tsbs_perf_test) {
getchar();
}
TEST(testCase, projection_query_stables) {
TEST(clientCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -790,7 +793,7 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
TEST(clientCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -825,7 +828,7 @@ create table tm1 using m1 tags(2);
insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22);
*/
TEST(testCase, async_api_test) {
TEST(clientCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -859,7 +862,7 @@ TEST(testCase, async_api_test) {
taos_close(pConn);
}
TEST(testCase, update_test) {
TEST(clientCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -895,6 +898,76 @@ TEST(testCase, update_test) {
}
}
#endif
TEST(clientCase, subscription_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create topic, code:%s", taos_errstr(pRes));
// taos_free_result(pRes);
// return;
// }
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
int32_t vgroupId = tmq_get_vgroup_id(pRes);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) break;
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf);
}
}
// return rows;
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
#pragma GCC diagnostic pop