Merge pull request #25909 from taosdata/feat/ly_tmq_batch_meta

optimize tmq snapshot meta
This commit is contained in:
dapan1121 2024-06-03 10:19:57 +08:00 committed by GitHub
commit affdeed944
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 666 additions and 210 deletions

View File

@ -282,7 +282,7 @@ typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1, TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2, TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3, TMQ_RES_METADATA = 3
} tmq_res_t; } tmq_res_t;
typedef struct tmq_topic_assignment { typedef struct tmq_topic_assignment {

View File

@ -144,6 +144,7 @@ enum {
TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
}; };
enum { enum {

View File

@ -2755,6 +2755,7 @@ typedef struct {
int32_t autoCommitInterval; int32_t autoCommitInterval;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t enableReplay; int8_t enableReplay;
int8_t enableBatchMeta;
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2775,6 +2776,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
return tlen; return tlen;
} }
@ -2799,6 +2801,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
return buf; return buf;
} }
@ -2987,6 +2990,7 @@ typedef struct {
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -3904,6 +3908,7 @@ typedef struct {
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
int8_t enableReplay; int8_t enableReplay;
int8_t sourceExcluded; int8_t sourceExcluded;
int8_t enableBatchMeta;
} SMqPollReq; } SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -3989,6 +3994,20 @@ int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp); int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteSTaosxRsp(void* pRsp); void tDeleteSTaosxRsp(void* pRsp);
typedef struct SMqBatchMetaRsp {
SMqRspHead head; // not serialize
STqOffsetVal rspOffset;
SArray* batchMetaLen;
SArray* batchMetaReq;
void* pMetaBuff; // not serialize
uint32_t metaBuffLen; // not serialize
} SMqBatchMetaRsp;
int32_t tEncodeMqBatchMetaRsp(SEncoder* pEncoder, const SMqBatchMetaRsp* pRsp);
int32_t tDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp);
int32_t tSemiDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp);
void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp* pRsp);
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];

View File

@ -201,7 +201,7 @@ void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);

View File

@ -77,6 +77,7 @@ typedef struct {
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
#define TD_CODER_REMAIN_CAPACITY(CODER) ((CODER)->size - (CODER)->pos)
#define tEncodeSize(E, S, SIZE, RET) \ #define tEncodeSize(E, S, SIZE, RET) \
do { \ do { \

View File

@ -44,6 +44,7 @@ enum {
RES_TYPE__TMQ, RES_TYPE__TMQ,
RES_TYPE__TMQ_META, RES_TYPE__TMQ_META,
RES_TYPE__TMQ_METADATA, RES_TYPE__TMQ_METADATA,
RES_TYPE__TMQ_BATCH_META,
}; };
#define SHOW_VARIABLES_RESULT_COLS 3 #define SHOW_VARIABLES_RESULT_COLS 3
@ -55,6 +56,7 @@ enum {
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) #define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)res == RES_TYPE__TMQ_BATCH_META)
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
@ -241,6 +243,11 @@ typedef struct {
STaosxRsp rsp; STaosxRsp rsp;
} SMqTaosxRspObj; } SMqTaosxRspObj;
typedef struct {
SMqRspObjCommon common;
SMqBatchMetaRsp rsp;
} SMqBatchMetaRspObj;
typedef struct SReqRelInfo { typedef struct SReqRelInfo {
uint64_t userRefId; uint64_t userRefId;
uint64_t prevRefId; uint64_t prevRefId;

View File

@ -301,7 +301,7 @@ void taos_close(TAOS *taos) {
} }
int taos_errno(TAOS_RES *res) { int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return terrno; return terrno;
} }
@ -313,7 +313,7 @@ int taos_errno(TAOS_RES *res) {
} }
const char *taos_errstr(TAOS_RES *res) { const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return (const char *)tstrerror(terrno); return (const char *)tstrerror(terrno);
} }
@ -354,6 +354,10 @@ void taos_free_result(TAOS_RES *res) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
tDeleteMqMetaRsp(&pRspObj->metaRsp); tDeleteMqMetaRsp(&pRspObj->metaRsp);
taosMemoryFree(pRspObj); taosMemoryFree(pRspObj);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj *pBtRspObj = (SMqBatchMetaRspObj *)res;
tDeleteMqBatchMetaRsp(&pBtRspObj->rsp);
taosMemoryFree(pBtRspObj);
} }
} }
@ -371,7 +375,7 @@ void taos_kill_query(TAOS *taos) {
} }
int taos_field_count(TAOS_RES *res) { int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -382,7 +386,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); } int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) { if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} }
@ -437,7 +441,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo->current += 1; pResultInfo->current += 1;
return pResultInfo->row; return pResultInfo->row;
} }
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} else { } else {
// assert to avoid un-initialization error // assert to avoid un-initialization error
@ -548,7 +552,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
} }
int *taos_fetch_lengths(TAOS_RES *res) { int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} }
@ -557,7 +561,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
} }
TAOS_ROW *taos_result_block(TAOS_RES *res) { TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
@ -625,7 +629,7 @@ const char *taos_get_client_info() { return version; }
// return int32_t // return int32_t
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -636,7 +640,7 @@ int taos_affected_rows(TAOS_RES *res) {
// return int64_t // return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) { int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -646,7 +650,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) {
} }
int taos_result_precision(TAOS_RES *res) { int taos_result_precision(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_TIME_PRECISION_MILLI; return TSDB_TIME_PRECISION_MILLI;
} }
@ -686,7 +690,7 @@ int taos_select_db(TAOS *taos, const char *db) {
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return; return;
} }
@ -694,7 +698,7 @@ void taos_stop_query(TAOS_RES *res) {
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return true; return true;
} }
SReqResultInfo *pResultInfo = tscGetCurResInfo(res); SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
@ -719,7 +723,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
} }
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -761,7 +765,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
*numOfRows = 0; *numOfRows = 0;
*pData = NULL; *pData = NULL;
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -797,7 +801,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
} }
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }

View File

@ -26,9 +26,13 @@
#define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64 #define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
#define TMQ_META_VERSION "1.0"
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t, SColCmprWrapper* pColCmprRow) { int8_t t, SColCmprWrapper* pColCmprRow) {
int8_t buildDefaultCompress = 0; int8_t buildDefaultCompress = 0;
if (pColCmprRow->nCols <= 0) { if (pColCmprRow->nCols <= 0) {
@ -122,9 +126,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
} }
cJSON_AddItemToObject(json, "tags", tags); cJSON_AddItemToObject(json, "tags", tags);
string = cJSON_PrintUnformatted(json); return json;
cJSON_Delete(json);
return string;
} }
static int32_t setCompressOption(cJSON* json, uint32_t para) { static int32_t setCompressOption(cJSON* json, uint32_t para) {
@ -151,7 +153,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
} }
return 0; return 0;
} }
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
SMAlterStbReq req = {0}; SMAlterStbReq req = {0};
cJSON* json = NULL; cJSON* json = NULL;
char* string = NULL; char* string = NULL;
@ -245,18 +247,16 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
default: default:
break; break;
} }
string = cJSON_PrintUnformatted(json);
end: end:
cJSON_Delete(json);
tFreeSMAltertbReq(&req); tFreeSMAltertbReq(&req);
return string; return json;
} }
static char* processCreateStb(SMqMetaRsp* metaRsp) { static cJSON* processCreateStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
char* string = NULL; cJSON* pJson = NULL;
uDebug("create stable data:%p", metaRsp); uDebug("create stable data:%p", metaRsp);
// decode and process req // decode and process req
@ -267,17 +267,17 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err; goto _err;
} }
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); pJson = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr);
_err: _err:
uDebug("create stable return, sql json:%s", string); uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(pJson));
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return pJson;
} }
static char* processAlterStb(SMqMetaRsp* metaRsp) { static cJSON* processAlterStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
char* string = NULL; cJSON* pJson = NULL;
uDebug("alter stable data:%p", metaRsp); uDebug("alter stable data:%p", metaRsp);
// decode and process req // decode and process req
@ -288,11 +288,11 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err; goto _err;
} }
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); pJson = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
_err: _err:
uDebug("alter stable return, sql json:%s", string); uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(pJson));
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return pJson;
} }
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
@ -385,7 +385,7 @@ end:
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
} }
static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { static cJSON* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
char* string = NULL; char* string = NULL;
cJSON* json = cJSON_CreateObject(); cJSON* json = cJSON_CreateObject();
if (json == NULL) { if (json == NULL) {
@ -410,16 +410,14 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
cJSON_AddItemToArray(createList, create); cJSON_AddItemToArray(createList, create);
} }
cJSON_AddItemToObject(json, "createList", createList); cJSON_AddItemToObject(json, "createList", createList);
string = cJSON_PrintUnformatted(json); return json;
cJSON_Delete(json);
return string;
} }
static char* processCreateTable(SMqMetaRsp* metaRsp) { static cJSON* processCreateTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq* pCreateReq; SVCreateTbReq* pCreateReq;
char* string = NULL; cJSON* pJson = NULL;
// decode // decode
uDebug("create table data:%p", metaRsp); uDebug("create table data:%p", metaRsp);
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -433,25 +431,18 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
if (req.nReqs > 0) { if (req.nReqs > 0) {
pCreateReq = req.pReqs; pCreateReq = req.pReqs;
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson(req.pReqs, req.nReqs); pJson = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) { } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, pJson = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid,
TSDB_NORMAL_TABLE, &pCreateReq->colCmpr); TSDB_NORMAL_TABLE, &pCreateReq->colCmpr);
} }
} }
_exit: _exit:
uDebug("create table return, sql json:%s", string); uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { tDeleteSVCreateTbBatchReq(&req);
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
taosMemoryFreeClear(pCreateReq->sql);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return pJson;
} }
static char* processAutoCreateTable(STaosxRsp* rsp) { static char* processAutoCreateTable(STaosxRsp* rsp) {
@ -480,7 +471,9 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
goto _exit; goto _exit;
} }
} }
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); cJSON* pJson = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
_exit: _exit:
uDebug("auto created table return, sql json:%s", string); uDebug("auto created table return, sql json:%s", string);
for (int i = 0; i < rsp->createTableNum; i++) { for (int i = 0; i < rsp->createTableNum; i++) {
@ -495,7 +488,7 @@ _exit:
return string; return string;
} }
static char* processAlterTable(SMqMetaRsp* metaRsp) { static cJSON* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0}; SVAlterTbReq vAlterTbReq = {0};
char* string = NULL; char* string = NULL;
@ -620,19 +613,16 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
default: default:
break; break;
} }
string = cJSON_PrintUnformatted(json);
_exit: _exit:
uDebug("alter table return, sql json:%s", string); uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json));
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return json;
} }
static char* processDropSTable(SMqMetaRsp* metaRsp) { static cJSON* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropStbReq req = {0}; SVDropStbReq req = {0};
char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
uDebug("processDropSTable data:%p", metaRsp); uDebug("processDropSTable data:%p", metaRsp);
@ -657,18 +647,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
cJSON* tableName = cJSON_CreateString(req.name); cJSON* tableName = cJSON_CreateString(req.name);
cJSON_AddItemToObject(json, "tableName", tableName); cJSON_AddItemToObject(json, "tableName", tableName);
string = cJSON_PrintUnformatted(json);
_exit: _exit:
uDebug("processDropSTable return, sql json:%s", string); uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json));
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return json;
} }
static char* processDeleteTable(SMqMetaRsp* metaRsp) { static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) {
SDeleteRes req = {0}; SDeleteRes req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
cJSON* json = NULL; cJSON* json = NULL;
char* string = NULL;
uDebug("processDeleteTable data:%p", metaRsp); uDebug("processDeleteTable data:%p", metaRsp);
// decode and process req // decode and process req
@ -696,18 +683,15 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
cJSON* sqlJson = cJSON_CreateString(sql); cJSON* sqlJson = cJSON_CreateString(sql);
cJSON_AddItemToObject(json, "sql", sqlJson); cJSON_AddItemToObject(json, "sql", sqlJson);
string = cJSON_PrintUnformatted(json);
_exit: _exit:
uDebug("processDeleteTable return, sql json:%s", string); uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json));
cJSON_Delete(json);
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return json;
} }
static char* processDropTable(SMqMetaRsp* metaRsp) { static cJSON* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
uDebug("processDropTable data:%p", metaRsp); uDebug("processDropTable data:%p", metaRsp);
@ -741,12 +725,10 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
} }
cJSON_AddItemToObject(json, "tableNameList", tableNameList); cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json);
_exit: _exit:
uDebug("processDropTable return, json sql:%s", string); uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json));
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return json;
} }
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
@ -1121,13 +1103,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
end: end:
uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { tDeleteSVCreateTbBatchReq(&req);
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -1803,6 +1779,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
char err[ERR_MSG_LEN] = {0}; char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN); code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields); taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err); SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end; goto end;
@ -1995,6 +1972,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
char err[ERR_MSG_LEN] = {0}; char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields); taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err); SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end; goto end;
@ -2025,38 +2003,85 @@ end:
return code; return code;
} }
static cJSON* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
return processAlterStb(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
return processDeleteTable(pMetaRsp);
}
return NULL;
}
static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
SDecoder coder;
SMqBatchMetaRsp rsp = {0};
tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
goto _end;
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION);
cJSON* pMetaArr = cJSON_CreateArray();
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
for (int32_t i = 0; i < num; i++) {
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
SDecoder metaCoder = {0};
SMqMetaRsp metaRsp = {0};
tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead));
if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) {
goto _end;
}
cJSON* pItem = processSimpleMeta(&metaRsp);
tDeleteMqMetaRsp(&metaRsp);
cJSON_AddItemToArray(pMetaArr, pItem);
}
cJSON_AddItemToObject(pJson, "metas", pMetaArr);
tDeleteMqBatchMetaRsp(&rsp);
char* fullStr = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
return fullStr;
_end:
cJSON_Delete(pJson);
tDeleteMqBatchMetaRsp(&rsp);
return NULL;
}
char* tmq_get_json_meta(TAOS_RES* res) { char* tmq_get_json_meta(TAOS_RES* res) {
if (res == NULL) return NULL; if (res == NULL) return NULL;
uDebug("tmq_get_json_meta res:%p", res); uDebug("tmq_get_json_meta res:%p", res);
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} }
if (TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res;
return processAutoCreateTable(&pMetaDataRspObj->rsp); return processAutoCreateTable(&pMetaDataRspObj->rsp);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj* pBatchMetaRspObj = (SMqBatchMetaRspObj*)res;
return processBatchMetaToJson(&pBatchMetaRspObj->rsp);
} }
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { cJSON* pJson = processSimpleMeta(&pMetaRspObj->metaRsp);
return processCreateStb(&pMetaRspObj->metaRsp); char* string = cJSON_PrintUnformatted(pJson);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { cJSON_Delete(pJson);
return processAlterStb(&pMetaRspObj->metaRsp); return string;
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
return processDeleteTable(&pMetaRspObj->metaRsp);
}
return NULL;
} }
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
@ -2147,6 +2172,12 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
} }
raw->raw_type = RES_TYPE__TMQ_METADATA; raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq get raw type metadata:%p", raw); uDebug("tmq get raw type metadata:%p", raw);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj* pBtMetaRspObj = (SMqBatchMetaRspObj*)res;
raw->raw = pBtMetaRspObj->rsp.pMetaBuff;
raw->raw_len = pBtMetaRspObj->rsp.metaBuffLen;
raw->raw_type = RES_TYPE__TMQ_BATCH_META;
uDebug("tmq get raw batch meta:%p", raw);
} else { } else {
uError("tmq get raw error type:%d", *(int8_t*)res); uError("tmq get raw error type:%d", *(int8_t*)res);
terrno = TSDB_CODE_TMQ_INVALID_MSG; terrno = TSDB_CODE_TMQ_INVALID_MSG;
@ -2163,32 +2194,74 @@ void tmq_free_raw(tmq_raw_data raw) {
memset(terrMsg, 0, ERR_MSG_LEN); memset(terrMsg, 0, ERR_MSG_LEN);
} }
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
if (type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, buf, len);
} else if (type == TDMT_VND_ALTER_STB) {
return taosCreateStb(taos, buf, len);
} else if (type == TDMT_VND_DROP_STB) {
return taosDropStb(taos, buf, len);
} else if (type == TDMT_VND_CREATE_TABLE) {
return taosCreateTable(taos, buf, len);
} else if (type == TDMT_VND_ALTER_TABLE) {
return taosAlterTable(taos, buf, len);
} else if (type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, buf, len);
} else if (type == TDMT_VND_DELETE) {
return taosDeleteData(taos, buf, len);
} else if (type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRawMetaDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_BATCH_META) {
return tmqWriteBatchMetaDataImpl(taos, buf, len);
}
return TSDB_CODE_INVALID_PARA;
}
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
if (!taos) { if (!taos) {
goto end; return TSDB_CODE_INVALID_PARA;
} }
if (raw.raw_type == TDMT_VND_CREATE_STB) { return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
return taosCreateStb(taos, raw.raw, raw.raw_len); }
} else if (raw.raw_type == TDMT_VND_ALTER_STB) {
return taosCreateStb(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == TDMT_VND_DROP_STB) {
return taosDropStb(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
return taosCreateTable(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
return taosAlterTable(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == TDMT_VND_DELETE) {
return taosDeleteData(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len);
}
end: static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
}
SMqBatchMetaRsp rsp = {0};
SDecoder coder;
int32_t code = TSDB_CODE_SUCCESS;
// decode and process req
tDecoderInit(&coder, meta, metaLen);
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto _end;
}
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
for (int32_t i = 0; i < num; i++) {
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
SDecoder metaCoder = {0};
SMqMetaRsp metaRsp = {0};
tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead));
if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto _end;
}
code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
tDeleteMqMetaRsp(&metaRsp);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
}
_end:
tDeleteMqBatchMetaRsp(&rsp);
errno = code;
return code;
} }

View File

@ -71,6 +71,7 @@ struct tmq_conf_t {
char* pass; char* pass;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta;
}; };
struct tmq_t { struct tmq_t {
@ -87,6 +88,7 @@ struct tmq_t {
uint64_t consumerId; uint64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta;
// status // status
SRWLatch lock; SRWLatch lock;
@ -174,6 +176,7 @@ typedef struct {
SMqDataRsp dataRsp; SMqDataRsp dataRsp;
SMqMetaRsp metaRsp; SMqMetaRsp metaRsp;
STaosxRsp taosxRsp; STaosxRsp taosxRsp;
SMqBatchMetaRsp batchMetaRsp;
}; };
} SMqPollRspWrapper; } SMqPollRspWrapper;
@ -268,6 +271,7 @@ tmq_conf_t* tmq_conf_new() {
conf->autoCommit = true; conf->autoCommit = true;
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
conf->enableBatchMeta = false;
return conf; return conf;
} }
@ -397,6 +401,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
conf->enableBatchMeta = (taosStr2int64(value) != 0) ? true : false;
return TMQ_CONF_OK;
}
return TMQ_CONF_UNKNOWN; return TMQ_CONF_UNKNOWN;
} }
@ -635,6 +644,11 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
pTopicName = pRspObj->common.topic; pTopicName = pRspObj->common.topic;
vgId = pRspObj->common.vgId; vgId = pRspObj->common.vgId;
offsetVal = pRspObj->rsp.common.rspOffset; offsetVal = pRspObj->rsp.common.rspOffset;
} else if (TD_RES_TMQ_BATCH_META(pRes)) {
SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)pRes;
pTopicName = pBtRspObj->common.topic;
vgId = pBtRspObj->common.vgId;
offsetVal = pBtRspObj->rsp.rspOffset;
} else { } else {
code = TSDB_CODE_TMQ_INVALID_MSG; code = TSDB_CODE_TMQ_INVALID_MSG;
goto end; goto end;
@ -934,6 +948,10 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
tDeleteSTaosxRsp(&pRsp->taosxRsp); tDeleteSTaosxRsp(&pRsp->taosxRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
} }
} }
@ -1112,6 +1130,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable; pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded; pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->enableBatchMeta = conf->enableBatchMeta;
if (conf->replayEnable) { if (conf->replayEnable) {
pTmq->autoCommit = false; pTmq->autoCommit = false;
} }
@ -1189,6 +1208,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.autoCommitInterval = tmq->autoCommitInterval; req.autoCommitInterval = tmq->autoCommitInterval;
req.resetOffsetCfg = tmq->resetOffsetCfg; req.resetOffsetCfg = tmq->resetOffsetCfg;
req.enableReplay = tmq->replayEnable; req.enableReplay = tmq->replayEnable;
req.enableBatchMeta = tmq->enableBatchMeta;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i); char* topic = taosArrayGetP(container, i);
@ -1418,6 +1438,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
if(tSemiDecodeMqBatchMetaRsp(&decoder, &pRspWrapper->batchMetaRsp) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId,
requestId);
} else { // invalid rspType } else { // invalid rspType
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
} }
@ -1600,6 +1633,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable; pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded; pReq->sourceExcluded = tmq->sourceExcluded;
pReq->enableBatchMeta = tmq->enableBatchMeta;
} }
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
@ -1616,6 +1650,20 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj; return pRspObj;
} }
SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
if(pRspObj == NULL) {
return NULL;
}
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->common.db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->common.vgId = pWrapper->vgHandle->vgId;
memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp));
tscDebug("build batchmeta Rsp from wrapper");
return pRspObj;
}
void changeByteEndian(char* pData){ void changeByteEndian(char* pData){
char* p = pData; char* p = pData;
@ -2006,6 +2054,42 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tmqFreeRspWrapper(pRspWrapper); tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper); taosFreeQitem(pRspWrapper);
} }
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
// build rsp
void* pRsp = NULL;
updateVgInfo(pVg, &pollRspWrapper->batchMetaRsp.rspOffset, &pollRspWrapper->batchMetaRsp.rspOffset,
pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever,
tmq->consumerId, true);
pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->batchMetaRsp.head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
}
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
@ -2217,6 +2301,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
return TMQ_RES_TABLE_META; return TMQ_RES_TABLE_META;
} else if (TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
return TMQ_RES_METADATA; return TMQ_RES_METADATA;
} else if (TD_RES_TMQ_BATCH_META(res)) {
return TMQ_RES_TABLE_META;
} else { } else {
return TMQ_RES_INVALID; return TMQ_RES_INVALID;
} }
@ -2226,7 +2312,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return NULL; return NULL;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1; return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1;
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
@ -2241,7 +2327,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
return NULL; return NULL;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return strchr(((SMqRspObjCommon*)res)->db, '.') + 1; return strchr(((SMqRspObjCommon*)res)->db, '.') + 1;
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
@ -2255,7 +2341,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return -1; return -1;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return ((SMqRspObjCommon*)res)->vgId; return ((SMqRspObjCommon*)res)->vgId;
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
@ -2282,6 +2368,11 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) { if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->metaRsp.rspOffset.version; return pRspObj->metaRsp.rspOffset.version;
} }
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)res;
if (pBtRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pBtRspObj->rsp.rspOffset.version;
}
} else { } else {
tscError("invalid tmq type:%d", *(int8_t*)res); tscError("invalid tmq type:%d", *(int8_t*)res);
} }

View File

@ -7256,6 +7256,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
pHead->vgId = htonl(pReq->head.vgId); pHead->vgId = htonl(pReq->head.vgId);
pHead->contLen = htonl(tlen + headLen); pHead->contLen = htonl(tlen + headLen);
} }
if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1;
return tlen + headLen; return tlen + headLen;
} }
@ -7285,6 +7286,12 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1; if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1;
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->enableBatchMeta) < 0) return -1;
} else {
pReq->enableBatchMeta = false;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -8419,6 +8426,18 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
return 0; return 0;
} }
void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq *pReq) {
for (int32_t iReq = 0; iReq < pReq->nReqs; iReq++) {
SVCreateTbReq *pCreateReq = pReq->pReqs + iReq;
taosMemoryFreeClear(pCreateReq->sql);
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
pCreateReq->ctb.tagName = NULL;
}
}
}
int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) { int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
@ -10614,3 +10633,55 @@ void tFreeFetchTtlExpiredTbsRsp(void *p) {
SVFetchTtlExpiredTbsRsp *pRsp = p; SVFetchTtlExpiredTbsRsp *pRsp = p;
taosArrayDestroy(pRsp->pExpiredTbs); taosArrayDestroy(pRsp->pExpiredTbs);
} }
int32_t tEncodeMqBatchMetaRsp(SEncoder *pEncoder, const SMqBatchMetaRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
int32_t size = taosArrayGetSize(pRsp->batchMetaReq);
if (tEncodeI32(pEncoder, size) < 0) return -1;
if (size > 0) {
for (int32_t i = 0; i < size; i++) {
void *pMetaReq = taosArrayGetP(pRsp->batchMetaReq, i);
int32_t metaLen = *(int32_t *)taosArrayGet(pRsp->batchMetaLen, i);
if (tEncodeBinary(pEncoder, pMetaReq, metaLen) < 0) return -1;
}
}
return 0;
}
int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size > 0) {
pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES);
pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) {
void *pCreate = NULL;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
taosArrayPush(pRsp->batchMetaReq, &pCreate);
taosArrayPush(pRsp->batchMetaLen, &l);
}
}
return 0;
}
int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (pDecoder->size < pDecoder->pos) {
return -1;
}
pRsp->metaBuffLen = TD_CODER_REMAIN_CAPACITY(pDecoder);
pRsp->pMetaBuff = taosMemoryCalloc(1, pRsp->metaBuffLen);
memcpy(pRsp->pMetaBuff, TD_CODER_CURRENT(pDecoder), pRsp->metaBuffLen);
return 0;
}
void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) {
taosMemoryFreeClear(pRsp->pMetaBuff);
taosArrayDestroyP(pRsp->batchMetaReq, taosMemoryFree);
taosArrayDestroy(pRsp->batchMetaLen);
pRsp->batchMetaReq = NULL;
pRsp->batchMetaLen = NULL;
}

View File

@ -113,7 +113,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void tqDestroyTqHandle(void* data); void tqDestroyTqHandle(void* data);
// tqRead // tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);

View File

@ -99,14 +99,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
taosArrayDestroy(reqNew.pArray); taosArrayDestroy(reqNew.pArray);
} }
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { tDeleteSVCreateTbBatchReq(&req);
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
taosMemoryFreeClear(pCreateReq->sql);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
} else if (msgType == TDMT_VND_ALTER_TABLE) { } else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};

View File

@ -163,7 +163,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
return 0; return 0;
} }
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
@ -218,10 +218,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
} }
// get meta // get meta
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (tmp->metaRspLen > 0) { if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
qStreamExtractOffset(task, &tmp->rspOffset); qStreamExtractOffset(task, &tmp->rspOffset);
*pMetaRsp = *tmp; *pBatchMetaRsp = *tmp;
tqDebug("tmqsnap task get meta"); tqDebug("tmqsnap task get meta");
break; break;

View File

@ -17,6 +17,8 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId); const SMqMetaRsp* pRsp, int32_t vgId);
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqBatchMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
tOffsetCopy(&pRsp->reqOffset, &pOffset); tOffsetCopy(&pRsp->reqOffset, &pOffset);
@ -172,7 +174,7 @@ end : {
} }
} }
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ #define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \
SDecoder decoder = {0};\ SDecoder decoder = {0};\
TYPE req = {0}; \ TYPE req = {0}; \
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
@ -182,33 +184,37 @@ end : {
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
fetchVer++; \ fetchVer++; \
DELETE_FUNC(&req); \
tDecoderClear(&decoder); \ tDecoderClear(&decoder); \
continue; \ continue; \
} \ } \
DELETE_FUNC(&req); \
tDecoderClear(&decoder); tDecoderClear(&decoder);
static void tDeleteCommon(void* parm) {
}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) { SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0; int code = 0;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
SMqBatchMetaRsp btMetaRsp = {0};
tqInitTaosxRsp(&taosxRsp.common, *offset); tqInitTaosxRsp(&taosxRsp.common, *offset);
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) {
code = -1; code = -1;
goto end; goto end;
} }
if (metaRsp.metaRspLen > 0) { if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64, ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
metaRsp.rspOffset.ts); btMetaRsp.rspOffset.ts);
tDeleteMqMetaRsp(&metaRsp); tDeleteMqBatchMetaRsp(&btMetaRsp);
goto end; goto end;
} }
@ -230,11 +236,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
uint64_t st = taosGetTimestampMs(); uint64_t st = taosGetTimestampMs();
int totalRows = 0; int totalRows = 0;
int32_t totalMetaRows = 0;
while (1) { while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
ASSERT(savedEpoch <= pRequest->epoch); ASSERT(savedEpoch <= pRequest->epoch);
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
ASSERT(totalRows == 0);
goto end;
}
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
code = tqSendDataRsp( code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp, pHandle, pMsg, pRequest, &taosxRsp,
@ -258,17 +271,20 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
if (pHead->msgType == TDMT_VND_CREATE_TABLE) { if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
} else if (pHead->msgType == TDMT_VND_DELETE) { } else if (pHead->msgType == TDMT_VND_DELETE) {
PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
} }
} }
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
SMqMetaRsp metaRsp = {0};
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
@ -277,6 +293,50 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end; goto end;
} }
if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
}
fetchVer++;
SMqMetaRsp tmpMetaRsp = {0};
tmpMetaRsp.resMsgType = pHead->msgType;
tmpMetaRsp.metaRspLen = pHead->bodyLen;
tmpMetaRsp.metaRsp = pHead->body;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
if (code < 0) {
tEncoderClear(&encoder);
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
}
taosArrayPush(btMetaRsp.batchMetaReq, &tBuf);
taosArrayPush(btMetaRsp.batchMetaLen, &tLen);
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto end;
}
continue;
}
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto end;
}
// process data // process data
SPackedData submit = { SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
@ -304,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
end: end:
tDeleteMqBatchMetaRsp(&btMetaRsp);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
@ -352,6 +412,40 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int
pMsgHead->walever = ever; pMsgHead->walever = ever;
} }
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqBatchMetaRsp* pRsp,
int32_t vgId) {
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
if (code < 0) {
return -1;
}
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
tEncodeMqBatchMetaRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d", vgId,
pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
return 0;
}
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
int32_t vgId) { int32_t vgId) {
int32_t len = 0; int32_t len = 0;

View File

@ -1158,12 +1158,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
_exit: _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { tDeleteSVCreateTbBatchReq(&req);
pCreateReq = req.pReqs + iReq;
taosMemoryFree(pCreateReq->sql);
taosMemoryFree(pCreateReq->comment);
taosArrayDestroy(pCreateReq->ctb.tagName);
}
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp); taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -58,7 +58,7 @@ typedef struct STaskStopInfo {
typedef struct { typedef struct {
STqOffsetVal currentOffset; // for tmq STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta SMqBatchMetaRsp btMetaRsp; // for tmq fetching meta
int8_t sourceExcluded; int8_t sourceExcluded;
int64_t snapshotVer; int64_t snapshotVer;
SSchemaWrapper* schema; SSchemaWrapper* schema;

View File

@ -1078,9 +1078,9 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.tbName; return pTaskInfo->streamInfo.tbName;
} }
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return &pTaskInfo->streamInfo.metaRsp; return &pTaskInfo->streamInfo.btMetaRsp;
} }
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {

View File

@ -2845,8 +2845,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = NULL; // use batchMetaReq != NULL to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = NULL;
qDebug("tmqsnap doRawScan called"); qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
@ -2893,6 +2893,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
for(int32_t i = 0; i < tmqRowSize; i++) {
void* data = NULL; void* data = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int16_t type = 0; int16_t type = 0;
@ -2900,7 +2901,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) { if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
qError("tmqsnap getTableInfoFromSnapshot error"); qError("tmqsnap getTableInfoFromSnapshot error");
taosMemoryFreeClear(data); taosMemoryFreeClear(data);
return NULL; break;
} }
if (!sContext->queryMeta) { // change to get data next poll request if (!sContext->queryMeta) { // change to get data next poll request
@ -2908,13 +2909,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SValue val = {0}; SValue val = {0};
tqOffsetResetToData(&offset, 0, INT64_MIN, val); tqOffsetResetToData(&offset, 0, INT64_MIN, val);
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
break;
} else { } else {
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
pTaskInfo->streamInfo.metaRsp.resMsgType = type; SMqMetaRsp tmpMetaRsp = {0};
pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen; tmpMetaRsp.resMsgType = type;
pTaskInfo->streamInfo.metaRsp.metaRsp = data; tmpMetaRsp.metaRspLen = dataLen;
tmpMetaRsp.metaRsp = data;
if (!pTaskInfo->streamInfo.btMetaRsp.batchMetaReq) {
pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
}
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
qError("tmqsnap tEncodeMqMetaRsp error");
taosMemoryFreeClear(data);
break;
} }
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
if (code < 0) {
qError("tmqsnap tEncodeMqMetaRsp error");
tEncoderClear(&encoder);
taosMemoryFreeClear(tBuf);
taosMemoryFreeClear(data);
break;
}
taosMemoryFreeClear(data);
taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, &tBuf);
taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, &tLen);
}
}
return NULL; return NULL;
} }
return NULL; return NULL;

View File

@ -279,6 +279,19 @@ class TDTestCase:
return return
def checkSnapshot1VgroupBtmeta(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -bt'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkData()
self.checkDropData(False)
return
def checkSnapshot1VgroupTable(self): def checkSnapshot1VgroupTable(self):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath() cfgPath = tdCom.getClientCfgPath()
@ -291,6 +304,18 @@ class TDTestCase:
return return
def checkSnapshot1VgroupTableBtmeta(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t -bt'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkDataTable()
return
def checkSnapshotMultiVgroups(self): def checkSnapshotMultiVgroups(self):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath) cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath)
@ -302,6 +327,17 @@ class TDTestCase:
return return
def checkSnapshotMultiVgroupsBtmeta(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -bt'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkData()
self.checkDropData(False)
return
def checkSnapshotMultiVgroupsWithDropTable(self): def checkSnapshotMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath) cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath)
@ -312,6 +348,16 @@ class TDTestCase:
return return
def checkSnapshotMultiVgroupsWithDropTableBtmeta(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d -bt'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkDropData(True)
return
def consumeTest(self): def consumeTest(self):
tdSql.execute(f'create database if not exists d1 vgroups 1') tdSql.execute(f'create database if not exists d1 vgroups 1')
tdSql.execute(f'use d1') tdSql.execute(f'use d1')
@ -472,6 +518,11 @@ class TDTestCase:
self.checkSnapshotMultiVgroupsWithDropTable() self.checkSnapshotMultiVgroupsWithDropTable()
self.checkSnapshot1VgroupBtmeta()
self.checkSnapshot1VgroupTableBtmeta()
self.checkSnapshotMultiVgroupsBtmeta()
self.checkSnapshotMultiVgroupsWithDropTableBtmeta()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")

View File

@ -18,6 +18,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include "cJSON.h"
#include "taos.h" #include "taos.h"
#include "tmsg.h" #include "tmsg.h"
#include "types.h" #include "types.h"
@ -32,6 +33,7 @@ typedef struct {
int srcVgroups; int srcVgroups;
int dstVgroups; int dstVgroups;
char dir[256]; char dir[256];
bool btMeta;
} Config; } Config;
Config g_conf = {0}; Config g_conf = {0};
@ -61,10 +63,25 @@ static void msg_process(TAOS_RES* msg) {
if (result) { if (result) {
printf("meta result: %s\n", result); printf("meta result: %s\n", result);
if (g_fp && strcmp(result, "") != 0) { if (g_fp && strcmp(result, "") != 0) {
// RES_TYPE__TMQ_BATCH_META
if ((*(int8_t*)msg) == 5) {
cJSON* pJson = cJSON_Parse(result);
cJSON* pJsonArray = cJSON_GetObjectItem(pJson, "metas");
int32_t num = cJSON_GetArraySize(pJsonArray);
for (int32_t i = 0; i < num; i++) {
cJSON* pJsonItem = cJSON_GetArrayItem(pJsonArray, i);
char* itemStr = cJSON_PrintUnformatted(pJsonItem);
taosFprintfFile(g_fp, itemStr);
tmq_free_json_meta(itemStr);
taosFprintfFile(g_fp, "\n");
}
cJSON_Delete(pJson);
} else {
taosFprintfFile(g_fp, result); taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
} }
} }
}
tmq_free_json_meta(result); tmq_free_json_meta(result);
} }
@ -584,6 +601,10 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "experimental.snapshot.enable", "true"); tmq_conf_set(conf, "experimental.snapshot.enable", "true");
} }
if (g_conf.btMeta) {
tmq_conf_set(conf, "msg.enable.batchmeta", "true");
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
@ -1064,7 +1085,7 @@ void testConsumeExcluded(int topic_type) {
char* topic = "create topic topic_excluded with meta as database db_taosx"; char* topic = "create topic topic_excluded with meta as database db_taosx";
pRes = taos_query(pConn, topic); pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_excluded1, reason:%s\n", taos_errstr(pRes));
taos_close(pConn); taos_close(pConn);
return; return;
} }
@ -1073,7 +1094,7 @@ void testConsumeExcluded(int topic_type) {
char* topic = "create topic topic_excluded as select * from stt"; char* topic = "create topic topic_excluded as select * from stt";
pRes = taos_query(pConn, topic); pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_excluded2, reason:%s\n", taos_errstr(pRes));
taos_close(pConn); taos_close(pConn);
return; return;
} }
@ -1115,7 +1136,7 @@ void testConsumeExcluded(int topic_type) {
assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB && assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB &&
raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE && raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE &&
raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE); raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE);
assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE); assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE || raw.raw_type == 5);
} else if (topic_type == 2) { } else if (topic_type == 2) {
assert(0); assert(0);
} }
@ -1138,6 +1159,7 @@ void testConsumeExcluded(int topic_type) {
taos_close(pConn); taos_close(pConn);
return; return;
} }
taos_close(pConn);
taos_free_result(pRes); taos_free_result(pRes);
} }
@ -1167,6 +1189,8 @@ int main(int argc, char* argv[]) {
g_conf.subTable = true; g_conf.subTable = true;
} else if (strcmp(argv[i], "-onlymeta") == 0) { } else if (strcmp(argv[i], "-onlymeta") == 0) {
g_conf.meta = 1; g_conf.meta = 1;
} else if (strcmp(argv[i], "-bt") == 0) {
g_conf.btMeta = true;
} }
} }