From 441a150d6615bb89cfa1d285ac0f7bbf2d4eb190 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 23 May 2024 17:35:54 +0800 Subject: [PATCH 01/11] optimize tmq snapshot meta --- include/client/taos.h | 1 + include/common/tcommon.h | 1 + include/common/tmsg.h | 14 ++ include/libs/executor/executor.h | 2 +- include/util/tencode.h | 1 + source/client/inc/clientInt.h | 15 +- source/client/src/clientMain.c | 34 +++-- source/client/src/clientRawBlockWrite.c | 181 ++++++++++++++++++------ source/client/src/clientTmq.c | 64 +++++++-- source/common/src/tmsg.c | 52 +++++++ source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqScan.c | 8 +- source/dnode/vnode/src/tq/tqUtil.c | 50 ++++++- source/libs/executor/inc/querytask.h | 2 +- source/libs/executor/src/executor.c | 4 +- source/libs/executor/src/scanoperator.c | 77 +++++++--- utils/test/c/tmq_taosx_ci.c | 1 + 17 files changed, 399 insertions(+), 110 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 45dc85f6d9..66b5b92869 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -283,6 +283,7 @@ typedef enum tmq_res_t { TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, TMQ_RES_METADATA = 3, + TMQ_RES_BATCH_TABLE_META = 4, } tmq_res_t; typedef struct tmq_topic_assignment { diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d28477ae40..a5a805ff40 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -144,6 +144,7 @@ enum { TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__WALINFO_RSP, + TMQ_MSG_TYPE__POLL_BATCH_META_RSP, }; enum { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3ed6b40d4d..1657a17ff0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3978,6 +3978,20 @@ int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp); int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp); void tDeleteSTaosxRsp(void* pRsp); +typedef struct SMqBatchMetaRsp { + SMqRspHead head; // not serialize + STqOffsetVal rspOffset; + SArray* batchMetaLen; + SArray* batchMetaReq; + void* pMetaBuff; // not serialize + uint32_t metaBuffLen; // not serialize +} SMqBatchMetaRsp; + +int32_t tEncodeMqBatchMetaRsp(SEncoder* pEncoder, const SMqBatchMetaRsp* pRsp); +int32_t tDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp); +int32_t tSemiDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp); +void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp* pRsp); + typedef struct { SMqRspHead head; char cgroup[TSDB_CGROUP_LEN]; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fe20639c77..9b557500f0 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -201,7 +201,7 @@ void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); -SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); +SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); diff --git a/include/util/tencode.h b/include/util/tencode.h index 596fa2b4d3..b8da040689 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -77,6 +77,7 @@ typedef struct { #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) +#define TD_CODER_REMAIN_CAPACITY(CODER) ((CODER)->size - (CODER)->pos) #define tEncodeSize(E, S, SIZE, RET) \ do { \ diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 9507472df0..577b7d7ac4 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -44,6 +44,7 @@ enum { RES_TYPE__TMQ, RES_TYPE__TMQ_META, RES_TYPE__TMQ_METADATA, + RES_TYPE__TMQ_BATCH_META, }; #define SHOW_VARIABLES_RESULT_COLS 3 @@ -51,10 +52,11 @@ enum { #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE) -#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) -#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) -#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) +#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) +#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) +#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)res == RES_TYPE__TMQ_BATCH_META) typedef struct SAppInstInfo SAppInstInfo; @@ -241,6 +243,11 @@ typedef struct { STaosxRsp rsp; } SMqTaosxRspObj; +typedef struct { + SMqRspObjCommon common; + SMqBatchMetaRsp rsp; +} SMqBatchMetaRspObj; + typedef struct SReqRelInfo { uint64_t userRefId; uint64_t prevRefId; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ba7f65c52b..da5da044a7 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -301,7 +301,7 @@ void taos_close(TAOS *taos) { } int taos_errno(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return terrno; } @@ -313,7 +313,7 @@ int taos_errno(TAOS_RES *res) { } const char *taos_errstr(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return (const char *)tstrerror(terrno); } @@ -354,6 +354,10 @@ void taos_free_result(TAOS_RES *res) { SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; tDeleteMqMetaRsp(&pRspObj->metaRsp); taosMemoryFree(pRspObj); + } else if (TD_RES_TMQ_BATCH_META(res)) { + SMqBatchMetaRspObj *pBtRspObj = (SMqBatchMetaRspObj *)res; + tDeleteMqBatchMetaRsp(&pBtRspObj->rsp); + taosMemoryFree(pBtRspObj); } } @@ -371,7 +375,7 @@ void taos_kill_query(TAOS *taos) { } int taos_field_count(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -382,7 +386,7 @@ int taos_field_count(TAOS_RES *res) { int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); } TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { - if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) { + if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return NULL; } @@ -437,7 +441,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } - } else if (TD_RES_TMQ_META(res)) { + } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return NULL; } else { // assert to avoid un-initialization error @@ -548,7 +552,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) } int *taos_fetch_lengths(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return NULL; } @@ -557,7 +561,7 @@ int *taos_fetch_lengths(TAOS_RES *res) { } TAOS_ROW *taos_result_block(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { terrno = TSDB_CODE_INVALID_PARA; return NULL; } @@ -625,7 +629,7 @@ const char *taos_get_client_info() { return version; } // return int32_t int taos_affected_rows(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -636,7 +640,7 @@ int taos_affected_rows(TAOS_RES *res) { // return int64_t int64_t taos_affected_rows64(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -646,7 +650,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_TIME_PRECISION_MILLI; } @@ -686,7 +690,7 @@ int taos_select_db(TAOS *taos, const char *db) { } void taos_stop_query(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return; } @@ -694,7 +698,7 @@ void taos_stop_query(TAOS_RES *res) { } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return true; } SReqResultInfo *pResultInfo = tscGetCurResInfo(res); @@ -719,7 +723,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -761,7 +765,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { *numOfRows = 0; *pData = NULL; - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -797,7 +801,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { - if (res == NULL || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index c5069d46aa..a37cd99360 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -26,6 +26,8 @@ #define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64 #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId +static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); + static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, @@ -2025,38 +2027,81 @@ end: return code; } +static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { + if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { + return processCreateStb(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) { + return processAlterStb(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) { + return processDropSTable(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) { + return processCreateTable(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) { + return processAlterTable(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) { + return processDropTable(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) { + return processDropTable(pMetaRsp); + } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) { + return processDeleteTable(pMetaRsp); + } + + return NULL; +} +static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { + SDecoder coder; + SMqBatchMetaRsp rsp = {0}; + tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); + if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { + goto _end; + } + + int64_t fullSize = 0; + int32_t num = taosArrayGetSize(rsp.batchMetaReq); + SArray* strArray = taosArrayInit(num, POINTER_BYTES); + for (int32_t i = 0; i < num; i++) { + int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); + void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); + SDecoder metaCoder = {0}; + SMqMetaRsp metaRsp = {0}; + tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead)); + if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) { + goto _end; + } + char* subStr = processSimpleMeta(&metaRsp); + tDeleteMqMetaRsp(&metaRsp); + fullSize += strlen(subStr); + taosArrayPush(strArray, &subStr); + } + + char* buf = (char*)taosMemoryCalloc(1, fullSize + num + 1); + for (int32_t i = 0; i < num; i++) { + char* subStr = taosArrayGetP(strArray, i); + strcat(buf, subStr); + strcat(buf, "\n"); + } + +_end: + return NULL; +} + char* tmq_get_json_meta(TAOS_RES* res) { if (res == NULL) return NULL; uDebug("tmq_get_json_meta res:%p", res); - if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { + if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) { return NULL; } if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; return processAutoCreateTable(&pMetaDataRspObj->rsp); + } else if (TD_RES_TMQ_BATCH_META(res)) { + SMqBatchMetaRspObj* pBatchMetaRspObj = (SMqBatchMetaRspObj*)res; + return processBatchMetaToJson(&pBatchMetaRspObj->rsp); } SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { - return processCreateStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { - return processAlterStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) { - return processDropSTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) { - return processCreateTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) { - return processAlterTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { - return processDeleteTable(&pMetaRspObj->metaRsp); - } - - return NULL; + return processSimpleMeta(&pMetaRspObj->metaRsp); } void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } @@ -2147,6 +2192,12 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { } raw->raw_type = RES_TYPE__TMQ_METADATA; uDebug("tmq get raw type metadata:%p", raw); + } else if (TD_RES_TMQ_BATCH_META(res)) { + SMqBatchMetaRspObj* pBtMetaRspObj = (SMqBatchMetaRspObj*)res; + raw->raw = pBtMetaRspObj->rsp.pMetaBuff; + raw->raw_len = pBtMetaRspObj->rsp.metaBuffLen; + raw->raw_type = RES_TYPE__TMQ_BATCH_META; + uDebug("tmq get raw batch meta:%p", raw); } else { uError("tmq get raw error type:%d", *(int8_t*)res); terrno = TSDB_CODE_TMQ_INVALID_MSG; @@ -2163,32 +2214,74 @@ void tmq_free_raw(tmq_raw_data raw) { memset(terrMsg, 0, ERR_MSG_LEN); } +static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { + if (type == TDMT_VND_CREATE_STB) { + return taosCreateStb(taos, buf, len); + } else if (type == TDMT_VND_ALTER_STB) { + return taosCreateStb(taos, buf, len); + } else if (type == TDMT_VND_DROP_STB) { + return taosDropStb(taos, buf, len); + } else if (type == TDMT_VND_CREATE_TABLE) { + return taosCreateTable(taos, buf, len); + } else if (type == TDMT_VND_ALTER_TABLE) { + return taosAlterTable(taos, buf, len); + } else if (type == TDMT_VND_DROP_TABLE) { + return taosDropTable(taos, buf, len); + } else if (type == TDMT_VND_DELETE) { + return taosDeleteData(taos, buf, len); + } else if (type == RES_TYPE__TMQ) { + return tmqWriteRawDataImpl(taos, buf, len); + } else if (type == RES_TYPE__TMQ_METADATA) { + return tmqWriteRawMetaDataImpl(taos, buf, len); + } else if (type == RES_TYPE__TMQ_BATCH_META) { + return tmqWriteBatchMetaDataImpl(taos, buf, len); + } + return TSDB_CODE_INVALID_PARA; +} + int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { if (!taos) { - goto end; + return TSDB_CODE_INVALID_PARA; } - if (raw.raw_type == TDMT_VND_CREATE_STB) { - return taosCreateStb(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_ALTER_STB) { - return taosCreateStb(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_DROP_STB) { - return taosDropStb(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_CREATE_TABLE) { - return taosCreateTable(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_ALTER_TABLE) { - return taosAlterTable(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_DROP_TABLE) { - return taosDropTable(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == TDMT_VND_DELETE) { - return taosDeleteData(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == RES_TYPE__TMQ) { - return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len); - } else if (raw.raw_type == RES_TYPE__TMQ_METADATA) { - return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len); - } - -end: - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type); +} + +static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) { + if (taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + SMqBatchMetaRsp rsp = {0}; + SDecoder coder; + int32_t code = TSDB_CODE_SUCCESS; + + // decode and process req + tDecoderInit(&coder, meta, metaLen); + if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { + code = TSDB_CODE_INVALID_PARA; + goto _end; + } + int32_t num = taosArrayGetSize(rsp.batchMetaReq); + for (int32_t i = 0; i < num; i++) { + int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); + void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); + SDecoder metaCoder = {0}; + SMqMetaRsp metaRsp = {0}; + tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead)); + if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) { + code = TSDB_CODE_INVALID_PARA; + goto _end; + } + code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType); + tDeleteMqMetaRsp(&metaRsp); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + } + +_end: + tDeleteMqBatchMetaRsp(&rsp); + errno = code; + return code; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3141e21f26..df08a66bd3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -171,9 +171,10 @@ typedef struct { uint64_t reqId; SEpSet* pEpset; union { - SMqDataRsp dataRsp; - SMqMetaRsp metaRsp; - STaosxRsp taosxRsp; + SMqDataRsp dataRsp; + SMqMetaRsp metaRsp; + STaosxRsp taosxRsp; + SMqBatchMetaRsp batchMetaRsp; }; } SMqPollRspWrapper; @@ -635,6 +636,11 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c pTopicName = pRspObj->common.topic; vgId = pRspObj->common.vgId; offsetVal = pRspObj->rsp.common.rspOffset; + } else if (TD_RES_TMQ_BATCH_META(pRes)) { + SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)pRes; + pTopicName = pBtRspObj->common.topic; + vgId = pBtRspObj->common.vgId; + offsetVal = pBtRspObj->rsp.rspOffset; } else { code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -934,7 +940,11 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); tDeleteSTaosxRsp(&pRsp->taosxRsp); - } + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosMemoryFreeClear(pRsp->pEpset); + tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp); + } } void tmqClearUnhandleMsg(tmq_t* tmq) { @@ -1418,6 +1428,17 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); + } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + SDecoder decoder; + tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); + if(tSemiDecodeMqBatchMetaRsp(&decoder, &pRspWrapper->batchMetaRsp) < 0){ + tDecoderClear(&decoder); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + tDecoderClear(&decoder); + memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); } else { // invalid rspType tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } @@ -1616,6 +1637,19 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } +SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { + SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj)); + if(pRspObj == NULL) { + return NULL; + } + pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META; + tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pRspObj->common.db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); + pRspObj->common.vgId = pWrapper->vgHandle->vgId; + + memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); + return pRspObj; +} void changeByteEndian(char* pData){ char* p = pData; @@ -1972,7 +2006,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -1995,7 +2029,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp - SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); + void* pRsp = NULL; + if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); + } else { + pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); + } taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2217,6 +2256,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { return TMQ_RES_TABLE_META; } else if (TD_RES_TMQ_METADATA(res)) { return TMQ_RES_METADATA; + } else if (TD_RES_TMQ_BATCH_META(res)) { + return TMQ_RES_BATCH_TABLE_META; } else { return TMQ_RES_INVALID; } @@ -2226,7 +2267,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; @@ -2241,7 +2282,7 @@ const char* tmq_get_db_name(TAOS_RES* res) { return NULL; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return strchr(((SMqRspObjCommon*)res)->db, '.') + 1; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; @@ -2255,7 +2296,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (res == NULL) { return -1; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return ((SMqRspObjCommon*)res)->vgId; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; @@ -2282,6 +2323,11 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) { return pRspObj->metaRsp.rspOffset.version; } + } else if (TD_RES_TMQ_BATCH_META(res)) { + SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)res; + if (pBtRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) { + return pBtRspObj->rsp.rspOffset.version; + } } else { tscError("invalid tmq type:%d", *(int8_t*)res); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b7d1417451..3f76d03289 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10557,3 +10557,55 @@ void tFreeFetchTtlExpiredTbsRsp(void *p) { SVFetchTtlExpiredTbsRsp *pRsp = p; taosArrayDestroy(pRsp->pExpiredTbs); } + +int32_t tEncodeMqBatchMetaRsp(SEncoder *pEncoder, const SMqBatchMetaRsp *pRsp) { + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; + + int32_t size = taosArrayGetSize(pRsp->batchMetaReq); + if (tEncodeI32(pEncoder, size) < 0) return -1; + if (size > 0) { + for (int32_t i = 0; i < size; i++) { + void *pMetaReq = taosArrayGetP(pRsp->batchMetaReq, i); + int32_t metaLen = *(int32_t *)taosArrayGet(pRsp->batchMetaLen, i); + if (tEncodeBinary(pEncoder, pMetaReq, metaLen) < 0) return -1; + } + } + return 0; +} + +int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) return -1; + if (size > 0) { + pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES); + pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t)); + for (int32_t i = 0; i < size; i++) { + void *pCreate = NULL; + uint64_t len = 0; + if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; + int32_t l = (int32_t)len; + taosArrayPush(pRsp->batchMetaReq, &pCreate); + taosArrayPush(pRsp->batchMetaLen, &l); + } + } + return 0; +} + +int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; + if (pDecoder->size < pDecoder->pos) { + return -1; + } + pRsp->metaBuffLen = TD_CODER_REMAIN_CAPACITY(pDecoder); + pRsp->pMetaBuff = taosMemoryCalloc(1, pRsp->metaBuffLen); + memcpy(pRsp->pMetaBuff, TD_CODER_CURRENT(pDecoder), pRsp->metaBuffLen); + return 0; +} + +void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) { + taosMemoryFreeClear(pRsp->pMetaBuff); + taosArrayDestroyP(pRsp->batchMetaReq, taosMemoryFree); + taosArrayDestroy(pRsp->batchMetaLen); + pRsp->batchMetaReq = NULL; + pRsp->batchMetaLen = NULL; +} diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e2ecdca59f..08d32b2b81 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -113,7 +113,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 08f1689f2f..b9c9595c74 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -163,7 +163,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* return 0; } -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -218,10 +218,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } // get meta - SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); - if (tmp->metaRspLen > 0) { + SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task); + if (taosArrayGetSize(tmp->batchMetaReq) > 0) { qStreamExtractOffset(task, &tmp->rspOffset); - *pMetaRsp = *tmp; + *pBatchMetaRsp = *tmp; tqDebug("tmqsnap task get meta"); break; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 28f4a19949..df7539bb69 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -17,6 +17,8 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); +static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, + const SMqBatchMetaRsp* pRsp, int32_t vgId); int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { tOffsetCopy(&pRsp->reqOffset, &pOffset); @@ -187,7 +189,6 @@ end : { } \ tDecoderClear(&decoder); - static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; @@ -197,18 +198,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqInitTaosxRsp(&taosxRsp.common, *offset); if (offset->type != TMQ_OFFSET__LOG) { - if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { + SMqBatchMetaRsp btMetaRsp = {0}; + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) { code = -1; goto end; } - if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); + if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) { + code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, - pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, - metaRsp.rspOffset.ts); - tDeleteMqMetaRsp(&metaRsp); + pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid, + btMetaRsp.rspOffset.ts); + tDeleteMqBatchMetaRsp(&btMetaRsp); goto end; } @@ -352,6 +354,40 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int pMsgHead->walever = ever; } +int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqBatchMetaRsp* pRsp, + int32_t vgId) { + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code); + if (code < 0) { + return -1; + } + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + int64_t sver = 0, ever = 0; + walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeMqBatchMetaRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; + + tmsgSendRsp(&resp); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, offset type:%d", vgId, + pReq->consumerId, pReq->epoch, pRsp->rspOffset.type); + + return 0; +} + int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) { int32_t len = 0; diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 886ce9705d..18f51df2e9 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -58,7 +58,7 @@ typedef struct STaskStopInfo { typedef struct { STqOffsetVal currentOffset; // for tmq - SMqMetaRsp metaRsp; // for tmq fetching meta + SMqBatchMetaRsp btMetaRsp; // for tmq fetching meta int8_t sourceExcluded; int64_t snapshotVer; SSchemaWrapper* schema; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2f360044c9..69be1c76c7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1078,9 +1078,9 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.tbName; } -SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { +SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return &pTaskInfo->streamInfo.metaRsp; + return &pTaskInfo->streamInfo.btMetaRsp; } void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 110aabf9b1..72ea9afae7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -44,6 +44,7 @@ int32_t scanDebug = 0; #define STREAM_SCAN_OP_NAME "StreamScanOperator" #define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" #define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint" +#define TMQ_MAX_BATCH_SIZE 4096 typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -2845,8 +2846,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { SStreamRawScanInfo* pInfo = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; - pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta - pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; + pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = NULL; // use batchMetaReq != NULL to judge if data is meta + pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = NULL; qDebug("tmqsnap doRawScan called"); if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -2893,28 +2894,60 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { SSnapContext* sContext = pInfo->sContext; - void* data = NULL; - int32_t dataLen = 0; - int16_t type = 0; - int64_t uid = 0; - if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) { - qError("tmqsnap getTableInfoFromSnapshot error"); - taosMemoryFreeClear(data); - return NULL; - } + for(int32_t i = 0; i < TMQ_MAX_BATCH_SIZE; i++) { + void* data = NULL; + int32_t dataLen = 0; + int16_t type = 0; + int64_t uid = 0; + if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) { + qError("tmqsnap getTableInfoFromSnapshot error"); + taosMemoryFreeClear(data); + break; + } - if (!sContext->queryMeta) { // change to get data next poll request - STqOffsetVal offset = {0}; - SValue val = {0}; - tqOffsetResetToData(&offset, 0, INT64_MIN, val); - qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); - } else { - tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); - pTaskInfo->streamInfo.metaRsp.resMsgType = type; - pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen; - pTaskInfo->streamInfo.metaRsp.metaRsp = data; - } + if (!sContext->queryMeta) { // change to get data next poll request + STqOffsetVal offset = {0}; + SValue val = {0}; + tqOffsetResetToData(&offset, 0, INT64_MIN, val); + qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); + break; + } else { + tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); + SMqMetaRsp tmpMetaRsp = {0}; + tmpMetaRsp.resMsgType = type; + tmpMetaRsp.metaRspLen = dataLen; + tmpMetaRsp.metaRsp = data; + if (!pTaskInfo->streamInfo.btMetaRsp.batchMetaReq) { + pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); + pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); + } + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code); + if (TSDB_CODE_SUCCESS != code) { + qError("tmqsnap tEncodeMqMetaRsp error"); + taosMemoryFreeClear(data); + break; + } + int32_t tLen = sizeof(SMqRspHead) + len; + void* tBuf = taosMemoryCalloc(1, tLen); + void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); + SEncoder encoder = {0}; + tEncoderInit(&encoder, metaBuff, len); + code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); + if (code < 0) { + qError("tmqsnap tEncodeMqMetaRsp error"); + tEncoderClear(&encoder); + taosMemoryFreeClear(tBuf); + taosMemoryFreeClear(data); + break; + } + taosMemoryFreeClear(data); + taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, &tBuf); + taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, &tLen); + } + } return NULL; } return NULL; diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index a391093609..5642b801f4 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -1138,6 +1138,7 @@ void testConsumeExcluded(int topic_type) { taos_close(pConn); return; } + taos_close(pConn); taos_free_result(pRes); } From be82e4acd2b2350c1a01f7ae2b7f5cc94cfd64b0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 24 May 2024 11:25:30 +0800 Subject: [PATCH 02/11] add log --- source/client/src/clientTmq.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index df08a66bd3..1d993eef6f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1439,6 +1439,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } tDecoderClear(&decoder); memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); + tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId, + requestId); } else { // invalid rspType tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } @@ -1648,6 +1650,7 @@ SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) pRspObj->common.vgId = pWrapper->vgHandle->vgId; memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); + tscDebug("build batchmeta Rsp from wrapper"); return pRspObj; } From d33b2521fd87f454b084c96925cebb928ba14a90 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 24 May 2024 11:42:08 +0800 Subject: [PATCH 03/11] adj res type --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1d993eef6f..3fbae1b774 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2260,7 +2260,7 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } else if (TD_RES_TMQ_METADATA(res)) { return TMQ_RES_METADATA; } else if (TD_RES_TMQ_BATCH_META(res)) { - return TMQ_RES_BATCH_TABLE_META; + return TMQ_RES_TABLE_META; } else { return TMQ_RES_INVALID; } From 7fc2e8cc26ab977aa41c8cecedd11266cd593779 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 10:17:20 +0800 Subject: [PATCH 04/11] batch meta --- source/client/src/clientRawBlockWrite.c | 12 ++++- source/client/src/clientTmq.c | 45 +++++++++++++--- source/dnode/vnode/src/tq/tqUtil.c | 68 ++++++++++++++++++++----- source/libs/executor/src/scanoperator.c | 3 +- utils/test/c/tmq_taosx_ci.c | 6 +-- 5 files changed, 107 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index a37cd99360..d5f478d188 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2051,6 +2051,7 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { SDecoder coder; SMqBatchMetaRsp rsp = {0}; + SArray* strArray = NULL; tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { goto _end; @@ -2058,7 +2059,7 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { int64_t fullSize = 0; int32_t num = taosArrayGetSize(rsp.batchMetaReq); - SArray* strArray = taosArrayInit(num, POINTER_BYTES); + strArray = taosArrayInit(num, POINTER_BYTES); for (int32_t i = 0; i < num; i++) { int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); @@ -2078,10 +2079,17 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { for (int32_t i = 0; i < num; i++) { char* subStr = taosArrayGetP(strArray, i); strcat(buf, subStr); - strcat(buf, "\n"); + if (i < num - 1) { + strcat(buf, "\n"); + } } + taosArrayDestroyP(strArray, taosMemoryFree); + tDeleteMqBatchMetaRsp(&rsp); + return buf; _end: + taosArrayDestroyP(strArray, taosMemoryFree); + tDeleteMqBatchMetaRsp(&rsp); return NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3fbae1b774..69a4503680 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2009,7 +2009,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -2032,12 +2032,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp - void* pRsp = NULL; - if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { - pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); - } else { - pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); - } + SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2048,6 +2043,42 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; + int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + + if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pollRspWrapper->vgHandle = pVg; + pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); + if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { + tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); + taosWUnLockLatch(&tmq->lock); + return NULL; + } + + // build rsp + void* pRsp = NULL; + updateVgInfo(pVg, &pollRspWrapper->batchMetaRsp.rspOffset, &pollRspWrapper->batchMetaRsp.rspOffset, + pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, + tmq->consumerId, true); + pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); + taosFreeQitem(pRspWrapper); + taosWUnLockLatch(&tmq->lock); + return pRsp; + } else { + tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->batchMetaRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); + } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index df7539bb69..88c41b99ef 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -191,14 +191,13 @@ end : { static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { - int code = 0; - int32_t vgId = TD_VID(pTq->pVnode); - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; + int code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + STaosxRsp taosxRsp = {0}; + SMqBatchMetaRsp btMetaRsp = {0}; tqInitTaosxRsp(&taosxRsp.common, *offset); if (offset->type != TMQ_OFFSET__LOG) { - SMqBatchMetaRsp btMetaRsp = {0}; if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) { code = -1; goto end; @@ -232,11 +231,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t st = taosGetTimestampMs(); int totalRows = 0; + int32_t totalMetaRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); ASSERT(savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { + if (totalMetaRows > 0) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); + ASSERT(totalRows == 0); + goto end; + } tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); code = tqSendDataRsp( pHandle, pMsg, pRequest, &taosxRsp, @@ -270,12 +276,48 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } } - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); + tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType)); + if (!btMetaRsp.batchMetaReq) { + btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); + btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); + } + fetchVer++; + + SMqMetaRsp tmpMetaRsp = {0}; + tmpMetaRsp.resMsgType = pHead->msgType; + tmpMetaRsp.metaRspLen = pHead->bodyLen; + tmpMetaRsp.metaRsp = pHead->body; + uint32_t len = 0; + tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code); + if (TSDB_CODE_SUCCESS != code) { + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + continue; + } + int32_t tLen = sizeof(SMqRspHead) + len; + void* tBuf = taosMemoryCalloc(1, tLen); + void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); + SEncoder encoder = {0}; + tEncoderInit(&encoder, metaBuff, len); + code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); + if (code < 0) { + tEncoderClear(&encoder); + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + continue; + } + taosArrayPush(btMetaRsp.batchMetaReq, &tBuf); + taosArrayPush(btMetaRsp.batchMetaLen, &tLen); + totalMetaRows++; + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); + goto end; + } + continue; + } + + if (totalMetaRows > 0) { + tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); + tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); goto end; } @@ -382,8 +424,8 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, offset type:%d", vgId, - pReq->consumerId, pReq->epoch, pRsp->rspOffset.type); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%d offset type:%d", vgId, + pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type); return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 72ea9afae7..851d2f2735 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -44,7 +44,6 @@ int32_t scanDebug = 0; #define STREAM_SCAN_OP_NAME "StreamScanOperator" #define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" #define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint" -#define TMQ_MAX_BATCH_SIZE 4096 typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -2894,7 +2893,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { SSnapContext* sContext = pInfo->sContext; - for(int32_t i = 0; i < TMQ_MAX_BATCH_SIZE; i++) { + for(int32_t i = 0; i < tmqRowSize; i++) { void* data = NULL; int32_t dataLen = 0; int16_t type = 0; diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 5642b801f4..ef2d70f54f 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -1064,7 +1064,7 @@ void testConsumeExcluded(int topic_type) { char* topic = "create topic topic_excluded with meta as database db_taosx"; pRes = taos_query(pConn, topic); if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + printf("failed to create topic topic_excluded1, reason:%s\n", taos_errstr(pRes)); taos_close(pConn); return; } @@ -1073,7 +1073,7 @@ void testConsumeExcluded(int topic_type) { char* topic = "create topic topic_excluded as select * from stt"; pRes = taos_query(pConn, topic); if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + printf("failed to create topic topic_excluded2, reason:%s\n", taos_errstr(pRes)); taos_close(pConn); return; } @@ -1115,7 +1115,7 @@ void testConsumeExcluded(int topic_type) { assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB && raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE && raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE); - assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE); + assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE || raw.raw_type == 5); } else if (topic_type == 2) { assert(0); } From 57bf9d10482a7a5c36bfade104540758e4ed6de4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 10:56:43 +0800 Subject: [PATCH 05/11] adj log --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 88c41b99ef..4b1ff04331 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -424,7 +424,7 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%d offset type:%d", vgId, + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d", vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type); return 0; From d815bb117be31fcd233a1c204fb3827b0a9dbd31 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 16:56:23 +0800 Subject: [PATCH 06/11] add json for batch meta --- include/client/taos.h | 3 +- source/client/src/clientRawBlockWrite.c | 127 +++++++++++------------- utils/test/c/tmq_taosx_ci.c | 20 +++- 3 files changed, 75 insertions(+), 75 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 66b5b92869..46e4e7633b 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -282,8 +282,7 @@ typedef enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, - TMQ_RES_METADATA = 3, - TMQ_RES_BATCH_TABLE_META = 4, + TMQ_RES_METADATA = 3 } tmq_res_t; typedef struct tmq_topic_assignment { diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d5f478d188..5e6d693562 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -26,11 +26,13 @@ #define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64 #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId +#define TMQ_META_VERSION "1.0" + static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } -static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, +static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow) { int8_t buildDefaultCompress = 0; if (pColCmprRow->nCols <= 0) { @@ -124,9 +126,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch } cJSON_AddItemToObject(json, "tags", tags); - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); - return string; + return json; } static int32_t setCompressOption(cJSON* json, uint32_t para) { @@ -153,7 +153,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { } return 0; } -static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { +static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { SMAlterStbReq req = {0}; cJSON* json = NULL; char* string = NULL; @@ -247,18 +247,16 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { default: break; } - string = cJSON_PrintUnformatted(json); end: - cJSON_Delete(json); tFreeSMAltertbReq(&req); - return string; + return json; } -static char* processCreateStb(SMqMetaRsp* metaRsp) { +static cJSON* processCreateStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; - char* string = NULL; + cJSON* pJson = NULL; uDebug("create stable data:%p", metaRsp); // decode and process req @@ -269,17 +267,17 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); + pJson = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); _err: - uDebug("create stable return, sql json:%s", string); + uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); tDecoderClear(&coder); - return string; + return pJson; } -static char* processAlterStb(SMqMetaRsp* metaRsp) { +static cJSON* processAlterStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; - char* string = NULL; + cJSON* pJson = NULL; uDebug("alter stable data:%p", metaRsp); // decode and process req @@ -290,11 +288,11 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); + pJson = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); _err: - uDebug("alter stable return, sql json:%s", string); + uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); tDecoderClear(&coder); - return string; + return pJson; } static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { @@ -387,7 +385,7 @@ end: taosArrayDestroy(pTagVals); } -static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { +static cJSON* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { @@ -412,16 +410,14 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { cJSON_AddItemToArray(createList, create); } cJSON_AddItemToObject(json, "createList", createList); - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); - return string; + return json; } -static char* processCreateTable(SMqMetaRsp* metaRsp) { +static cJSON* processCreateTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVCreateTbBatchReq req = {0}; SVCreateTbReq* pCreateReq; - char* string = NULL; + cJSON* pJson = NULL; // decode uDebug("create table data:%p", metaRsp); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); @@ -435,15 +431,15 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { if (req.nReqs > 0) { pCreateReq = req.pReqs; if (pCreateReq->type == TSDB_CHILD_TABLE) { - string = buildCreateCTableJson(req.pReqs, req.nReqs); + pJson = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, + pJson = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, &pCreateReq->colCmpr); } } _exit: - uDebug("create table return, sql json:%s", string); + uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -453,7 +449,7 @@ _exit: } } tDecoderClear(&decoder); - return string; + return pJson; } static char* processAutoCreateTable(STaosxRsp* rsp) { @@ -482,7 +478,9 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { goto _exit; } } - string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); + cJSON* pJson = buildCreateCTableJson(pCreateReq, rsp->createTableNum); + string = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); _exit: uDebug("auto created table return, sql json:%s", string); for (int i = 0; i < rsp->createTableNum; i++) { @@ -497,7 +495,7 @@ _exit: return string; } -static char* processAlterTable(SMqMetaRsp* metaRsp) { +static cJSON* processAlterTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVAlterTbReq vAlterTbReq = {0}; char* string = NULL; @@ -622,19 +620,16 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { default: break; } - string = cJSON_PrintUnformatted(json); _exit: - uDebug("alter table return, sql json:%s", string); - cJSON_Delete(json); + uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } -static char* processDropSTable(SMqMetaRsp* metaRsp) { +static cJSON* processDropSTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVDropStbReq req = {0}; - char* string = NULL; cJSON* json = NULL; uDebug("processDropSTable data:%p", metaRsp); @@ -659,18 +654,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { cJSON* tableName = cJSON_CreateString(req.name); cJSON_AddItemToObject(json, "tableName", tableName); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDropSTable return, sql json:%s", string); - cJSON_Delete(json); + uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } -static char* processDeleteTable(SMqMetaRsp* metaRsp) { +static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) { SDeleteRes req = {0}; SDecoder coder = {0}; cJSON* json = NULL; - char* string = NULL; uDebug("processDeleteTable data:%p", metaRsp); // decode and process req @@ -698,18 +690,15 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { cJSON* sqlJson = cJSON_CreateString(sql); cJSON_AddItemToObject(json, "sql", sqlJson); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDeleteTable return, sql json:%s", string); - cJSON_Delete(json); + uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&coder); - return string; + return json; } -static char* processDropTable(SMqMetaRsp* metaRsp) { +static cJSON* processDropTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVDropTbBatchReq req = {0}; - char* string = NULL; cJSON* json = NULL; uDebug("processDropTable data:%p", metaRsp); @@ -743,12 +732,10 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { } cJSON_AddItemToObject(json, "tableNameList", tableNameList); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDropTable return, json sql:%s", string); - cJSON_Delete(json); + uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { @@ -1805,6 +1792,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN); taosMemoryFree(fields); + taosMemoryFreeClear(pTableMeta); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -1997,6 +1985,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); taosMemoryFree(fields); + taosMemoryFreeClear(pTableMeta); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -2027,7 +2016,7 @@ end: return code; } -static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { +static cJSON* processSimpleMeta(SMqMetaRsp* pMetaRsp) { if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { return processCreateStb(pMetaRsp); } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) { @@ -2051,15 +2040,15 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { SDecoder coder; SMqBatchMetaRsp rsp = {0}; - SArray* strArray = NULL; tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { goto _end; } - int64_t fullSize = 0; + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION); + cJSON* pMetaArr = cJSON_CreateArray(); int32_t num = taosArrayGetSize(rsp.batchMetaReq); - strArray = taosArrayInit(num, POINTER_BYTES); for (int32_t i = 0; i < num; i++) { int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); @@ -2069,26 +2058,19 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) { goto _end; } - char* subStr = processSimpleMeta(&metaRsp); + cJSON* pItem = processSimpleMeta(&metaRsp); tDeleteMqMetaRsp(&metaRsp); - fullSize += strlen(subStr); - taosArrayPush(strArray, &subStr); + cJSON_AddItemToArray(pMetaArr, pItem); } - char* buf = (char*)taosMemoryCalloc(1, fullSize + num + 1); - for (int32_t i = 0; i < num; i++) { - char* subStr = taosArrayGetP(strArray, i); - strcat(buf, subStr); - if (i < num - 1) { - strcat(buf, "\n"); - } - } - taosArrayDestroyP(strArray, taosMemoryFree); + cJSON_AddItemToObject(pJson, "metas", pMetaArr); tDeleteMqBatchMetaRsp(&rsp); - return buf; + char* fullStr = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); + return fullStr; _end: - taosArrayDestroyP(strArray, taosMemoryFree); + cJSON_Delete(pJson); tDeleteMqBatchMetaRsp(&rsp); return NULL; } @@ -2109,7 +2091,10 @@ char* tmq_get_json_meta(TAOS_RES* res) { } SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - return processSimpleMeta(&pMetaRspObj->metaRsp); + cJSON* pJson = processSimpleMeta(&pMetaRspObj->metaRsp); + char* string = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); + return string; } void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index ef2d70f54f..c1b312335f 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -18,6 +18,7 @@ #include #include #include +#include "cJSON.h" #include "taos.h" #include "tmsg.h" #include "types.h" @@ -61,8 +62,23 @@ static void msg_process(TAOS_RES* msg) { if (result) { printf("meta result: %s\n", result); if (g_fp && strcmp(result, "") != 0) { - taosFprintfFile(g_fp, result); - taosFprintfFile(g_fp, "\n"); + // RES_TYPE__TMQ_BATCH_META + if ((*(int8_t*)msg) == 5) { + cJSON* pJson = cJSON_Parse(result); + cJSON* pJsonArray = cJSON_GetObjectItem(pJson, "metas"); + int32_t num = cJSON_GetArraySize(pJsonArray); + for (int32_t i = 0; i < num; i++) { + cJSON* pJsonItem = cJSON_GetArrayItem(pJsonArray, i); + char* itemStr = cJSON_PrintUnformatted(pJsonItem); + taosFprintfFile(g_fp, itemStr); + tmq_free_json_meta(itemStr); + taosFprintfFile(g_fp, "\n"); + } + cJSON_Delete(pJson); + } else { + taosFprintfFile(g_fp, result); + taosFprintfFile(g_fp, "\n"); + } } } tmq_free_json_meta(result); From a2255ca35558af614f6acdee7f9824ce5857c2f2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 18:34:37 +0800 Subject: [PATCH 07/11] add tmq param --- include/common/tmsg.h | 5 +++++ source/client/src/clientTmq.c | 11 +++++++++++ source/common/src/tmsg.c | 16 ++++++++++++++++ source/dnode/vnode/src/tq/tqUtil.c | 30 +++++++++++++++++++++++------- 4 files changed, 55 insertions(+), 7 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1657a17ff0..7917222542 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2744,6 +2744,7 @@ typedef struct { int32_t autoCommitInterval; int8_t resetOffsetCfg; int8_t enableReplay; + int8_t enableBatchMeta; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2764,6 +2765,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->enableReplay); + tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); return tlen; } @@ -2788,6 +2790,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); + buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); return buf; } @@ -2976,6 +2979,7 @@ typedef struct { int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq); +void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq); typedef struct { int32_t code; @@ -3893,6 +3897,7 @@ typedef struct { STqOffsetVal reqOffset; int8_t enableReplay; int8_t sourceExcluded; + int8_t enableBatchMeta; } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 69a4503680..8038273b43 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -71,6 +71,7 @@ struct tmq_conf_t { char* pass; tmq_commit_cb* commitCb; void* commitCbUserParam; + int8_t enableBatchMeta; }; struct tmq_t { @@ -87,6 +88,7 @@ struct tmq_t { uint64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; + int8_t enableBatchMeta; // status SRWLatch lock; @@ -269,6 +271,7 @@ tmq_conf_t* tmq_conf_new() { conf->autoCommit = true; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; + conf->enableBatchMeta = false; return conf; } @@ -398,6 +401,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } + if (strcasecmp(key, "msg.enable.batchmeta") == 0) { + conf->enableBatchMeta = (taosStr2int64(value) != 0) ? true : false; + return TMQ_CONF_OK; + } + return TMQ_CONF_UNKNOWN; } @@ -1122,6 +1130,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; + pTmq->enableBatchMeta = conf->enableBatchMeta; if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1199,6 +1208,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; req.enableReplay = tmq->replayEnable; + req.enableBatchMeta = tmq->enableBatchMeta; for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1623,6 +1633,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->reqId = generateRequestId(); pReq->enableReplay = tmq->replayEnable; pReq->sourceExcluded = tmq->sourceExcluded; + pReq->enableBatchMeta = tmq->enableBatchMeta; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3f76d03289..c17c522d4f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7199,6 +7199,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { pHead->vgId = htonl(pReq->head.vgId); pHead->contLen = htonl(tlen + headLen); } + if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1; return tlen + headLen; } @@ -7228,6 +7229,12 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1; } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->enableBatchMeta) < 0) return -1; + } else { + pReq->enableBatchMeta = false; + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8362,6 +8369,15 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) { return 0; } +void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq) { + for (int32_t iReq = 0; iReq < pReq->nReqs; iReq++) { + SVCreateTbReq* pCreateReq = pReq->pReqs + iReq; + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } +} + int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) { if (tStartEncode(pCoder) < 0) return -1; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4b1ff04331..496201e79e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -174,7 +174,7 @@ end : { } } -#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ +#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \ SDecoder decoder = {0};\ TYPE req = {0}; \ void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ @@ -184,11 +184,16 @@ end : { tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ fetchVer++; \ + DELETE_FUNC(&req); \ tDecoderClear(&decoder); \ continue; \ } \ + DELETE_FUNC(&req); \ tDecoderClear(&decoder); +static void tDeleteCommon(void* parm) { +} + static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; @@ -266,17 +271,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { if (pHead->msgType == TDMT_VND_CREATE_TABLE) { - PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) + PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq) } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { - PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) + PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon) } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { - PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) + PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon) } else if (pHead->msgType == TDMT_VND_DELETE) { - PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) + PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon) } } - tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType)); + tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId, + TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta); + if (!pRequest->enableBatchMeta) { + SMqMetaRsp metaRsp = {0}; + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); + goto end; + } + if (!btMetaRsp.batchMetaReq) { btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); @@ -348,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - + tDeleteMqBatchMetaRsp(&btMetaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; } From 2cf657c76707bb5ca2920459cae3dd2fc81ab57b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 19:33:38 +0800 Subject: [PATCH 08/11] add ci --- utils/test/c/tmq_taosx_ci.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index c1b312335f..51d134a463 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -33,6 +33,7 @@ typedef struct { int srcVgroups; int dstVgroups; char dir[256]; + bool btMeta; } Config; Config g_conf = {0}; @@ -600,6 +601,10 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "experimental.snapshot.enable", "true"); } + if (g_conf.btMeta) { + tmq_conf_set(conf, "msg.enable.batchmeta", "true"); + } + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); @@ -1184,6 +1189,8 @@ int main(int argc, char* argv[]) { g_conf.subTable = true; } else if (strcmp(argv[i], "-onlymeta") == 0) { g_conf.meta = 1; + } else if (strcmp(argv[i], "-bt") == 0) { + g_conf.btMeta = true; } } From 444014f6ac0e88b9d4c04a5c5567e7cf8b819f20 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 29 May 2024 14:10:08 +0800 Subject: [PATCH 09/11] add ci --- tests/system-test/7-tmq/tmq_taosx.py | 51 ++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 94e9babf3c..39b9c6d34a 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -278,6 +278,19 @@ class TDTestCase: self.checkDropData(False) return + + def checkSnapshot1VgroupBtmeta(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -bt'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") + self.checkData() + self.checkDropData(False) + + return def checkSnapshot1VgroupTable(self): buildPath = tdCom.getBuildPath() @@ -290,6 +303,18 @@ class TDTestCase: self.checkDataTable() return + + def checkSnapshot1VgroupTableBtmeta(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t -bt'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") + self.checkDataTable() + + return def checkSnapshotMultiVgroups(self): buildPath = tdCom.getBuildPath() @@ -301,6 +326,17 @@ class TDTestCase: self.checkDropData(False) return + + def checkSnapshotMultiVgroupsBtmeta(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -bt'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkData() + self.checkDropData(False) + + return def checkSnapshotMultiVgroupsWithDropTable(self): buildPath = tdCom.getBuildPath() @@ -311,6 +347,16 @@ class TDTestCase: self.checkDropData(True) return + + def checkSnapshotMultiVgroupsWithDropTableBtmeta(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d -bt'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkDropData(True) + + return def consumeTest(self): tdSql.execute(f'create database if not exists d1 vgroups 1') @@ -472,6 +518,11 @@ class TDTestCase: self.checkSnapshotMultiVgroupsWithDropTable() + self.checkSnapshot1VgroupBtmeta() + self.checkSnapshot1VgroupTableBtmeta() + self.checkSnapshotMultiVgroupsBtmeta() + self.checkSnapshotMultiVgroupsWithDropTableBtmeta() + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 129d04a4495f1a01d390b2c8ee1e7192484a0ef1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 29 May 2024 16:29:04 +0800 Subject: [PATCH 10/11] adj tmq param --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 496201e79e..33e3414a7d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -283,7 +283,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId, TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta); - if (!pRequest->enableBatchMeta) { + if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) { SMqMetaRsp metaRsp = {0}; tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); metaRsp.resMsgType = pHead->msgType; From e0167511c1a2c10f9c06c1c6a7ac4d17ed441adb Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 31 May 2024 17:18:25 +0800 Subject: [PATCH 11/11] use tDeleteSVCreateTbBatchReq free mem --- source/client/src/clientRawBlockWrite.c | 17 ++--------------- source/common/src/tmsg.c | 7 +++++-- source/dnode/vnode/src/tq/tqRead.c | 9 +-------- source/dnode/vnode/src/vnd/vnodeSvr.c | 7 +------ 4 files changed, 9 insertions(+), 31 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 5e6d693562..d8bd56985f 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -440,14 +440,7 @@ static cJSON* processCreateTable(SMqMetaRsp* metaRsp) { _exit: uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson)); - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; - taosMemoryFreeClear(pCreateReq->comment); - taosMemoryFreeClear(pCreateReq->sql); - if (pCreateReq->type == TSDB_CHILD_TABLE) { - taosArrayDestroy(pCreateReq->ctb.tagName); - } - } + tDeleteSVCreateTbBatchReq(&req); tDecoderClear(&decoder); return pJson; } @@ -1110,13 +1103,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { end: uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; - taosMemoryFreeClear(pCreateReq->comment); - if (pCreateReq->type == TSDB_CHILD_TABLE) { - taosArrayDestroy(pCreateReq->ctb.tagName); - } - } + tDeleteSVCreateTbBatchReq(&req); taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c17c522d4f..6bb88b7824 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8369,11 +8369,14 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) { return 0; } -void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq) { +void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq *pReq) { for (int32_t iReq = 0; iReq < pReq->nReqs; iReq++) { - SVCreateTbReq* pCreateReq = pReq->pReqs + iReq; + SVCreateTbReq *pCreateReq = pReq->pReqs + iReq; + taosMemoryFreeClear(pCreateReq->sql); + taosMemoryFreeClear(pCreateReq->comment); if (pCreateReq->type == TSDB_CHILD_TABLE) { taosArrayDestroy(pCreateReq->ctb.tagName); + pCreateReq->ctb.tagName = NULL; } } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9cbe48a0b0..40b817accd 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -99,14 +99,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { taosArrayDestroy(reqNew.pArray); } - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; - taosMemoryFreeClear(pCreateReq->comment); - taosMemoryFreeClear(pCreateReq->sql); - if (pCreateReq->type == TSDB_CHILD_TABLE) { - taosArrayDestroy(pCreateReq->ctb.tagName); - } - } + tDeleteSVCreateTbBatchReq(&req); } else if (msgType == TDMT_VND_ALTER_TABLE) { SVAlterTbReq req = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cb71495e9f..3a3109862c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1158,12 +1158,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } _exit: - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; - taosMemoryFree(pCreateReq->sql); - taosMemoryFree(pCreateReq->comment); - taosArrayDestroy(pCreateReq->ctb.tagName); - } + tDeleteSVCreateTbBatchReq(&req); taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp); taosArrayDestroy(tbUids); tDecoderClear(&decoder);