fix(tmq): fix the invalid write and do some internal refactor.
This commit is contained in:
parent
147e66a50d
commit
18b121c69c
|
@ -24,7 +24,7 @@
|
|||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
|
||||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
|
||||
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
||||
|
||||
struct SMqMgmt {
|
||||
|
@ -148,7 +148,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int8_t tmqRspType;
|
||||
int32_t epoch;
|
||||
int32_t epoch; // epoch can be used to guard the vgHandle
|
||||
int32_t vgId;
|
||||
SMqClientVg* vgHandle;
|
||||
SMqClientTopic* topicHandle;
|
||||
uint64_t reqId;
|
||||
|
@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
pRspWrapper->topicHandle = pTopic;
|
||||
pRspWrapper->reqId = requestId;
|
||||
pRspWrapper->pEpset = pMsg->pEpSet;
|
||||
pRspWrapper->vgId = pVg->vgId;
|
||||
|
||||
pMsg->pEpSet = NULL;
|
||||
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
|
@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
for (int j = 0; j < numOfVg; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
|
||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
|
||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch,
|
||||
pVg->vgId);
|
||||
continue;
|
||||
}
|
||||
|
@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
return pRsp;
|
||||
}
|
||||
} else {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
|
@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
return pRsp;
|
||||
} else {
|
||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
|
@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
|
||||
} else {
|
||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
|
@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
void* rspObj;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
|
||||
tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
|
||||
|
||||
#if 0
|
||||
tmqHandleAllDelayedTask(tmq);
|
||||
|
|
|
@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
|||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
|
||||
|
||||
// tqExec
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||
|
|
|
@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
|||
return 0;
|
||||
}
|
||||
|
||||
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||
//#if 0
|
||||
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
//
|
||||
// if (pRsp->withSchema) {
|
||||
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||
// } else {
|
||||
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
// }
|
||||
//
|
||||
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
// if (pRsp->blockNum > 0) {
|
||||
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
// } else {
|
||||
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
// }
|
||||
// }
|
||||
//#endif
|
||||
//
|
||||
// int32_t len = 0;
|
||||
// int32_t code = 0;
|
||||
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||
// if (code < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
// void* buf = rpcMallocCont(tlen);
|
||||
// if (buf == NULL) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||
//
|
||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
//
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, abuf, len);
|
||||
// tEncodeSTaosxRsp(&encoder, pRsp);
|
||||
// tEncoderClear(&encoder);
|
||||
//
|
||||
// SRpcMsg rsp = {
|
||||
// .info = pMsg->info,
|
||||
// .pCont = buf,
|
||||
// .contLen = tlen,
|
||||
// .code = 0,
|
||||
// };
|
||||
//
|
||||
// tmsgSendRsp(&rsp);
|
||||
//
|
||||
// char buf1[80] = {0};
|
||||
// char buf2[80] = {0};
|
||||
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
//
|
||||
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
// return 0;
|
||||
//}
|
||||
|
||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||
pLeft->val.version <= pRight->val.version;
|
||||
|
@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
|||
|
||||
// NOTE: this pHandle->consumerId may have been changed already.
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||
", ts:%" PRId64,
|
||||
", ts:%" PRId64", reqId:0x%"PRIx64,
|
||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||
dataRsp.rspOffset.ts);
|
||||
dataRsp.rspOffset.ts, pRequest->reqId);
|
||||
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
|
@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
|||
break;
|
||||
}
|
||||
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
|
||||
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
// return code;
|
||||
// } else { // error happens, return to consumers
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
// }
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
}
|
||||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
|
|
|
@ -183,23 +183,24 @@ end:
|
|||
return tbSuid == realTbSuid;
|
||||
}
|
||||
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||
int64_t offset = *fetchOffset;
|
||||
|
||||
while (1) {
|
||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64,
|
||||
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
||||
*fetchOffset = offset - 1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
|
||||
TMSG_INFO((*ppCkHead)->head.msgType));
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
|
||||
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
|
||||
|
||||
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
|
|
|
@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
|||
}
|
||||
|
||||
static int32_t g_once_commit_flag = 0;
|
||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
||||
|
||||
if (0 == g_once_commit_flag) {
|
||||
g_once_commit_flag = 1;
|
||||
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
||||
|
||||
if (0 == g_once_commit_flag) {
|
||||
g_once_commit_flag = 1;
|
||||
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||
}
|
||||
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
||||
}
|
||||
|
||||
void build_consumer(SThreadInfo* pInfo) {
|
||||
|
|
Loading…
Reference in New Issue