refactor: tmq client
This commit is contained in:
parent
d2fc6215c6
commit
68e0ddfa88
|
@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
|
||||||
if (pRebInfo == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
strcpy(pRebInfo->key, key);
|
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
if (pRebInfo->lostConsumers == NULL) {
|
if (pRebInfo->lostConsumers == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -233,12 +233,12 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
|
||||||
|
|
||||||
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
||||||
if (strcmp(key, "group.id") == 0) {
|
if (strcmp(key, "group.id") == 0) {
|
||||||
strcpy(conf->groupId, value);
|
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(key, "client.id") == 0) {
|
if (strcmp(key, "client.id") == 0) {
|
||||||
strcpy(conf->clientId, value);
|
tstrncpy(conf->clientId, value, 256);
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,7 +452,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
||||||
int32_t code;
|
int32_t code;
|
||||||
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
@ -464,15 +463,22 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
tEncodeSTqOffset(&encoder, pOffset);
|
tEncodeSTqOffset(&encoder, pOffset);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
// build param
|
// build param
|
||||||
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
||||||
|
if (pParam == NULL) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
pParam->params = pParamSet;
|
pParam->params = pParamSet;
|
||||||
pParam->pOffset = pOffset;
|
pParam->pOffset = pOffset;
|
||||||
|
|
||||||
// build send info
|
// build send info
|
||||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (pMsgSendInfo == NULL) {
|
if (pMsgSendInfo == NULL) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
taosMemoryFree(pParam);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pMsgSendInfo->msgInfo = (SDataBuf){
|
pMsgSendInfo->msgInfo = (SDataBuf){
|
||||||
|
@ -547,6 +553,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
|
||||||
|
|
||||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||||
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
|
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
|
||||||
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
taosMemoryFree(pParamSet);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
goto HANDLE_RSP;
|
goto HANDLE_RSP;
|
||||||
|
@ -565,6 +573,7 @@ HANDLE_RSP:
|
||||||
tsem_wait(&pParamSet->rspSem);
|
tsem_wait(&pParamSet->rspSem);
|
||||||
code = pParamSet->rspErr;
|
code = pParamSet->rspErr;
|
||||||
tsem_destroy(&pParamSet->rspSem);
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
taosMemoryFree(pParamSet);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -587,7 +596,14 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
||||||
|
|
||||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||||
if (pParamSet == NULL) {
|
if (pParamSet == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
if (async) {
|
||||||
|
if (automatic) {
|
||||||
|
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||||
|
} else {
|
||||||
|
userCb(tmq, code, userParam);
|
||||||
|
}
|
||||||
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,16 +658,6 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
||||||
code = pParamSet->rspErr;
|
code = pParamSet->rspErr;
|
||||||
tsem_destroy(&pParamSet->rspSem);
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
taosMemoryFree(pParamSet);
|
taosMemoryFree(pParamSet);
|
||||||
} else {
|
|
||||||
code = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code != 0 && async) {
|
|
||||||
if (automatic) {
|
|
||||||
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
|
||||||
} else {
|
|
||||||
userCb(tmq, code, userParam);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -722,6 +728,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
goto OVER;
|
||||||
}
|
}
|
||||||
sendInfo->msgInfo = (SDataBuf){
|
sendInfo->msgInfo = (SDataBuf){
|
||||||
.pData = pReq,
|
.pData = pReq,
|
||||||
|
@ -871,8 +878,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||||
if (pTmq == NULL) {
|
if (pTmq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
|
||||||
pTmq->groupId);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1061,9 +1067,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
code = 0;
|
code = 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||||
if (code != 0 && buf) {
|
taosMemoryFree(buf);
|
||||||
taosMemoryFree(buf);
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1101,7 +1106,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
taosMemoryFree(pMsg->pData);
|
|
||||||
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
|
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue