optimize tmq snapshot meta
This commit is contained in:
parent
ed12ef1c02
commit
441a150d66
|
@ -283,6 +283,7 @@ typedef enum tmq_res_t {
|
|||
TMQ_RES_DATA = 1,
|
||||
TMQ_RES_TABLE_META = 2,
|
||||
TMQ_RES_METADATA = 3,
|
||||
TMQ_RES_BATCH_TABLE_META = 4,
|
||||
} tmq_res_t;
|
||||
|
||||
typedef struct tmq_topic_assignment {
|
||||
|
|
|
@ -144,6 +144,7 @@ enum {
|
|||
TMQ_MSG_TYPE__EP_RSP,
|
||||
TMQ_MSG_TYPE__POLL_DATA_META_RSP,
|
||||
TMQ_MSG_TYPE__WALINFO_RSP,
|
||||
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
|
|
@ -3978,6 +3978,20 @@ int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp);
|
|||
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp);
|
||||
void tDeleteSTaosxRsp(void* pRsp);
|
||||
|
||||
typedef struct SMqBatchMetaRsp {
|
||||
SMqRspHead head; // not serialize
|
||||
STqOffsetVal rspOffset;
|
||||
SArray* batchMetaLen;
|
||||
SArray* batchMetaReq;
|
||||
void* pMetaBuff; // not serialize
|
||||
uint32_t metaBuffLen; // not serialize
|
||||
} SMqBatchMetaRsp;
|
||||
|
||||
int32_t tEncodeMqBatchMetaRsp(SEncoder* pEncoder, const SMqBatchMetaRsp* pRsp);
|
||||
int32_t tDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp);
|
||||
int32_t tSemiDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp);
|
||||
void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SMqRspHead head;
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
|
|
|
@ -201,7 +201,7 @@ void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
|
|||
|
||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||
|
||||
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||
|
||||
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ typedef struct {
|
|||
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
|
||||
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
|
||||
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
|
||||
#define TD_CODER_REMAIN_CAPACITY(CODER) ((CODER)->size - (CODER)->pos)
|
||||
|
||||
#define tEncodeSize(E, S, SIZE, RET) \
|
||||
do { \
|
||||
|
|
|
@ -44,6 +44,7 @@ enum {
|
|||
RES_TYPE__TMQ,
|
||||
RES_TYPE__TMQ_META,
|
||||
RES_TYPE__TMQ_METADATA,
|
||||
RES_TYPE__TMQ_BATCH_META,
|
||||
};
|
||||
|
||||
#define SHOW_VARIABLES_RESULT_COLS 3
|
||||
|
@ -51,10 +52,11 @@ enum {
|
|||
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||
#define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)res == RES_TYPE__TMQ_BATCH_META)
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
|
@ -241,6 +243,11 @@ typedef struct {
|
|||
STaosxRsp rsp;
|
||||
} SMqTaosxRspObj;
|
||||
|
||||
typedef struct {
|
||||
SMqRspObjCommon common;
|
||||
SMqBatchMetaRsp rsp;
|
||||
} SMqBatchMetaRspObj;
|
||||
|
||||
typedef struct SReqRelInfo {
|
||||
uint64_t userRefId;
|
||||
uint64_t prevRefId;
|
||||
|
|
|
@ -301,7 +301,7 @@ void taos_close(TAOS *taos) {
|
|||
}
|
||||
|
||||
int taos_errno(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ int taos_errno(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
const char *taos_errstr(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return (const char *)tstrerror(terrno);
|
||||
}
|
||||
|
||||
|
@ -354,6 +354,10 @@ void taos_free_result(TAOS_RES *res) {
|
|||
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
|
||||
tDeleteMqMetaRsp(&pRspObj->metaRsp);
|
||||
taosMemoryFree(pRspObj);
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
SMqBatchMetaRspObj *pBtRspObj = (SMqBatchMetaRspObj *)res;
|
||||
tDeleteMqBatchMetaRsp(&pBtRspObj->rsp);
|
||||
taosMemoryFree(pBtRspObj);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -371,7 +375,7 @@ void taos_kill_query(TAOS *taos) {
|
|||
}
|
||||
|
||||
int taos_field_count(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -382,7 +386,7 @@ int taos_field_count(TAOS_RES *res) {
|
|||
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
|
||||
|
||||
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) {
|
||||
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -437,7 +441,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
pResultInfo->current += 1;
|
||||
return pResultInfo->row;
|
||||
}
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
} else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return NULL;
|
||||
} else {
|
||||
// assert to avoid un-initialization error
|
||||
|
@ -548,7 +552,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
|||
}
|
||||
|
||||
int *taos_fetch_lengths(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -557,7 +561,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
TAOS_ROW *taos_result_block(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -625,7 +629,7 @@ const char *taos_get_client_info() { return version; }
|
|||
|
||||
// return int32_t
|
||||
int taos_affected_rows(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -636,7 +640,7 @@ int taos_affected_rows(TAOS_RES *res) {
|
|||
|
||||
// return int64_t
|
||||
int64_t taos_affected_rows64(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -646,7 +650,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
int taos_result_precision(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return TSDB_TIME_PRECISION_MILLI;
|
||||
}
|
||||
|
||||
|
@ -686,7 +690,7 @@ int taos_select_db(TAOS *taos, const char *db) {
|
|||
}
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -694,7 +698,7 @@ void taos_stop_query(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return true;
|
||||
}
|
||||
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
|
||||
|
@ -719,7 +723,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
|||
}
|
||||
|
||||
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -761,7 +765,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
*numOfRows = 0;
|
||||
*pData = NULL;
|
||||
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -797,7 +801,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
}
|
||||
|
||||
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@
|
|||
#define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64
|
||||
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
|
||||
|
||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
||||
|
||||
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
||||
|
||||
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
||||
|
@ -2025,38 +2027,81 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
|
||||
if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
|
||||
return processCreateStb(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
|
||||
return processAlterStb(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
|
||||
return processDropSTable(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
|
||||
return processCreateTable(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
|
||||
return processAlterTable(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
|
||||
return processDropTable(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
|
||||
return processDropTable(pMetaRsp);
|
||||
} else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
|
||||
return processDeleteTable(pMetaRsp);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
|
||||
SDecoder coder;
|
||||
SMqBatchMetaRsp rsp = {0};
|
||||
tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
|
||||
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
int64_t fullSize = 0;
|
||||
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
|
||||
SArray* strArray = taosArrayInit(num, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
|
||||
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
|
||||
SDecoder metaCoder = {0};
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead));
|
||||
if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) {
|
||||
goto _end;
|
||||
}
|
||||
char* subStr = processSimpleMeta(&metaRsp);
|
||||
tDeleteMqMetaRsp(&metaRsp);
|
||||
fullSize += strlen(subStr);
|
||||
taosArrayPush(strArray, &subStr);
|
||||
}
|
||||
|
||||
char* buf = (char*)taosMemoryCalloc(1, fullSize + num + 1);
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
char* subStr = taosArrayGetP(strArray, i);
|
||||
strcat(buf, subStr);
|
||||
strcat(buf, "\n");
|
||||
}
|
||||
|
||||
_end:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char* tmq_get_json_meta(TAOS_RES* res) {
|
||||
if (res == NULL) return NULL;
|
||||
uDebug("tmq_get_json_meta res:%p", res);
|
||||
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
|
||||
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res;
|
||||
return processAutoCreateTable(&pMetaDataRspObj->rsp);
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
SMqBatchMetaRspObj* pBatchMetaRspObj = (SMqBatchMetaRspObj*)res;
|
||||
return processBatchMetaToJson(&pBatchMetaRspObj->rsp);
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
|
||||
return processAlterStb(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
|
||||
return processDropSTable(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
|
||||
return processCreateTable(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
|
||||
return processAlterTable(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
|
||||
return processDropTable(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
|
||||
return processDropTable(&pMetaRspObj->metaRsp);
|
||||
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
|
||||
return processDeleteTable(&pMetaRspObj->metaRsp);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
return processSimpleMeta(&pMetaRspObj->metaRsp);
|
||||
}
|
||||
|
||||
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
||||
|
@ -2147,6 +2192,12 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
|||
}
|
||||
raw->raw_type = RES_TYPE__TMQ_METADATA;
|
||||
uDebug("tmq get raw type metadata:%p", raw);
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
SMqBatchMetaRspObj* pBtMetaRspObj = (SMqBatchMetaRspObj*)res;
|
||||
raw->raw = pBtMetaRspObj->rsp.pMetaBuff;
|
||||
raw->raw_len = pBtMetaRspObj->rsp.metaBuffLen;
|
||||
raw->raw_type = RES_TYPE__TMQ_BATCH_META;
|
||||
uDebug("tmq get raw batch meta:%p", raw);
|
||||
} else {
|
||||
uError("tmq get raw error type:%d", *(int8_t*)res);
|
||||
terrno = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
|
@ -2163,32 +2214,74 @@ void tmq_free_raw(tmq_raw_data raw) {
|
|||
memset(terrMsg, 0, ERR_MSG_LEN);
|
||||
}
|
||||
|
||||
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
|
||||
if (type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, buf, len);
|
||||
} else if (type == TDMT_VND_ALTER_STB) {
|
||||
return taosCreateStb(taos, buf, len);
|
||||
} else if (type == TDMT_VND_DROP_STB) {
|
||||
return taosDropStb(taos, buf, len);
|
||||
} else if (type == TDMT_VND_CREATE_TABLE) {
|
||||
return taosCreateTable(taos, buf, len);
|
||||
} else if (type == TDMT_VND_ALTER_TABLE) {
|
||||
return taosAlterTable(taos, buf, len);
|
||||
} else if (type == TDMT_VND_DROP_TABLE) {
|
||||
return taosDropTable(taos, buf, len);
|
||||
} else if (type == TDMT_VND_DELETE) {
|
||||
return taosDeleteData(taos, buf, len);
|
||||
} else if (type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRawDataImpl(taos, buf, len);
|
||||
} else if (type == RES_TYPE__TMQ_METADATA) {
|
||||
return tmqWriteRawMetaDataImpl(taos, buf, len);
|
||||
} else if (type == RES_TYPE__TMQ_BATCH_META) {
|
||||
return tmqWriteBatchMetaDataImpl(taos, buf, len);
|
||||
}
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
||||
if (!taos) {
|
||||
goto end;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (raw.raw_type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_ALTER_STB) {
|
||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_DROP_STB) {
|
||||
return taosDropStb(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
|
||||
return taosCreateTable(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
|
||||
return taosAlterTable(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
|
||||
return taosDropTable(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == TDMT_VND_DELETE) {
|
||||
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ_METADATA) {
|
||||
return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len);
|
||||
}
|
||||
|
||||
end:
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
|
||||
}
|
||||
|
||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) {
|
||||
if (taos == NULL || meta == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
SMqBatchMetaRsp rsp = {0};
|
||||
SDecoder coder;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// decode and process req
|
||||
tDecoderInit(&coder, meta, metaLen);
|
||||
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _end;
|
||||
}
|
||||
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
|
||||
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
|
||||
SDecoder metaCoder = {0};
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead));
|
||||
if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _end;
|
||||
}
|
||||
code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
|
||||
tDeleteMqMetaRsp(&metaRsp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
tDeleteMqBatchMetaRsp(&rsp);
|
||||
errno = code;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -171,9 +171,10 @@ typedef struct {
|
|||
uint64_t reqId;
|
||||
SEpSet* pEpset;
|
||||
union {
|
||||
SMqDataRsp dataRsp;
|
||||
SMqMetaRsp metaRsp;
|
||||
STaosxRsp taosxRsp;
|
||||
SMqDataRsp dataRsp;
|
||||
SMqMetaRsp metaRsp;
|
||||
STaosxRsp taosxRsp;
|
||||
SMqBatchMetaRsp batchMetaRsp;
|
||||
};
|
||||
} SMqPollRspWrapper;
|
||||
|
||||
|
@ -635,6 +636,11 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
|
|||
pTopicName = pRspObj->common.topic;
|
||||
vgId = pRspObj->common.vgId;
|
||||
offsetVal = pRspObj->rsp.common.rspOffset;
|
||||
} else if (TD_RES_TMQ_BATCH_META(pRes)) {
|
||||
SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)pRes;
|
||||
pTopicName = pBtRspObj->common.topic;
|
||||
vgId = pBtRspObj->common.vgId;
|
||||
offsetVal = pBtRspObj->rsp.rspOffset;
|
||||
} else {
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
|
@ -934,6 +940,10 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
|||
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||
taosMemoryFreeClear(pRsp->pEpset);
|
||||
tDeleteSTaosxRsp(&pRsp->taosxRsp);
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
|
||||
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||
taosMemoryFreeClear(pRsp->pEpset);
|
||||
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1418,6 +1428,17 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
||||
if(tSemiDecodeMqBatchMetaRsp(&decoder, &pRspWrapper->batchMetaRsp) < 0){
|
||||
tDecoderClear(&decoder);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
} else { // invalid rspType
|
||||
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
|
||||
}
|
||||
|
@ -1616,6 +1637,19 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
return pRspObj;
|
||||
}
|
||||
|
||||
SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
|
||||
if(pRspObj == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
|
||||
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->common.db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->common.vgId = pWrapper->vgHandle->vgId;
|
||||
|
||||
memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp));
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
void changeByteEndian(char* pData){
|
||||
char* p = pData;
|
||||
|
@ -1972,7 +2006,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pRspWrapper);
|
||||
}
|
||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
|
||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||
|
||||
|
@ -1995,7 +2029,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset,
|
||||
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
void* pRsp = NULL;
|
||||
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
} else {
|
||||
pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper);
|
||||
}
|
||||
taosFreeQitem(pRspWrapper);
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
return pRsp;
|
||||
|
@ -2217,6 +2256,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
|
|||
return TMQ_RES_TABLE_META;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
return TMQ_RES_METADATA;
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
return TMQ_RES_BATCH_TABLE_META;
|
||||
} else {
|
||||
return TMQ_RES_INVALID;
|
||||
}
|
||||
|
@ -2226,7 +2267,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
|
|||
if (res == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1;
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
|
@ -2241,7 +2282,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return strchr(((SMqRspObjCommon*)res)->db, '.') + 1;
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
|
@ -2255,7 +2296,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
|||
if (res == NULL) {
|
||||
return -1;
|
||||
}
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
return ((SMqRspObjCommon*)res)->vgId;
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
|
@ -2282,6 +2323,11 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
|
|||
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
|
||||
return pRspObj->metaRsp.rspOffset.version;
|
||||
}
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
SMqBatchMetaRspObj* pBtRspObj = (SMqBatchMetaRspObj*)res;
|
||||
if (pBtRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
|
||||
return pBtRspObj->rsp.rspOffset.version;
|
||||
}
|
||||
} else {
|
||||
tscError("invalid tmq type:%d", *(int8_t*)res);
|
||||
}
|
||||
|
|
|
@ -10557,3 +10557,55 @@ void tFreeFetchTtlExpiredTbsRsp(void *p) {
|
|||
SVFetchTtlExpiredTbsRsp *pRsp = p;
|
||||
taosArrayDestroy(pRsp->pExpiredTbs);
|
||||
}
|
||||
|
||||
int32_t tEncodeMqBatchMetaRsp(SEncoder *pEncoder, const SMqBatchMetaRsp *pRsp) {
|
||||
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
|
||||
|
||||
int32_t size = taosArrayGetSize(pRsp->batchMetaReq);
|
||||
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
||||
if (size > 0) {
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void *pMetaReq = taosArrayGetP(pRsp->batchMetaReq, i);
|
||||
int32_t metaLen = *(int32_t *)taosArrayGet(pRsp->batchMetaLen, i);
|
||||
if (tEncodeBinary(pEncoder, pMetaReq, metaLen) < 0) return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
|
||||
int32_t size = 0;
|
||||
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||
if (size > 0) {
|
||||
pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES);
|
||||
pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t));
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void *pCreate = NULL;
|
||||
uint64_t len = 0;
|
||||
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
|
||||
int32_t l = (int32_t)len;
|
||||
taosArrayPush(pRsp->batchMetaReq, &pCreate);
|
||||
taosArrayPush(pRsp->batchMetaLen, &l);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
|
||||
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
|
||||
if (pDecoder->size < pDecoder->pos) {
|
||||
return -1;
|
||||
}
|
||||
pRsp->metaBuffLen = TD_CODER_REMAIN_CAPACITY(pDecoder);
|
||||
pRsp->pMetaBuff = taosMemoryCalloc(1, pRsp->metaBuffLen);
|
||||
memcpy(pRsp->pMetaBuff, TD_CODER_CURRENT(pDecoder), pRsp->metaBuffLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) {
|
||||
taosMemoryFreeClear(pRsp->pMetaBuff);
|
||||
taosArrayDestroyP(pRsp->batchMetaReq, taosMemoryFree);
|
||||
taosArrayDestroy(pRsp->batchMetaLen);
|
||||
pRsp->batchMetaReq = NULL;
|
||||
pRsp->batchMetaLen = NULL;
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
|||
void tqDestroyTqHandle(void* data);
|
||||
|
||||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||
|
||||
|
|
|
@ -163,7 +163,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
|
@ -218,10 +218,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
|||
}
|
||||
|
||||
// get meta
|
||||
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
|
||||
if (tmp->metaRspLen > 0) {
|
||||
SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
|
||||
if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
|
||||
qStreamExtractOffset(task, &tmp->rspOffset);
|
||||
*pMetaRsp = *tmp;
|
||||
*pBatchMetaRsp = *tmp;
|
||||
|
||||
tqDebug("tmqsnap task get meta");
|
||||
break;
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||
const SMqMetaRsp* pRsp, int32_t vgId);
|
||||
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||
const SMqBatchMetaRsp* pRsp, int32_t vgId);
|
||||
|
||||
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
|
||||
tOffsetCopy(&pRsp->reqOffset, &pOffset);
|
||||
|
@ -187,7 +189,6 @@ end : {
|
|||
} \
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
|
||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||
int code = 0;
|
||||
|
@ -197,18 +198,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
tqInitTaosxRsp(&taosxRsp.common, *offset);
|
||||
|
||||
if (offset->type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||
SMqBatchMetaRsp btMetaRsp = {0};
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) {
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
|
||||
if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
|
||||
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||
",ts:%" PRId64,
|
||||
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||
metaRsp.rspOffset.ts);
|
||||
tDeleteMqMetaRsp(&metaRsp);
|
||||
pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
|
||||
btMetaRsp.rspOffset.ts);
|
||||
tDeleteMqBatchMetaRsp(&btMetaRsp);
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
@ -352,6 +354,40 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int
|
|||
pMsgHead->walever = ever;
|
||||
}
|
||||
|
||||
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqBatchMetaRsp* pRsp,
|
||||
int32_t vgId) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t sver = 0, ever = 0;
|
||||
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
||||
initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
|
||||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeMqBatchMetaRsp(&encoder, pRsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
|
||||
tmsgSendRsp(&resp);
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, offset type:%d", vgId,
|
||||
pReq->consumerId, pReq->epoch, pRsp->rspOffset.type);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
|
||||
int32_t vgId) {
|
||||
int32_t len = 0;
|
||||
|
|
|
@ -58,7 +58,7 @@ typedef struct STaskStopInfo {
|
|||
|
||||
typedef struct {
|
||||
STqOffsetVal currentOffset; // for tmq
|
||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||
SMqBatchMetaRsp btMetaRsp; // for tmq fetching meta
|
||||
int8_t sourceExcluded;
|
||||
int64_t snapshotVer;
|
||||
SSchemaWrapper* schema;
|
||||
|
|
|
@ -1078,9 +1078,9 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
|||
return pTaskInfo->streamInfo.tbName;
|
||||
}
|
||||
|
||||
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return &pTaskInfo->streamInfo.metaRsp;
|
||||
return &pTaskInfo->streamInfo.btMetaRsp;
|
||||
}
|
||||
|
||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t scanDebug = 0;
|
|||
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
|
||||
#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState"
|
||||
#define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint"
|
||||
#define TMQ_MAX_BATCH_SIZE 4096
|
||||
|
||||
typedef struct STableMergeScanExecInfo {
|
||||
SFileBlockLoadRecorder blockRecorder;
|
||||
|
@ -2845,8 +2846,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
|
||||
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
|
||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = NULL; // use batchMetaReq != NULL to judge if data is meta
|
||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = NULL;
|
||||
|
||||
qDebug("tmqsnap doRawScan called");
|
||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
@ -2893,28 +2894,60 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
void* data = NULL;
|
||||
int32_t dataLen = 0;
|
||||
int16_t type = 0;
|
||||
int64_t uid = 0;
|
||||
if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
|
||||
qError("tmqsnap getTableInfoFromSnapshot error");
|
||||
taosMemoryFreeClear(data);
|
||||
return NULL;
|
||||
}
|
||||
for(int32_t i = 0; i < TMQ_MAX_BATCH_SIZE; i++) {
|
||||
void* data = NULL;
|
||||
int32_t dataLen = 0;
|
||||
int16_t type = 0;
|
||||
int64_t uid = 0;
|
||||
if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
|
||||
qError("tmqsnap getTableInfoFromSnapshot error");
|
||||
taosMemoryFreeClear(data);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!sContext->queryMeta) { // change to get data next poll request
|
||||
STqOffsetVal offset = {0};
|
||||
SValue val = {0};
|
||||
tqOffsetResetToData(&offset, 0, INT64_MIN, val);
|
||||
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
|
||||
} else {
|
||||
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
|
||||
pTaskInfo->streamInfo.metaRsp.resMsgType = type;
|
||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
|
||||
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
|
||||
}
|
||||
if (!sContext->queryMeta) { // change to get data next poll request
|
||||
STqOffsetVal offset = {0};
|
||||
SValue val = {0};
|
||||
tqOffsetResetToData(&offset, 0, INT64_MIN, val);
|
||||
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
|
||||
break;
|
||||
} else {
|
||||
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
|
||||
SMqMetaRsp tmpMetaRsp = {0};
|
||||
tmpMetaRsp.resMsgType = type;
|
||||
tmpMetaRsp.metaRspLen = dataLen;
|
||||
tmpMetaRsp.metaRsp = data;
|
||||
if (!pTaskInfo->streamInfo.btMetaRsp.batchMetaReq) {
|
||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
|
||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
uint32_t len = 0;
|
||||
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("tmqsnap tEncodeMqMetaRsp error");
|
||||
taosMemoryFreeClear(data);
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t tLen = sizeof(SMqRspHead) + len;
|
||||
void* tBuf = taosMemoryCalloc(1, tLen);
|
||||
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, metaBuff, len);
|
||||
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
|
||||
if (code < 0) {
|
||||
qError("tmqsnap tEncodeMqMetaRsp error");
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFreeClear(tBuf);
|
||||
taosMemoryFreeClear(data);
|
||||
break;
|
||||
}
|
||||
taosMemoryFreeClear(data);
|
||||
taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, &tBuf);
|
||||
taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, &tLen);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
|
|
|
@ -1138,6 +1138,7 @@ void testConsumeExcluded(int topic_type) {
|
|||
taos_close(pConn);
|
||||
return;
|
||||
}
|
||||
taos_close(pConn);
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue