fix(tmq): fix error in taosx
This commit is contained in:
parent
564e9bb833
commit
7f2aae69d4
|
@ -150,6 +150,7 @@ typedef struct {
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
SMqClientVg* vgHandle;
|
SMqClientVg* vgHandle;
|
||||||
SMqClientTopic* topicHandle;
|
SMqClientTopic* topicHandle;
|
||||||
|
uint64_t reqId;
|
||||||
union {
|
union {
|
||||||
SMqDataRsp dataRsp;
|
SMqDataRsp dataRsp;
|
||||||
SMqMetaRsp metaRsp;
|
SMqMetaRsp metaRsp;
|
||||||
|
@ -458,8 +459,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
||||||
pVg->epSet = *pBuf->pEpSet;
|
pVg->epSet = *pBuf->pEpSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the offset value.
|
|
||||||
pVg->committedOffset = pVg->currentOffset;
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
|
||||||
pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
|
pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
|
||||||
}
|
}
|
||||||
|
@ -699,6 +698,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
|
||||||
j + 1, numOfVgroups);
|
j + 1, numOfVgroups);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the offset value.
|
||||||
|
pVg->committedOffset = pVg->currentOffset;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
|
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
|
||||||
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
|
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
|
||||||
|
@ -1261,8 +1263,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscWarn("consumer:0x%"PRIx64" msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%"PRIx64, tmq->consumerId, vgId,
|
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
|
||||||
epoch, tstrerror(code), requestId);
|
vgId, epoch, tstrerror(code), requestId);
|
||||||
|
|
||||||
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||||
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
|
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
|
||||||
|
@ -1281,8 +1283,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
||||||
/*pRspWrapper->vgHandle = pVg;*/
|
|
||||||
/*pRspWrapper->topicHandle = pTopic;*/
|
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
}
|
}
|
||||||
|
@ -1304,8 +1304,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgEpoch != tmqEpoch) {
|
if (msgEpoch != tmqEpoch) {
|
||||||
tscWarn("consumer:0x%"PRIx64" mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%"PRIx64, tmq->consumerId, vgId,
|
tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
|
||||||
msgEpoch, tmqEpoch, requestId);
|
tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle meta rsp
|
// handle meta rsp
|
||||||
|
@ -1322,6 +1322,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
pRspWrapper->tmqRspType = rspType;
|
pRspWrapper->tmqRspType = rspType;
|
||||||
pRspWrapper->vgHandle = pVg;
|
pRspWrapper->vgHandle = pVg;
|
||||||
pRspWrapper->topicHandle = pTopic;
|
pRspWrapper->topicHandle = pTopic;
|
||||||
|
pRspWrapper->reqId = requestId;
|
||||||
|
|
||||||
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
|
@ -1350,13 +1351,11 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
|
||||||
tmq->mqueue->numOfItems, requestId);
|
tmq->mqueue->numOfItems, requestId);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
CREATE_MSG_FAIL:
|
CREATE_MSG_FAIL:
|
||||||
|
@ -1763,6 +1762,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType);
|
||||||
|
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
taosFreeQitem(rspWrapper);
|
taosFreeQitem(rspWrapper);
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
|
@ -1779,7 +1780,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId);
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
||||||
|
pollRspWrapper->reqId);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
rspWrapper = NULL;
|
rspWrapper = NULL;
|
||||||
continue;
|
continue;
|
||||||
|
@ -1789,8 +1791,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pVg->currentOffset);
|
tFormatOffset(buf, 80, &pVg->currentOffset);
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
||||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId,
|
||||||
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum);
|
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
|
@ -1824,17 +1826,19 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
}
|
}
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
||||||
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
|
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
|
||||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||||
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
|
|
||||||
* rspMsg->msg.rspOffset);*/
|
|
||||||
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
|
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
rspWrapper = NULL;
|
rspWrapper = NULL;
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
||||||
|
pollRspWrapper->reqId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1845,8 +1849,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
} else {
|
} else {
|
||||||
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
|
|
||||||
|
char buf[80];
|
||||||
|
tFormatOffset(buf, 80, &pVg->currentOffset);
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId,
|
||||||
|
buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
|
@ -1854,6 +1865,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
||||||
|
|
||||||
bool reset = false;
|
bool reset = false;
|
||||||
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
|
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
|
||||||
taosFreeQitem(rspWrapper);
|
taosFreeQitem(rspWrapper);
|
||||||
|
|
|
@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
// tqRead
|
// tqRead
|
||||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||||
|
|
|
@ -330,15 +330,16 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
|
||||||
.contLen = tlen,
|
.contLen = tlen,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
|
||||||
|
|
||||||
|
tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||||
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -600,7 +601,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.version);
|
||||||
|
@ -617,7 +618,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
offset = taosxRsp.rspOffset;
|
offset = taosxRsp.rspOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
||||||
taosxRsp.rspOffset.version);
|
taosxRsp.rspOffset.version);
|
||||||
|
@ -637,7 +638,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
// todo refactor: this is not correct.
|
// todo refactor: this is not correct.
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (savedEpoch > pRequest->epoch) {
|
if (savedEpoch > pRequest->epoch) {
|
||||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
", found new consumer epoch %d, discard req epoch %d",
|
||||||
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
||||||
break;
|
break;
|
||||||
|
@ -645,9 +646,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) {
|
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return code;
|
return code;
|
||||||
|
@ -669,11 +668,10 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
pRequest->subKey);
|
pRequest->subKey);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
|
||||||
|
if (taosxRsp.blockNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) {
|
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return code;
|
return code;
|
||||||
|
@ -741,21 +739,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
taosRUnLockLatch(&pTq->pushLock);
|
taosRUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
int32_t savedEpoch = pHandle->epoch;
|
|
||||||
|
|
||||||
// 3. update the epoch value
|
// 3. update the epoch value
|
||||||
// while (savedEpoch < reqEpoch) {
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
|
if (savedEpoch < reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||||
pHandle->epoch = reqEpoch;
|
pHandle->epoch = reqEpoch;
|
||||||
// savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
req.epoch, pHandle->subKey, vgId, buf);
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
return extractDataForMq(pTq, pHandle, &req, pMsg);
|
return extractDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -183,14 +183,15 @@ end:
|
||||||
return tbSuid == realTbSuid;
|
return tbSuid == realTbSuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -241,6 +242,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
offset++;
|
offset++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -386,7 +386,7 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||||
pInfo->numOfVgroups++;
|
pInfo->numOfVgroups++;
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId);
|
||||||
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||||
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
||||||
pInfo->numOfVgroups, vgroupId);
|
pInfo->numOfVgroups, vgroupId);
|
||||||
|
@ -578,18 +578,25 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
|
|
||||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
|
||||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||||
const char* dbName = tmq_get_db_name(msg);
|
const char* dbName = tmq_get_db_name(msg);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
|
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
|
||||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
int32_t index = 0;
|
||||||
tmq_get_topic_name(msg), vgroupId);
|
for (index = 0; index < pInfo->numOfVgroups; index++) {
|
||||||
|
if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table",
|
||||||
|
tmq_get_topic_name(msg), vgroupId, pInfo->rowsOfPerVgroups[index][1]);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
if (row == NULL) {
|
||||||
if (row == NULL) break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||||
int32_t numOfFields = taos_field_count(msg);
|
int32_t numOfFields = taos_field_count(msg);
|
||||||
|
@ -607,7 +614,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
||||||
|
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
|
@ -621,7 +627,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
}
|
}
|
||||||
|
|
||||||
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||||
|
|
||||||
return totalRows;
|
return totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -817,7 +822,6 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(tmqMsg);
|
taos_free_result(tmqMsg);
|
||||||
|
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
|
|
Loading…
Reference in New Issue