refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-03-11 11:25:37 +08:00
parent 1e01f90283
commit 9e860f02ac
3 changed files with 104 additions and 111 deletions

View File

@ -25,13 +25,6 @@
extern "C" {
#endif
// TODO remove it
enum {
TMQ_CONF__RESET_OFFSET__NONE = -3,
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
TMQ_CONF__RESET_OFFSET__LATEST = -1,
};
// clang-format off
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \

View File

@ -20,18 +20,10 @@
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tqueue.h"
#include "tref.h"
#include "ttimer.h"
#if 0
#undef tsem_post
#define tsem_post(x) \
tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
sem_post(x)
#endif
struct SMqMgmt {
int8_t inited;
tmr_h timer;
@ -216,6 +208,7 @@ typedef struct {
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -227,7 +220,7 @@ tmq_conf_t* tmq_conf_new() {
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
conf->hbBgEnable = true;
return conf;
@ -235,29 +228,35 @@ tmq_conf_t* tmq_conf_new() {
void tmq_conf_destroy(tmq_conf_t* conf) {
if (conf) {
if (conf->ip) taosMemoryFree(conf->ip);
if (conf->user) taosMemoryFree(conf->user);
if (conf->pass) taosMemoryFree(conf->pass);
if (conf->ip) {
taosMemoryFree(conf->ip);
}
if (conf->user) {
taosMemoryFree(conf->user);
}
if (conf->pass) {
taosMemoryFree(conf->pass);
}
taosMemoryFree(conf);
}
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
if (strcasecmp(key, "group.id") == 0) {
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
return TMQ_CONF_OK;
}
if (strcmp(key, "client.id") == 0) {
if (strcasecmp(key, "client.id") == 0) {
tstrncpy(conf->clientId, value, 256);
return TMQ_CONF_OK;
}
if (strcmp(key, "enable.auto.commit") == 0) {
if (strcmp(value, "true") == 0) {
if (strcasecmp(key, "enable.auto.commit") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->autoCommit = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
} else if (strcasecmp(value, "false") == 0) {
conf->autoCommit = false;
return TMQ_CONF_OK;
} else {
@ -265,31 +264,31 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "auto.commit.interval.ms") == 0) {
if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
if (strcasecmp(key, "auto.offset.reset") == 0) {
if (strcasecmp(value, "none") == 0) {
conf->resetOffset = TMQ_OFFSET__RESET_NONE;
return TMQ_CONF_OK;
} else if (strcmp(value, "earliest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
} else if (strcasecmp(value, "earliest") == 0) {
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
return TMQ_CONF_OK;
} else if (strcmp(value, "latest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
} else if (strcasecmp(value, "latest") == 0) {
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) {
if (strcasecmp(key, "msg.with.table.name") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->withTbName = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
} else if (strcasecmp(value, "false") == 0) {
conf->withTbName = false;
return TMQ_CONF_OK;
} else {
@ -297,11 +296,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "experimental.snapshot.enable") == 0) {
if (strcmp(value, "true") == 0) {
if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->snapEnable = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
} else if (strcasecmp(value, "false") == 0) {
conf->snapEnable = false;
return TMQ_CONF_OK;
} else {
@ -309,42 +308,40 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "enable.heartbeat.background") == 0) {
if (strcmp(value, "true") == 0) {
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->hbBgEnable = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
} else if (strcasecmp(value, "false") == 0) {
conf->hbBgEnable = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.ip") == 0) {
if (strcasecmp(key, "td.connect.ip") == 0) {
conf->ip = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.user") == 0) {
if (strcasecmp(key, "td.connect.user") == 0) {
conf->user = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.pass") == 0) {
if (strcasecmp(key, "td.connect.pass") == 0) {
conf->pass = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.port") == 0) {
if (strcasecmp(key, "td.connect.port") == 0) {
conf->port = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.db") == 0) {
/*conf->db = taosStrdup(value);*/
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
}
@ -352,7 +349,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
tmq_list_t* tmq_list_new() {
//
return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
}
@ -382,13 +378,31 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
return container->pData;
}
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
static void updateVgEpset(tmq_t* pTmq, SMqCommitCbParam* pParam, SEpSet* pEpSet) {
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
for(int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, i);
if (strcmp(pTopic->topicName, pParam->topicName) != 0) {
continue;
}
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for(int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->vgId == pParam->vgId) {
SEp* pEp = GET_ACTIVE_EP(pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pClientVg->epSet = *pEpSet;
break;
}
}
break;
}
}
// todo retry to send the commit if failed
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
@ -402,33 +416,10 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
}
#endif
// update the epset if needed
if (pBuf->pEpSet != NULL) {
// todo extract method
taosThreadMutexLock(&pParam->pTmq->lock);
int32_t numOfTopics = taosArrayGetSize(pParam->pTmq->clientTopics);
for(int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(pParam->pTmq->clientTopics, i);
if (strcmp(pTopic->topicName, pParam->topicName) != 0) {
continue;
}
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for(int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->vgId == pParam->vgId) {
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pClientVg->epSet = *pBuf->pEpSet;
break;
}
}
break;
}
updateVgEpset(pParam->pTmq, pParam, pBuf->pEpSet);
taosThreadMutexUnlock(&pParam->pTmq->lock);
}
@ -436,10 +427,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/
tmqCommitRspCountDown(pParamSet);
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
}
@ -522,7 +510,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopic
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
// send msg
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
@ -652,9 +639,10 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
// todo race condition: fix it
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) {
@ -668,6 +656,8 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
}
}
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum,
numOfTopics);
taosThreadMutexUnlock(&tmq->lock);
// no request is sent
@ -678,7 +668,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
}
// count down since waiting rsp num init as 1
tmqCommitRspCountDown(pParamSet);
tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0);
if (!async) {
tsem_wait(&pParamSet->rspSem);
@ -696,9 +686,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
if (msg) {
if (msg) { // user invoked commit?
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else {
} else { // this for auto commit
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
}
}
@ -986,7 +976,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
return NULL;
}
@ -1002,9 +992,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
goto _failed;
}
// init status
@ -1034,22 +1024,20 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
goto _failed;
}
// init connection
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
goto FAIL;
goto _failed;
}
pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
if (pTmq->refId < 0) {
tmqFreeImpl(pTmq);
return NULL;
goto _failed;
}
if (pTmq->hbBgEnable) {
@ -1058,16 +1046,17 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s", pTmq->consumerId, pTmq->groupId);
char buf[80] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
tFormatOffset(buf, tListLen(buf), &offset);
tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
pTmq->consumerId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf,
pTmq->hbBgEnable);
return pTmq;
FAIL:
if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
if (pTmq->qall) taosFreeQall(pTmq->qall);
taosMemoryFree(pTmq);
_failed:
tmqFreeImpl(pTmq);
return NULL;
}
@ -2123,10 +2112,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
// call async cb func
if (pParamSet->automatic && tmq->commitCb) {
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post
} else if (!pParamSet->automatic && pParamSet->userCb) { // sem post
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
} else {
tsem_post(&pParamSet->rspSem);
@ -2137,4 +2126,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
return 0;
}
}
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
waitingRspNum);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
}
}

View File

@ -6586,8 +6586,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
} else {
ASSERT(0);
return TSDB_CODE_INVALID_PARA;
}
return 0;
}