|
|
|
@ -29,8 +29,6 @@
|
|
|
|
|
|
|
|
|
|
#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
|
|
|
|
|
|
|
|
|
|
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
|
|
|
|
|
|
|
|
|
|
struct SMqMgmt {
|
|
|
|
|
int8_t inited;
|
|
|
|
|
tmr_h timer;
|
|
|
|
@ -42,11 +40,13 @@ volatile int32_t tmqInitRes = 0; // initialize rsp code
|
|
|
|
|
static struct SMqMgmt tmqMgmt = {0};
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t code;
|
|
|
|
|
int8_t tmqRspType;
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
} SMqRspWrapper;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t code;
|
|
|
|
|
int8_t tmqRspType;
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
SMqAskEpRsp msg;
|
|
|
|
@ -133,7 +133,6 @@ enum {
|
|
|
|
|
|
|
|
|
|
enum {
|
|
|
|
|
TMQ_DELAYED_TASK__ASK_EP = 1,
|
|
|
|
|
TMQ_DELAYED_TASK__REPORT,
|
|
|
|
|
TMQ_DELAYED_TASK__COMMIT,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -165,6 +164,7 @@ typedef struct {
|
|
|
|
|
} SMqClientTopic;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t code;
|
|
|
|
|
int8_t tmqRspType;
|
|
|
|
|
int32_t epoch; // epoch can be used to guard the vgHandle
|
|
|
|
|
int32_t vgId;
|
|
|
|
@ -189,8 +189,8 @@ typedef struct {
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int64_t refId;
|
|
|
|
|
bool sync;
|
|
|
|
|
void* pParam;
|
|
|
|
|
__tmq_askep_fn_t pUserFn;
|
|
|
|
|
} SMqAskEpCbParam;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
@ -252,13 +252,12 @@ typedef struct SSyncCommitInfo {
|
|
|
|
|
int32_t code;
|
|
|
|
|
} SSyncCommitInfo;
|
|
|
|
|
|
|
|
|
|
static int32_t doAskEp(tmq_t* tmq);
|
|
|
|
|
static int32_t syncAskEp(tmq_t* tmq);
|
|
|
|
|
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
|
|
|
|
|
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
|
|
|
|
|
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
|
|
|
|
|
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
|
|
|
|
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
|
|
|
|
|
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
|
|
|
|
|
static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
|
|
|
|
|
|
|
|
|
|
tmq_conf_t* tmq_conf_new() {
|
|
|
|
|
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
|
|
|
@ -848,7 +847,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|
|
|
|
|
|
|
|
|
while (pTaskType != NULL) {
|
|
|
|
|
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
|
|
|
|
asyncAskEp(pTmq, addToQueueCallbackFn, NULL);
|
|
|
|
|
askEp(pTmq, NULL, false, false);
|
|
|
|
|
|
|
|
|
|
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
|
|
|
|
*pRefId = pTmq->refId;
|
|
|
|
@ -865,7 +864,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
|
|
|
|
|
pTmq->autoCommitInterval / 1000.0);
|
|
|
|
|
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
|
|
|
|
|
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
|
|
|
|
} else {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosFreeQitem(pTaskType);
|
|
|
|
@ -876,10 +876,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
|
|
|
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
|
|
|
|
// do nothing
|
|
|
|
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
|
|
|
|
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
|
|
|
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
|
|
|
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
|
|
|
|
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
|
|
|
|
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
|
|
|
|
@ -907,8 +905,6 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
|
|
|
|
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
|
|
|
|
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|
|
|
@ -1222,7 +1218,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t retryCnt = 0;
|
|
|
|
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
|
|
|
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
|
|
|
|
if (retryCnt++ > MAX_RETRY_COUNT) {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
|
|
|
|
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
|
|
|
@ -1305,33 +1301,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|
|
|
|
uint64_t requestId = pParam->requestId;
|
|
|
|
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
|
|
|
|
if (tmq == NULL) {
|
|
|
|
|
taosMemoryFree(pParam);
|
|
|
|
|
taosMemoryFreeClear(pMsg->pData);
|
|
|
|
|
taosMemoryFreeClear(pMsg->pEpSet);
|
|
|
|
|
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
|
|
|
return -1;
|
|
|
|
|
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
|
|
|
goto FAIL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
|
|
|
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
|
|
|
|
|
tmq->consumerId);
|
|
|
|
|
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
|
|
|
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
|
|
|
|
if (pRspWrapper == NULL) {
|
|
|
|
|
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, vgId, requestId);
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
|
|
|
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
|
|
|
|
} else{
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId,
|
|
|
|
|
vgId, tstrerror(code), requestId);
|
|
|
|
|
}
|
|
|
|
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
|
|
|
|
if (pRspWrapper == NULL) {
|
|
|
|
|
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
|
|
|
|
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
goto FAIL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(code != 0){
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1340,30 +1322,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|
|
|
|
if (msgEpoch < clientEpoch) {
|
|
|
|
|
// do not write into queue since updating epoch reset
|
|
|
|
|
tscWarn("consumer:0x%" PRIx64
|
|
|
|
|
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
|
|
|
|
|
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
|
|
|
|
|
code = -1;
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ASSERT(msgEpoch == clientEpoch);
|
|
|
|
|
|
|
|
|
|
// handle meta rsp
|
|
|
|
|
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
|
|
|
|
|
|
|
|
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
|
|
|
|
if (pRspWrapper == NULL) {
|
|
|
|
|
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRspWrapper->tmqRspType = rspType;
|
|
|
|
|
pRspWrapper->reqId = requestId;
|
|
|
|
|
pRspWrapper->pEpset = pMsg->pEpSet;
|
|
|
|
|
pMsg->pEpSet = NULL;
|
|
|
|
|
pRspWrapper->vgId = vgId;
|
|
|
|
|
strcpy(pRspWrapper->topicName, pParam->topicName);
|
|
|
|
|
|
|
|
|
|
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
|
|
|
|
|
SDecoder decoder;
|
|
|
|
@ -1392,22 +1363,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
END:
|
|
|
|
|
pRspWrapper->code = code;
|
|
|
|
|
pRspWrapper->vgId = vgId;
|
|
|
|
|
strcpy(pRspWrapper->topicName, pParam->topicName);
|
|
|
|
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
|
|
|
|
|
|
|
|
|
int32_t total = taosQueueItemSize(tmq->mqueue);
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, rspType, vgId, total, requestId);
|
|
|
|
|
|
|
|
|
|
END:
|
|
|
|
|
if(code != 0){
|
|
|
|
|
setVgIdle(tmq, pParam->topicName, vgId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsem_post(&tmq->rspSem);
|
|
|
|
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
|
|
|
|
|
|
|
|
|
FAIL:
|
|
|
|
|
tsem_post(&tmq->rspSem);
|
|
|
|
|
taosMemoryFree(pParam);
|
|
|
|
|
taosMemoryFreeClear(pMsg->pData);
|
|
|
|
|
taosMemoryFreeClear(pMsg->pEpSet);
|
|
|
|
|
if(pMsg) taosMemoryFreeClear(pMsg->pData);
|
|
|
|
|
if(pMsg) taosMemoryFreeClear(pMsg->pEpSet);
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
@ -1478,7 +1449,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|
|
|
|
bool set = false;
|
|
|
|
|
|
|
|
|
|
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
|
|
|
|
if (epoch <= tmq->epoch) {
|
|
|
|
|
if (topicNumGet <= 0 && epoch <= tmq->epoch) {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
|
|
|
|
|
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1546,48 +1519,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|
|
|
|
return set;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
|
|
|
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
|
|
|
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
|
|
|
|
if (tmq == NULL) {
|
|
|
|
|
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
|
|
|
|
|
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SMqRspHead* head = pMsg->pData;
|
|
|
|
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
|
|
|
|
if (head->epoch <= epoch) {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
|
|
|
|
tmq->consumerId, head->epoch, epoch);
|
|
|
|
|
|
|
|
|
|
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
|
|
|
|
|
SMqAskEpRsp rsp;
|
|
|
|
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
|
|
|
|
int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
|
|
|
|
|
atomic_store_8(&tmq->status, flag);
|
|
|
|
|
tDeleteSMqAskEpRsp(&rsp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
|
|
|
|
head->epoch, epoch);
|
|
|
|
|
}
|
|
|
|
|
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
|
|
|
|
|
|
|
|
|
END:
|
|
|
|
|
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
|
|
|
|
|
taosMemoryFree(pMsg->pEpSet);
|
|
|
|
|
taosMemoryFree(pMsg->pData);
|
|
|
|
|
taosMemoryFree(pParam);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
|
|
|
|
int32_t groupLen = strlen(tmq->groupId);
|
|
|
|
|
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
|
|
|
@ -1722,10 +1653,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|
|
|
|
int64_t transporterId = 0;
|
|
|
|
|
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
|
|
|
|
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
|
|
|
|
|
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
|
|
|
|
|
pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
|
|
|
|
|
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
|
|
|
|
|
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
|
|
|
|
if(code != 0){
|
|
|
|
|
goto FAIL;
|
|
|
|
|
}
|
|
|
|
@ -1735,11 +1665,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|
|
|
|
pTmq->pollCnt++;
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
FAIL:
|
|
|
|
|
taosMemoryFreeClear(pParam);
|
|
|
|
|
taosMemoryFreeClear(msg);
|
|
|
|
|
return code;
|
|
|
|
|
return tmqPollCb(pParam, NULL, code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// broadcast the poll request to all related vnodes
|
|
|
|
@ -1776,8 +1704,6 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
atomic_store_32(&pVg->vgSkipCnt, 0);
|
|
|
|
|
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
|
|
|
|
tsem_post(&tmq->rspSem);
|
|
|
|
|
goto end;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1789,22 +1715,6 @@ end:
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
|
|
|
|
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
|
|
|
|
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
|
|
|
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
|
|
|
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
|
|
|
|
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
|
|
|
|
|
tDeleteSMqAskEpRsp(rspMsg);
|
|
|
|
|
} else {
|
|
|
|
|
tmqFreeRspWrapper(rspWrapper);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
|
|
|
|
|
if (!pVg->seekUpdated) {
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
|
|
|
|
@ -1839,11 +1749,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
|
|
|
|
|
|
|
|
|
|
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
|
|
|
|
if (pRspWrapper->code != 0) {
|
|
|
|
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
|
|
|
|
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
|
|
|
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId);
|
|
|
|
|
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
|
|
|
|
terrno = pRspWrapper->code;
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code));
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
return NULL;
|
|
|
|
|
} else{
|
|
|
|
|
if(pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID){ // for vnode transform
|
|
|
|
|
askEp(tmq, NULL, false, true);
|
|
|
|
|
}
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code));
|
|
|
|
|
taosWLockLatch(&tmq->lock);
|
|
|
|
|
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
|
|
|
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
}
|
|
|
|
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
|
|
|
|
|
return NULL;
|
|
|
|
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
|
|
|
|
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
|
|
|
|
|
|
|
|
@ -1878,9 +1805,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
|
|
|
|
", total:%" PRId64 ", reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
|
|
|
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
} else { // build rsp
|
|
|
|
|
int64_t numOfRows = 0;
|
|
|
|
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
|
|
|
@ -1890,20 +1818,18 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
|
|
|
|
pollRspWrapper->reqId);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
return pRsp;
|
|
|
|
|
}
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
} else {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
|
|
|
|
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
|
|
|
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
|
|
|
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
}
|
|
|
|
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
|
|
|
|
// todo handle the wal range and epset for each vgroup
|
|
|
|
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
|
|
|
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
|
|
|
|
|
|
|
|
@ -1924,15 +1850,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
|
|
|
|
// build rsp
|
|
|
|
|
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
return pRsp;
|
|
|
|
|
} else {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
|
|
|
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
|
|
|
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
|
|
|
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
}
|
|
|
|
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
|
|
|
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
|
|
|
@ -1956,8 +1882,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
|
|
|
|
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
|
|
|
|
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
|
|
|
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
@ -1982,20 +1908,25 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
|
|
|
|
tmq->totalRows, pollRspWrapper->reqId);
|
|
|
|
|
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
taosWUnLockLatch(&tmq->lock);
|
|
|
|
|
return pRsp;
|
|
|
|
|
} else {
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
|
|
|
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
|
|
|
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
|
|
|
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pollRspWrapper);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
|
|
|
|
tmqHandleNoPollRsp(tmq, pRspWrapper);
|
|
|
|
|
} else if(pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP){
|
|
|
|
|
tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
|
|
|
|
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper;
|
|
|
|
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
|
|
|
|
doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg);
|
|
|
|
|
tmqFreeRspWrapper(pRspWrapper);
|
|
|
|
|
taosFreeQitem(pRspWrapper);
|
|
|
|
|
} else {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 " invalid msg received:%d", tmq->consumerId, pRspWrapper->tmqRspType);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2018,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|
|
|
|
|
|
|
|
|
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
|
|
|
|
int32_t retryCnt = 0;
|
|
|
|
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
|
|
|
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
|
|
|
|
if (retryCnt++ > 40) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
@ -2407,55 +2338,64 @@ end:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
|
|
|
|
SAskEpInfo* pInfo = param;
|
|
|
|
|
pInfo->code = code;
|
|
|
|
|
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|
|
|
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
|
|
|
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
|
|
|
|
if (tmq == NULL) {
|
|
|
|
|
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
|
|
|
goto FAIL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pTmq == NULL || code != TSDB_CODE_SUCCESS){
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SMqRspHead* head = pDataBuf->pData;
|
|
|
|
|
SMqAskEpRsp rsp;
|
|
|
|
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
|
|
|
|
|
doUpdateLocalEp(pTmq, head->epoch, &rsp);
|
|
|
|
|
tDeleteSMqAskEpRsp(&rsp);
|
|
|
|
|
SMqRspHead* head = pMsg->pData;
|
|
|
|
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
|
|
|
|
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId,
|
|
|
|
|
head->epoch, epoch);
|
|
|
|
|
if(pParam->sync){
|
|
|
|
|
SMqAskEpRsp rsp = {0};
|
|
|
|
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
|
|
|
|
doUpdateLocalEp(tmq, head->epoch, &rsp);
|
|
|
|
|
tDeleteSMqAskEpRsp(&rsp);
|
|
|
|
|
}else{
|
|
|
|
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
|
|
|
|
if (pWrapper == NULL) {
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
goto END;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
|
|
|
|
pWrapper->epoch = head->epoch;
|
|
|
|
|
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
|
|
|
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
|
|
|
|
|
|
|
|
|
taosWriteQitem(tmq->mqueue, pWrapper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
END:
|
|
|
|
|
tsem_post(&pInfo->sem);
|
|
|
|
|
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
|
|
|
|
|
|
|
|
|
FAIL:
|
|
|
|
|
if(pParam->sync){
|
|
|
|
|
SAskEpInfo* pInfo = pParam->pParam;
|
|
|
|
|
pInfo->code = code;
|
|
|
|
|
tsem_post(&pInfo->sem);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(pMsg->pEpSet);
|
|
|
|
|
taosMemoryFree(pMsg->pData);
|
|
|
|
|
taosMemoryFree(pParam);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
|
|
|
|
if (pTmq == NULL){
|
|
|
|
|
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
terrno = code;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
|
|
|
|
if (pWrapper == NULL) {
|
|
|
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SMqRspHead* head = pDataBuf->pData;
|
|
|
|
|
|
|
|
|
|
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
|
|
|
|
pWrapper->epoch = head->epoch;
|
|
|
|
|
memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
|
|
|
|
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
|
|
|
|
|
|
|
|
|
taosWriteQitem(pTmq->mqueue, pWrapper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t doAskEp(tmq_t* pTmq) {
|
|
|
|
|
int32_t syncAskEp(tmq_t* pTmq) {
|
|
|
|
|
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
|
|
|
|
tsem_init(&pInfo->sem, 0, 0);
|
|
|
|
|
|
|
|
|
|
asyncAskEp(pTmq, defaultAskEpCb, pInfo);
|
|
|
|
|
askEp(pTmq, pInfo, true, false);
|
|
|
|
|
tsem_wait(&pInfo->sem);
|
|
|
|
|
|
|
|
|
|
int32_t code = pInfo->code;
|
|
|
|
@ -2464,10 +2404,10 @@ int32_t doAskEp(tmq_t* pTmq) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
|
|
|
|
void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|
|
|
|
SMqAskEpReq req = {0};
|
|
|
|
|
req.consumerId = pTmq->consumerId;
|
|
|
|
|
req.epoch = pTmq->epoch;
|
|
|
|
|
req.epoch = updateEpSet ? -1 :pTmq->epoch;
|
|
|
|
|
strcpy(req.cgroup, pTmq->groupId);
|
|
|
|
|
int code = 0;
|
|
|
|
|
SMqAskEpCbParam* pParam = NULL;
|
|
|
|
@ -2501,7 +2441,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pParam->refId = pTmq->refId;
|
|
|
|
|
pParam->pUserFn = askEpFn;
|
|
|
|
|
pParam->sync = sync;
|
|
|
|
|
pParam->pParam = param;
|
|
|
|
|
|
|
|
|
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
|
|
|
@ -2515,7 +2455,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
|
|
|
|
sendInfo->requestId = generateRequestId();
|
|
|
|
|
sendInfo->requestObjRefId = 0;
|
|
|
|
|
sendInfo->param = pParam;
|
|
|
|
|
sendInfo->fp = askEpCallbackFn;
|
|
|
|
|
sendInfo->fp = askEpCb;
|
|
|
|
|
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
|
|
|
|
|
|
|
|
|
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
|
|
|
@ -2530,7 +2470,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
|
|
|
|
FAIL:
|
|
|
|
|
taosMemoryFreeClear(pParam);
|
|
|
|
|
taosMemoryFreeClear(pReq);
|
|
|
|
|
askEpFn(pTmq, code, NULL, param);
|
|
|
|
|
askEpCb(pParam, NULL, code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
|
|
|
|
|