opti:ask ep logic in tmq
This commit is contained in:
parent
f4ec83025d
commit
d707f69a6c
|
@ -29,8 +29,6 @@
|
||||||
|
|
||||||
#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
|
#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
|
||||||
|
|
||||||
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
|
|
||||||
|
|
||||||
struct SMqMgmt {
|
struct SMqMgmt {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
|
@ -189,8 +187,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
|
bool sync;
|
||||||
void* pParam;
|
void* pParam;
|
||||||
__tmq_askep_fn_t pUserFn;
|
|
||||||
} SMqAskEpCbParam;
|
} SMqAskEpCbParam;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -252,13 +250,12 @@ typedef struct SSyncCommitInfo {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} SSyncCommitInfo;
|
} SSyncCommitInfo;
|
||||||
|
|
||||||
static int32_t doAskEp(tmq_t* tmq);
|
static int32_t syncAskEp(tmq_t* tmq);
|
||||||
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
|
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
|
||||||
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
|
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
|
||||||
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
|
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
|
||||||
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
||||||
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
|
static void askEp(tmq_t* pTmq, void* param, bool sync);
|
||||||
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
|
|
||||||
|
|
||||||
tmq_conf_t* tmq_conf_new() {
|
tmq_conf_t* tmq_conf_new() {
|
||||||
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
||||||
|
@ -848,7 +845,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||||
|
|
||||||
while (pTaskType != NULL) {
|
while (pTaskType != NULL) {
|
||||||
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
||||||
asyncAskEp(pTmq, addToQueueCallbackFn, NULL);
|
askEp(pTmq, NULL, false);
|
||||||
|
|
||||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||||
*pRefId = pTmq->refId;
|
*pRefId = pTmq->refId;
|
||||||
|
@ -876,7 +873,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||||
|
@ -907,8 +904,6 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||||
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
||||||
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||||
|
@ -1222,7 +1217,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
||||||
if (retryCnt++ > MAX_RETRY_COUNT) {
|
if (retryCnt++ > MAX_RETRY_COUNT) {
|
||||||
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
||||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -1546,48 +1541,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
return set;
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
|
||||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
|
||||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
|
||||||
if (tmq == NULL) {
|
|
||||||
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
|
|
||||||
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqRspHead* head = pMsg->pData;
|
|
||||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
|
||||||
if (head->epoch <= epoch) {
|
|
||||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
|
||||||
tmq->consumerId, head->epoch, epoch);
|
|
||||||
|
|
||||||
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
|
|
||||||
SMqAskEpRsp rsp;
|
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
|
||||||
int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
|
|
||||||
atomic_store_8(&tmq->status, flag);
|
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
|
||||||
head->epoch, epoch);
|
|
||||||
}
|
|
||||||
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
|
||||||
|
|
||||||
END:
|
|
||||||
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
|
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
|
||||||
taosMemoryFree(pMsg->pData);
|
|
||||||
taosMemoryFree(pParam);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||||
int32_t groupLen = strlen(tmq->groupId);
|
int32_t groupLen = strlen(tmq->groupId);
|
||||||
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
||||||
|
@ -1789,7 +1742,7 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
|
static int32_t tmqHandleEpRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||||
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
||||||
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||||
|
@ -1878,9 +1831,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
||||||
", total:%" PRId64 ", reqId:0x%" PRIx64,
|
", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
||||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
taosFreeQitem(pollRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
|
taosFreeQitem(pRspWrapper);
|
||||||
} else { // build rsp
|
} else { // build rsp
|
||||||
int64_t numOfRows = 0;
|
int64_t numOfRows = 0;
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
|
@ -1890,7 +1843,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
||||||
pollRspWrapper->reqId);
|
pollRspWrapper->reqId);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
}
|
}
|
||||||
|
@ -1899,11 +1852,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||||
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
// todo handle the wal range and epset for each vgroup
|
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
|
@ -1924,15 +1876,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||||
// build rsp
|
// build rsp
|
||||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
|
@ -1956,8 +1908,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
|
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
|
||||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1982,20 +1934,25 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
||||||
tmq->totalRows, pollRspWrapper->reqId);
|
tmq->totalRows, pollRspWrapper->reqId);
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
}
|
}
|
||||||
} else {
|
} else if(pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP){
|
||||||
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
|
||||||
tmqHandleNoPollRsp(tmq, pRspWrapper);
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper;
|
||||||
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
||||||
|
doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg);
|
||||||
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
|
} else {
|
||||||
|
tscError("consumer:0x%" PRIx64 " invalid msg received:%d", tmq->consumerId, pRspWrapper->tmqRspType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2018,7 +1975,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
|
|
||||||
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
||||||
if (retryCnt++ > 40) {
|
if (retryCnt++ > 40) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2407,55 +2364,70 @@ end:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
SAskEpInfo* pInfo = param;
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||||
pInfo->code = code;
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
||||||
|
if (tmq == NULL) {
|
||||||
|
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTmq == NULL || code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqRspHead* head = pDataBuf->pData;
|
SMqRspHead* head = pMsg->pData;
|
||||||
SMqAskEpRsp rsp;
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
|
if (head->epoch <= epoch) {
|
||||||
doUpdateLocalEp(pTmq, head->epoch, &rsp);
|
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tmq->consumerId, head->epoch, epoch);
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||||
|
head->epoch, epoch);
|
||||||
|
if(pParam->sync){
|
||||||
|
SMqAskEpRsp rsp = {0};
|
||||||
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
|
doUpdateLocalEp(tmq, head->epoch, &rsp);
|
||||||
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
|
}else{
|
||||||
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
||||||
|
if (pWrapper == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
||||||
|
pWrapper->epoch = head->epoch;
|
||||||
|
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||||
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
||||||
|
|
||||||
|
taosWriteQitem(tmq->mqueue, pWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
tsem_post(&pInfo->sem);
|
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
||||||
|
|
||||||
|
FAIL:
|
||||||
|
if(pParam->sync){
|
||||||
|
SAskEpInfo* pInfo = pParam->pParam;
|
||||||
|
pInfo->code = code;
|
||||||
|
tsem_post(&pInfo->sem);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
|
taosMemoryFree(pMsg->pData);
|
||||||
|
taosMemoryFree(pParam);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
int32_t syncAskEp(tmq_t* pTmq) {
|
||||||
if (pTmq == NULL){
|
|
||||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
|
||||||
if (pWrapper == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqRspHead* head = pDataBuf->pData;
|
|
||||||
|
|
||||||
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
|
||||||
pWrapper->epoch = head->epoch;
|
|
||||||
memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
|
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
|
||||||
|
|
||||||
taosWriteQitem(pTmq->mqueue, pWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doAskEp(tmq_t* pTmq) {
|
|
||||||
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
||||||
tsem_init(&pInfo->sem, 0, 0);
|
tsem_init(&pInfo->sem, 0, 0);
|
||||||
|
|
||||||
asyncAskEp(pTmq, defaultAskEpCb, pInfo);
|
askEp(pTmq, pInfo, true);
|
||||||
tsem_wait(&pInfo->sem);
|
tsem_wait(&pInfo->sem);
|
||||||
|
|
||||||
int32_t code = pInfo->code;
|
int32_t code = pInfo->code;
|
||||||
|
@ -2464,7 +2436,7 @@ int32_t doAskEp(tmq_t* pTmq) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
void askEp(tmq_t* pTmq, void* param, bool sync) {
|
||||||
SMqAskEpReq req = {0};
|
SMqAskEpReq req = {0};
|
||||||
req.consumerId = pTmq->consumerId;
|
req.consumerId = pTmq->consumerId;
|
||||||
req.epoch = pTmq->epoch;
|
req.epoch = pTmq->epoch;
|
||||||
|
@ -2501,7 +2473,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pParam->refId = pTmq->refId;
|
pParam->refId = pTmq->refId;
|
||||||
pParam->pUserFn = askEpFn;
|
pParam->sync = sync;
|
||||||
pParam->pParam = param;
|
pParam->pParam = param;
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
|
@ -2515,7 +2487,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
||||||
sendInfo->requestId = generateRequestId();
|
sendInfo->requestId = generateRequestId();
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = pParam;
|
sendInfo->param = pParam;
|
||||||
sendInfo->fp = askEpCallbackFn;
|
sendInfo->fp = askEpCb;
|
||||||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
@ -2530,7 +2502,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
||||||
FAIL:
|
FAIL:
|
||||||
taosMemoryFreeClear(pParam);
|
taosMemoryFreeClear(pParam);
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
askEpFn(pTmq, code, NULL, param);
|
askEpCb(pParam, NULL, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
|
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue