Merge pull request #12684 from taosdata/feature/tq
enh(tmq): remove old tmq_commit api
This commit is contained in:
commit
0da446bc9a
|
@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
msg_process(tmqmessage);
|
msg_process(tmqmessage);
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
|
|
||||||
tmq_commit(tmq, NULL, 1);
|
tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);
|
||||||
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
|
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
|
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
||||||
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
|
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
|
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -1480,6 +1480,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
|
char clientId[256];
|
||||||
SArray* topicNames; // SArray<char**>
|
SArray* topicNames; // SArray<char**>
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
|
@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
tlen += taosEncodeString(buf, pReq->cgroup);
|
||||||
|
tlen += taosEncodeString(buf, pReq->clientId);
|
||||||
|
|
||||||
int32_t topicNum = taosArrayGetSize(pReq->topicNames);
|
int32_t topicNum = taosArrayGetSize(pReq->topicNames);
|
||||||
tlen += taosEncodeFixedI32(buf, topicNum);
|
tlen += taosEncodeFixedI32(buf, topicNum);
|
||||||
|
@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||||
|
buf = taosDecodeStringTo(buf, pReq->clientId);
|
||||||
|
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
buf = taosDecodeFixedI32(buf, &topicNum);
|
buf = taosDecodeFixedI32(buf, &topicNum);
|
||||||
|
|
|
@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = {
|
||||||
static const SSysDbTableSchema consumerSchema[] = {
|
static const SSysDbTableSchema consumerSchema[] = {
|
||||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
|
|
@ -463,7 +463,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char appId[TSDB_CGROUP_LEN];
|
char clientId[256];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
|
|
|
@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumerOld == NULL) {
|
if (pConsumerOld == NULL) {
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
pConsumerNew->rebNewTopics = newSub;
|
pConsumerNew->rebNewTopics = newSub;
|
||||||
subscribe.topicNames = NULL;
|
subscribe.topicNames = NULL;
|
||||||
|
@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||||
|
|
||||||
// app id
|
// app id
|
||||||
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
|
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
|
||||||
varDataSetLen(appId, strlen(varDataVal(appId)));
|
varDataSetLen(clientId, strlen(varDataVal(clientId)));
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
|
||||||
|
|
||||||
// status
|
// status
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
|
@ -493,7 +493,7 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
|
||||||
|
|
||||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
/*SSdb *pSdb = pMnode->pSdb;*/
|
||||||
SMDropTopicReq dropReq = {0};
|
SMDropTopicReq dropReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
|
@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||||
// if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
// if (dropReq.igNotExists) {
|
if (dropReq.igNotExists) {
|
||||||
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
||||||
// return 0;
|
return 0;
|
||||||
// } else {
|
} else {
|
||||||
// terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||||
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
// return -1;
|
return -1;
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
|
|
||||||
if (pTopic->refConsumerCnt != 0) {
|
if (pTopic->refConsumerCnt != 0) {
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
#if 1
|
|
||||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
|
||||||
# row 1
|
# row 1
|
||||||
if $data11 != 4 then
|
if $data11 != 4 then
|
||||||
print ======$data11
|
print ======$data11
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data12 != 4 then
|
if $data12 != 4 then
|
||||||
print ======$data12
|
print ======$data12
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data13 != 10 then
|
if $data13 != 10 then
|
||||||
print ======$data13
|
print ======$data13
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data14 != 3 then
|
if $data14 != 3 then
|
||||||
print ======$data14
|
print ======$data14
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data15 != 1 then
|
if $data15 != 1 then
|
||||||
print ======$data15
|
print ======$data15
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
# row 2
|
# row 2
|
||||||
if $data21 != 4 then
|
if $data21 != 4 then
|
||||||
print ======$data21
|
print ======$data21
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data22 != 4 then
|
if $data22 != 4 then
|
||||||
print ======$data22
|
print ======$data22
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data23 != 15 then
|
if $data23 != 15 then
|
||||||
print ======$data23
|
print ======$data23
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data24 != 4 then
|
if $data24 != 4 then
|
||||||
print ======$data24
|
print ======$data24
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data25 != 3 then
|
if $data25 != 3 then
|
||||||
print ======$data25
|
print ======$data25
|
||||||
# return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
/*msg_process(tmqmessage);*/
|
/*msg_process(tmqmessage);*/
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
|
|
||||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
|
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,14 +105,8 @@ void initLogFile() {
|
||||||
|
|
||||||
now = taosTime(NULL);
|
now = taosTime(NULL);
|
||||||
taosLocalTime(&now, &curTime);
|
taosLocalTime(&now, &curTime);
|
||||||
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
|
sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
|
||||||
configDir,
|
curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
|
||||||
curTime.tm_year+1900,
|
|
||||||
curTime.tm_mon+1,
|
|
||||||
curTime.tm_mday,
|
|
||||||
curTime.tm_hour,
|
|
||||||
curTime.tm_min,
|
|
||||||
curTime.tm_sec);
|
|
||||||
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
||||||
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
if (NULL == pFile) {
|
if (NULL == pFile) {
|
||||||
|
@ -398,7 +392,8 @@ void* consumeThreadFunc(void* param) {
|
||||||
if (pInfo->ifManualCommit) {
|
if (pInfo->ifManualCommit) {
|
||||||
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
|
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
|
||||||
pPrint("tmq_commit() manual commit when consume end.\n");
|
pPrint("tmq_commit() manual commit when consume end.\n");
|
||||||
tmq_commit(pInfo->tmq, NULL, 0);
|
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
||||||
|
tmq_commit_sync(pInfo->tmq, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tmq_unsubscribe(pInfo->tmq);
|
err = tmq_unsubscribe(pInfo->tmq);
|
||||||
|
|
Loading…
Reference in New Issue