fix(tmq): build sync api based on async api.

This commit is contained in:
Haojun Liao 2023-03-31 15:55:16 +08:00
parent b0fb912da8
commit a88de7e18b
1 changed files with 243 additions and 256 deletions

View File

@ -27,6 +27,8 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
struct SMqMgmt {
int8_t inited;
tmr_h timer;
@ -109,6 +111,10 @@ struct tmq_t {
tsem_t rspSem;
};
typedef struct SAskEpInfo {
int32_t code;
} SAskEpInfo;
enum {
TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
@ -171,9 +177,8 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
int32_t code;
int32_t async;
tsem_t rspSem;
void* pParam;
__tmq_askep_fn_t pUserFn;
} SMqAskEpCbParam;
typedef struct {
@ -189,16 +194,13 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
int8_t automatic;
int8_t async;
int32_t waitingRspNum;
int32_t totalRspNum;
int32_t rspErr;
int32_t code;
tmq_commit_cb* userCb;
/*SArray* successfulOffsets;*/
/*SArray* failedOffsets;*/
void* userParam;
tsem_t rspSem;
} SMqCommitCbParamSet;
typedef struct {
@ -209,12 +211,14 @@ typedef struct {
tmq_t* pTmq;
} SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups);
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -443,7 +447,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
//
// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
//
@ -453,7 +457,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
}
@ -461,8 +465,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int32_t index, int32_t totalVgroups) {
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pOffset->val = pVg->currentOffset;
@ -476,13 +479,13 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int32_t code = 0;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
taosMemoryFree(pOffset);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
@ -499,7 +502,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
if (pParam == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->params = pParamSet;
@ -515,7 +518,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
taosMemoryFree(pOffset);
taosMemoryFree(buf);
taosMemoryFree(pParam);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsgSendInfo->msgInfo = (SDataBuf){
@ -546,129 +549,117 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
char* topic;
int32_t vgId;
if (TD_RES_TMQ(msg)) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
topic = pRspObj->topic;
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
char* pTopicName = NULL;
int32_t vgId = 0;
int32_t code = 0;
if (pRes == NULL || tmq == NULL) {
pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
return;
}
if (TD_RES_TMQ(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else if (TD_RES_TMQ_META(msg)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic;
} else if (TD_RES_TMQ_META(pRes)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
pTopicName = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else if (TD_RES_TMQ_METADATA(msg)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
topic = pRspObj->topic;
} else if (TD_RES_TMQ_METADATA(pRes)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else {
return TSDB_CODE_TMQ_INVALID_MSG;
pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
return;
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->automatic = 0;
pParamSet->async = async;
pParamSet->userCb = userCb;
pParamSet->userCb = pCommitFp;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
int32_t code = -1;
taosThreadMutexLock(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
int32_t i = 0;
for (; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, topic) != 0) {
continue;
if (strcmp(pTopic->topicName, pTopicName) == 0) {
break;
}
}
if (i == numOfTopics) {
tscWarn("consumer:0x" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName,
numOfTopics);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId != vgId) {
continue;
if (pVg->vgId == vgId) {
break;
}
}
if (j == numOfVgroups) {
tscWarn("consumer:0x" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId,
numOfVgroups, pTopicName);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) {
tsem_destroy(&pParamSet->rspSem);
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
// failed to commit, callback user function directly.
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pParamSet);
goto FAIL;
pCommitFp(tmq, code, userParam);
}
goto HANDLE_RSP;
}
}
}
HANDLE_RSP:
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
} else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet);
taosThreadMutexUnlock(&tmq->lock);
return 0;
pCommitFp(tmq, code, userParam);
}
if (!async) {
taosThreadMutexUnlock(&tmq->lock);
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return code;
} else {
code = 0;
}
FAIL:
taosThreadMutexUnlock(&tmq->lock);
if (code != 0 && async) {
userCb(tmq, code, userParam);
}
return 0;
}
static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
int32_t code = -1;
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
if (async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam);
}
}
return -1;
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->automatic = automatic;
pParamSet->async = async;
pParamSet->userCb = userCb;
pParamSet->userCb = pCommitFp;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
// init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1;
taosThreadMutexLock(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
@ -682,7 +673,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno),
@ -693,7 +684,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
// update the offset value.
pVg->committedOffset = pVg->currentOffset;
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
}
}
@ -701,39 +692,16 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics);
taosThreadMutexUnlock(&tmq->lock);
// no request is sent
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return 0;
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
// count down since waiting rsp num init as 1
tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0);
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
}
return code;
}
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
if (msg) { // user invoked commit
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else { // this for auto commit
return doAutoCommit(tmq, automatic, async, userCb, userParam);
}
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}
static void generateTimedTask(int64_t refId, int32_t type) {
@ -855,7 +823,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tmqAskEp(pTmq, true);
asyncAskEp(pTmq, addToQueueCallbackFn, NULL);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
@ -863,12 +831,11 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
asyncCommitAllOffsets(pTmq, pTmq->commitCb, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
@ -1213,7 +1180,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL;
}
@ -1513,28 +1480,30 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
return set;
}
static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
int8_t async = pParam->async;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
if (!async) {
tsem_destroy(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
}
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
taosMemoryFree(pParam);
return terrno;
}
pParam->code = code;
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async,
tstrerror(code));
goto END;
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
pParam->pUserFn(tmq, code, NULL, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return code;
}
// tmq's epoch is monotonically increase,
@ -1545,6 +1514,7 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
@ -1553,45 +1523,17 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tDeleteSMqAskEpRsp(&rsp);
}
goto END;
}
} else {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
if (!async) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto END;
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
}
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pWrapper);
tsem_post(&tmq->rspSem);
}
END:
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
if (!async) {
tsem_post(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
}
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam);
return code;
}
@ -1980,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > 40) {
return NULL;
}
@ -2162,96 +2104,159 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner(tmq, msg, 0, 1, cb, param);
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0;
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (pRes == NULL) { // here needs to commit all offsets.
asyncCommitAllOffsets(tmq, cb, param);
} else { // only commit one offset
asyncCommitOffset(tmq, pRes, cb, param);
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
}
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
tsem_post(&pInfo->sem);
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
int32_t code = 0;
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
tsem_init(&pInfo->sem, 0, 0);
pInfo->code = 0;
if (pRes == NULL) {
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
} else {
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
}
tsem_wait(&pInfo->sem);
code = pInfo->code;
taosMemoryFree(pInfo);
tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code));
return code;
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
if (code == TSDB_CODE_SUCCESS) {
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
}
tsem_post(&pTmq->rspSem);
}
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return;
}
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return;
}
SMqRspHead* head = pDataBuf->pData;
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(pTmq->mqueue, pWrapper);
}
int32_t doAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
tsem_wait(&pTmq->rspSem);
int32_t code = pInfo->code;
taosMemoryFree(pInfo);
return code;
}
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
req.consumerId = pTmq->consumerId;
req.epoch = pTmq->epoch;
strcpy(req.cgroup, pTmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0);
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pUserFn = askEpFn;
pParam->pParam = param;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->fp = askEpCallbackFn;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
sendInfo->requestId);
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
@ -2263,38 +2268,20 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
if (!pParamSet->async) {
tsem_destroy(&pParamSet->rspSem);
}
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
}
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && tmq->commitCb) {
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) { // sem post
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
}
pParamSet->userCb(tmq, pParamSet->code, pParamSet->userParam);
taosMemoryFree(pParamSet);
} else {
tsem_post(&pParamSet->rspSem);
}
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
taosReleaseRef(tmqMgmt.rsetId, refId);
return 0;
}
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if (waitingRspNum == 0) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,