From 7fc2e8cc26ab977aa41c8cecedd11266cd593779 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 10:17:20 +0800 Subject: [PATCH] batch meta --- source/client/src/clientRawBlockWrite.c | 12 ++++- source/client/src/clientTmq.c | 45 +++++++++++++--- source/dnode/vnode/src/tq/tqUtil.c | 68 ++++++++++++++++++++----- source/libs/executor/src/scanoperator.c | 3 +- utils/test/c/tmq_taosx_ci.c | 6 +-- 5 files changed, 107 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index a37cd99360..d5f478d188 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2051,6 +2051,7 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { SDecoder coder; SMqBatchMetaRsp rsp = {0}; + SArray* strArray = NULL; tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { goto _end; @@ -2058,7 +2059,7 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { int64_t fullSize = 0; int32_t num = taosArrayGetSize(rsp.batchMetaReq); - SArray* strArray = taosArrayInit(num, POINTER_BYTES); + strArray = taosArrayInit(num, POINTER_BYTES); for (int32_t i = 0; i < num; i++) { int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); @@ -2078,10 +2079,17 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { for (int32_t i = 0; i < num; i++) { char* subStr = taosArrayGetP(strArray, i); strcat(buf, subStr); - strcat(buf, "\n"); + if (i < num - 1) { + strcat(buf, "\n"); + } } + taosArrayDestroyP(strArray, taosMemoryFree); + tDeleteMqBatchMetaRsp(&rsp); + return buf; _end: + taosArrayDestroyP(strArray, taosMemoryFree); + tDeleteMqBatchMetaRsp(&rsp); return NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3fbae1b774..69a4503680 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2009,7 +2009,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -2032,12 +2032,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp - void* pRsp = NULL; - if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { - pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); - } else { - pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); - } + SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2048,6 +2043,42 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; + int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + + if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pollRspWrapper->vgHandle = pVg; + pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); + if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { + tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); + taosWUnLockLatch(&tmq->lock); + return NULL; + } + + // build rsp + void* pRsp = NULL; + updateVgInfo(pVg, &pollRspWrapper->batchMetaRsp.rspOffset, &pollRspWrapper->batchMetaRsp.rspOffset, + pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, + tmq->consumerId, true); + pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); + taosFreeQitem(pRspWrapper); + taosWUnLockLatch(&tmq->lock); + return pRsp; + } else { + tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->batchMetaRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); + } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index df7539bb69..88c41b99ef 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -191,14 +191,13 @@ end : { static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { - int code = 0; - int32_t vgId = TD_VID(pTq->pVnode); - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; + int code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + STaosxRsp taosxRsp = {0}; + SMqBatchMetaRsp btMetaRsp = {0}; tqInitTaosxRsp(&taosxRsp.common, *offset); if (offset->type != TMQ_OFFSET__LOG) { - SMqBatchMetaRsp btMetaRsp = {0}; if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) { code = -1; goto end; @@ -232,11 +231,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t st = taosGetTimestampMs(); int totalRows = 0; + int32_t totalMetaRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); ASSERT(savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { + if (totalMetaRows > 0) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); + ASSERT(totalRows == 0); + goto end; + } tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); code = tqSendDataRsp( pHandle, pMsg, pRequest, &taosxRsp, @@ -270,12 +276,48 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } } - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); + tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType)); + if (!btMetaRsp.batchMetaReq) { + btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); + btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); + } + fetchVer++; + + SMqMetaRsp tmpMetaRsp = {0}; + tmpMetaRsp.resMsgType = pHead->msgType; + tmpMetaRsp.metaRspLen = pHead->bodyLen; + tmpMetaRsp.metaRsp = pHead->body; + uint32_t len = 0; + tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code); + if (TSDB_CODE_SUCCESS != code) { + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + continue; + } + int32_t tLen = sizeof(SMqRspHead) + len; + void* tBuf = taosMemoryCalloc(1, tLen); + void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); + SEncoder encoder = {0}; + tEncoderInit(&encoder, metaBuff, len); + code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); + if (code < 0) { + tEncoderClear(&encoder); + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + continue; + } + taosArrayPush(btMetaRsp.batchMetaReq, &tBuf); + taosArrayPush(btMetaRsp.batchMetaLen, &tLen); + totalMetaRows++; + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); + goto end; + } + continue; + } + + if (totalMetaRows > 0) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); goto end; } @@ -382,8 +424,8 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, offset type:%d", vgId, - pReq->consumerId, pReq->epoch, pRsp->rspOffset.type); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%d offset type:%d", vgId, + pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type); return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 72ea9afae7..851d2f2735 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -44,7 +44,6 @@ int32_t scanDebug = 0; #define STREAM_SCAN_OP_NAME "StreamScanOperator" #define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" #define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint" -#define TMQ_MAX_BATCH_SIZE 4096 typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -2894,7 +2893,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { SSnapContext* sContext = pInfo->sContext; - for(int32_t i = 0; i < TMQ_MAX_BATCH_SIZE; i++) { + for(int32_t i = 0; i < tmqRowSize; i++) { void* data = NULL; int32_t dataLen = 0; int16_t type = 0; diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 5642b801f4..ef2d70f54f 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -1064,7 +1064,7 @@ void testConsumeExcluded(int topic_type) { char* topic = "create topic topic_excluded with meta as database db_taosx"; pRes = taos_query(pConn, topic); if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + printf("failed to create topic topic_excluded1, reason:%s\n", taos_errstr(pRes)); taos_close(pConn); return; } @@ -1073,7 +1073,7 @@ void testConsumeExcluded(int topic_type) { char* topic = "create topic topic_excluded as select * from stt"; pRes = taos_query(pConn, topic); if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + printf("failed to create topic topic_excluded2, reason:%s\n", taos_errstr(pRes)); taos_close(pConn); return; } @@ -1115,7 +1115,7 @@ void testConsumeExcluded(int topic_type) { assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB && raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE && raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE); - assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE); + assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE || raw.raw_type == 5); } else if (topic_type == 2) { assert(0); }