fix(tmq): update some logs.
This commit is contained in:
parent
a5e9fad0f9
commit
4cb1c51d4e
|
@ -425,6 +425,7 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
|
||||||
|
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
|
@ -436,10 +437,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
||||||
offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
|
offset.subKey, vgId, offset.val.uid, offset.val.ts);
|
||||||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
||||||
TD_VID(pTq->pVnode), offset.val.version);
|
vgId, offset.val.version);
|
||||||
if (offset.val.version + 1 == sversion) {
|
if (offset.val.version + 1 == sversion) {
|
||||||
offset.val.version += 1;
|
offset.val.version += 1;
|
||||||
}
|
}
|
||||||
|
@ -542,6 +543,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
*pBlockReturned = false;
|
*pBlockReturned = false;
|
||||||
|
|
||||||
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
||||||
|
@ -551,12 +554,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
char formatBuf[80];
|
char formatBuf[80];
|
||||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
|
consumerId, pHandle->subKey, vgId, formatBuf);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||||
if (pRequest->useSnapshot) {
|
if (pRequest->useSnapshot) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
|
||||||
|
consumerId, pHandle->subKey, vgId);
|
||||||
|
|
||||||
if (pHandle->fetchMeta) {
|
if (pHandle->fetchMeta) {
|
||||||
tqOffsetResetToMeta(pOffsetVal, 0);
|
tqOffsetResetToMeta(pOffsetVal, 0);
|
||||||
} else {
|
} else {
|
||||||
|
@ -577,8 +583,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, vgId, dataRsp.rspOffset.version);
|
||||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
|
||||||
|
@ -589,16 +595,14 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
|
||||||
*pBlockReturned = true;
|
*pBlockReturned = true;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
|
||||||
" in vg %d, subkey %s, reset none failed",
|
pHandle->subKey, consumerId, vgId, pRequest->subKey);
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
|
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
|
||||||
|
|
||||||
// current scan should be stopped asap, since the rebalance occurs.
|
// current scan should be stopped asap, since the rebalance occurs.
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -120,7 +120,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pRsp->withTbName || pRsp->withSchema){
|
if (pRsp->withTbName || pRsp->withSchema) {
|
||||||
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue