fix:[TD-337958]send batch metadata & remove data if subscribe only meta

This commit is contained in:
wangmm0220 2025-02-20 19:50:37 +08:00
parent 39bc844be0
commit 7b5edade95
3 changed files with 134 additions and 57 deletions

View File

@ -1098,6 +1098,22 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
*pSubmitTbDataRet = pSubmitTbData;
}
if (fetchMeta == ONLY_META) {
if (pSubmitTbData->pCreateTbReq != NULL) {
if (pRsp->createTableReq == NULL){
pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
if (pRsp->createTableReq == NULL){
return terrno;
}
}
if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL){
return terrno;
}
pSubmitTbData->pCreateTbReq = NULL;
}
return 0;
}
int32_t sversion = pSubmitTbData->sver;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;

View File

@ -339,6 +339,9 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
if (pHandle->fetchMeta == ONLY_META){
goto END;
}
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (pRsp->withTbName) {
@ -347,7 +350,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
TSDB_CHECK_CODE(code, lino, END);
}
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL);
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
for (int32_t i = 0; i < blockNum; i++) {
if (taosArrayGetSize(pBlocks) == 0){

View File

@ -226,6 +226,82 @@ static void tDeleteCommon(void* parm) {}
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
int32_t code = 0;
if (!btMetaRsp->batchMetaReq) {
btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
}
SMqMetaRsp tmpMetaRsp = {0};
tmpMetaRsp.resMsgType = type;
tmpMetaRsp.metaRspLen = bodyLen;
tmpMetaRsp.metaRsp = body;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
goto END;
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
TQ_NULL_GO_TO_END(tBuf);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
tEncoderClear(&encoder);
if (code < 0) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
goto END;
}
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
END:
return code;
}
static int32_t buildBatchCreateContent(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
int32_t code = 0;
SVCreateTbBatchReq pReq = {0};
pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
TQ_NULL_GO_TO_END(pReq.pArray);
for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
void *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
}
tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
if (code < 0) {
goto END;
}
len += sizeof(SMsgHead);
*pBuf = taosMemoryMalloc(*len);
TQ_NULL_GO_TO_END(pBuf);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len);
code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
tEncoderClear(&coder);
END:
taosArrayDestroy(pReq.pArray);
return code;
}
#define SEND_BATCH_META_RSP \
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
goto END;
#define SEND_DATA_RSP \
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
goto END;
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -261,6 +337,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
uint64_t st = taosGetTimestampMs();
int totalRows = 0;
int32_t totalMetaRows = 0;
int32_t totalAutoMetaRows = 0;
while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
@ -271,15 +348,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
if (totalMetaRows > 0 || totalAutoMetaRows > 0) {
SEND_BATCH_META_RSP
}
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
SEND_DATA_RSP
}
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
@ -288,11 +360,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalAutoMetaRows > 0){
SEND_BATCH_META_RSP
}
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
SEND_DATA_RSP
}
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
@ -318,53 +390,20 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
goto END;
}
if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq);
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen);
}
code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
fetchVer++;
SMqMetaRsp tmpMetaRsp = {0};
tmpMetaRsp.resMsgType = pHead->msgType;
tmpMetaRsp.metaRspLen = pHead->bodyLen;
tmpMetaRsp.metaRsp = pHead->body;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
if (code != 0){
continue;
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
TQ_NULL_GO_TO_END(tBuf);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
tEncoderClear(&encoder);
if (code < 0) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
}
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
SEND_BATCH_META_RSP
}
continue;
}
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
SEND_BATCH_META_RSP
}
// process data
@ -376,17 +415,37 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
if (pHandle->fetchMeta == ONLY_META){
int32_t len = 0;
void *pBuf = NULL;
code = buildBatchCreateContent(&taosxRsp, &pBuf, &len);
if (code == 0){
code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
}
taosMemoryFree(pBuf);
taosArrayDestroyP(taosxRsp.createTableReq, NULL);
taosxRsp.createTableReq = NULL;
fetchVer++;
if (code != 0){
continue;
}
totalAutoMetaRows ++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
SEND_BATCH_META_RSP
}
continue;
}
if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
(taosGetTimestampMs() - st > pRequest->timeout) ||
(pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;}
goto END;
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
terrno = 0;
} else{
fetchVer++;
}
SEND_DATA_RSP
} else {
fetchVer++;
}