fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-01-22 16:38:37 +08:00
parent a4235fc794
commit 7c5c9f6245
22 changed files with 521 additions and 162 deletions

View File

@ -74,6 +74,7 @@ endif()
# jemalloc # jemalloc
if(${JEMALLOC_ENABLED}) if(${JEMALLOC_ENABLED})
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
MESSAGE("JEMALLOC_ENABLED is on")
endif() endif()
# msvc regex # msvc regex

View File

@ -360,7 +360,8 @@ typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1, TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2, TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3 TMQ_RES_METADATA = 3,
TMQ_RES_RAWDATA = 4
} tmq_res_t; } tmq_res_t;
typedef struct tmq_topic_assignment { typedef struct tmq_topic_assignment {

View File

@ -121,6 +121,7 @@ enum {
TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__POLL_BATCH_META_RSP, TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
TMQ_MSG_TYPE__POLL_RAW_DATA_RSP,
}; };
static char* tmqMsgTypeStr[] = { static char* tmqMsgTypeStr[] = {

View File

@ -2311,6 +2311,10 @@ typedef struct SSysTableSchema {
int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
#define RETRIEVE_TABLE_RSP_VERSION 0
#define RETRIEVE_TABLE_RSP_TMQ_VERSION 1
#define RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION 2
typedef struct { typedef struct {
int64_t useconds; int64_t useconds;
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
@ -4176,6 +4180,7 @@ typedef struct {
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
int8_t enableReplay; int8_t enableReplay;
int8_t sourceExcluded; int8_t sourceExcluded;
int8_t rawData;
int8_t enableBatchMeta; int8_t enableBatchMeta;
} SMqPollReq; } SMqPollReq;
@ -4506,6 +4511,7 @@ typedef struct {
typedef struct { typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData> SArray* aSubmitTbData; // SArray<SSubmitTbData>
bool raw;
} SSubmitReq2; } SSubmitReq2;
typedef struct { typedef struct {
@ -4514,8 +4520,9 @@ typedef struct {
char data[]; // SSubmitReq2 char data[]; // SSubmitReq2
} SSubmitReq2Msg; } SSubmitReq2Msg;
int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver);
int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq); int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq, SArray* rawList);
void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag); void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);

View File

@ -180,8 +180,11 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen, void* charsetCxt); char* msgBuf, int32_t msgBufLen, void* charsetCxt);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data);
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw); int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut); int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);

View File

@ -222,7 +222,10 @@ typedef struct STableDataCxt {
STSchema* pSchema; STSchema* pSchema;
SBoundColInfo boundColsInfo; SBoundColInfo boundColsInfo;
SArray* pValues; SArray* pValues;
SSubmitTbData* pData; union {
SSubmitTbData* pData;
void* raw;
};
SRowKey lastKey; SRowKey lastKey;
bool ordered; bool ordered;
bool duplicateTs; bool duplicateTs;

View File

@ -1016,6 +1016,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) #define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018)
// stream // stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -45,6 +45,7 @@ enum {
RES_TYPE__TMQ_META, RES_TYPE__TMQ_META,
RES_TYPE__TMQ_METADATA, RES_TYPE__TMQ_METADATA,
RES_TYPE__TMQ_BATCH_META, RES_TYPE__TMQ_BATCH_META,
RES_TYPE__TMQ_RAWDATA,
}; };
#define SHOW_VARIABLES_RESULT_COLS 5 #define SHOW_VARIABLES_RESULT_COLS 5
@ -55,6 +56,7 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD5_LEN (TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD5_LEN (TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY)
#define TD_RES_TMQ_RAW(res) (*(int8_t*)(res) == RES_TYPE__TMQ_RAWDATA)
#define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META) #define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA) #define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
@ -251,6 +253,10 @@ typedef struct {
SMqDataRsp dataRsp; SMqDataRsp dataRsp;
SMqMetaRsp metaRsp; SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp; SMqBatchMetaRsp batchMetaRsp;
struct{
int32_t len;
void* rawData;
};
}; };
} SMqRspObj; } SMqRspObj;

View File

@ -253,7 +253,7 @@ void taos_cleanup(void) {
taosCloseRef(id); taosCloseRef(id);
nodesDestroyAllocatorSet(); nodesDestroyAllocatorSet();
// cleanupAppInfo(); cleanupAppInfo();
rpcCleanup(); rpcCleanup();
tscDebug("rpc cleanup"); tscDebug("rpc cleanup");
@ -502,7 +502,7 @@ void taos_close(TAOS *taos) {
} }
int taos_errno(TAOS_RES *res) { int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return terrno; return terrno;
} }
@ -514,7 +514,7 @@ int taos_errno(TAOS_RES *res) {
} }
const char *taos_errstr(TAOS_RES *res) { const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return (const char *)tstrerror(terrno); return (const char *)tstrerror(terrno);
} }
@ -554,6 +554,9 @@ void taos_free_result(TAOS_RES *res) {
tDeleteMqMetaRsp(&pRsp->metaRsp); tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (TD_RES_TMQ_BATCH_META(res)) { } else if (TD_RES_TMQ_BATCH_META(res)) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp); tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
} else if (TD_RES_TMQ_RAW(res)) {
taosMemoryFree(pRsp->rawData);
doFreeReqResultInfo(&pRsp->resInfo);
} }
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} }
@ -572,7 +575,7 @@ void taos_kill_query(TAOS *taos) {
} }
int taos_field_count(TAOS_RES *res) { int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -583,7 +586,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); } int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (taos_num_fields(res) == 0 || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} }
@ -643,7 +646,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL; return NULL;
} else { } else {
tscError("invalid result passed to taos_fetch_row"); tscError("invalid result passed to taos_fetch_row");
terrno = TSDB_CODE_TSC_INTERNAL_ERROR; terrno = TSDB_CODE_TMQ_INVALID_DATA;
return NULL; return NULL;
} }
} }
@ -764,7 +767,7 @@ int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD
} }
int *taos_fetch_lengths(TAOS_RES *res) { int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL; return NULL;
} }
@ -773,7 +776,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
} }
TAOS_ROW *taos_result_block(TAOS_RES *res) { TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
@ -841,7 +844,7 @@ const char *taos_get_client_info() { return td_version; }
// return int32_t // return int32_t
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -853,7 +856,7 @@ int taos_affected_rows(TAOS_RES *res) {
// return int64_t // return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) { int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -864,7 +867,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) {
} }
int taos_result_precision(TAOS_RES *res) { int taos_result_precision(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_TIME_PRECISION_MILLI; return TSDB_TIME_PRECISION_MILLI;
} }
@ -904,7 +907,7 @@ int taos_select_db(TAOS *taos, const char *db) {
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_BATCH_META(res)) {
return; return;
} }
@ -913,7 +916,7 @@ void taos_stop_query(TAOS_RES *res) {
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return true; return true;
} }
SReqResultInfo *pResultInfo = tscGetCurResInfo(res); SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
@ -938,7 +941,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
} }
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -973,7 +976,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
return 0; return 0;
} else { } else {
tscError("taos_fetch_block_s invalid res type"); tscError("taos_fetch_block_s invalid res type");
return -1; return TSDB_CODE_TMQ_INVALID_DATA;
} }
} }
@ -981,7 +984,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
*numOfRows = 0; *numOfRows = 0;
*pData = NULL; *pData = NULL;
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -1018,7 +1021,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
} }
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0; return 0;
} }
@ -1038,7 +1041,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) { int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) || if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_RAW(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -1844,9 +1844,10 @@ static void* getRawDataFromRes(void* pRetrieve) {
} }
void* rawData = NULL; void* rawData = NULL;
// deal with compatibility // deal with compatibility
if (*(int64_t*)pRetrieve == 0) { if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
rawData = ((SRetrieveTableRsp*)pRetrieve)->data; rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
} else if (*(int64_t*)pRetrieve == 1) { } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
} }
return rawData; return rawData;
@ -1947,7 +1948,10 @@ static int32_t initRawCacheHash() {
} }
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) { static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
if (rawData == NULL || pTableMeta == NULL || pSW == NULL) { if (rawData == NULL){
return false;
}
if (pTableMeta == NULL || pSW == NULL) {
uError("invalid parameter in %s", __func__); uError("invalid parameter in %s", __func__);
return false; return false;
} }
@ -1965,6 +1969,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) { if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
return true; return true;
} }
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
int j = 0; int j = 0;
for (; j < pTableMeta->tableInfo.numOfColumns; j++) { for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
@ -1972,7 +1977,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
char* fieldName = pSW->pSchema[i].name; char* fieldName = pSW->pSchema[i].name;
if (strcmp(pColSchema->name, fieldName) == 0) { if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { if (checkSchema(pColSchema, fields, NULL, 0) != 0){
return true; return true;
} }
break; break;
@ -2069,7 +2074,7 @@ static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) { STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL || if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
pMeta == NULL || pSW == NULL || rawData == NULL) { pMeta == NULL || pSW == NULL) {
uError("invalid parameter in %s", __func__); uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -2298,6 +2303,96 @@ end:
return code; return code;
} }
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
if (taos == NULL || data == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SHashObj* pVgroupHash = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
SRequestObj* pRequest = NULL;
SCatalog* pCatalog = NULL;
SRequestConnInfo conn = {0};
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
SHashObj* pVgHash = NULL;
SHashObj* pNameHash = NULL;
SHashObj* pMetaHash = NULL;
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
int retry = 0;
while (1) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
RAW_NULL_CHECK(pVgroupHash);
pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
RAW_NULL_CHECK(pStmt->pVgDataBlocks);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
// find schema data info
STableMeta* pTableMeta = NULL;
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName,
&pTableMeta, pSW, NULL, retry));
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
goto end;
}
}
taosHashCleanup(pVgroupHash);
pVgroupHash = NULL;
RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
qDestroyQuery(pQuery);
pQuery = NULL;
rspObj.resIter = -1;
continue;
}
break;
}
end:
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteSTaosxRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
taosHashCleanup(pVgroupHash);
destroyRequest(pRequest);
return code;
}
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
if (pMetaRsp == NULL || meta == NULL) { if (pMetaRsp == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__); uError("invalid parameter in %s", __func__);
@ -2499,6 +2594,11 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_len = rspObj->batchMetaRsp.metaBuffLen; raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
raw->raw_type = rspObj->resType; raw->raw_type = rspObj->resType;
uDebug("tmq get raw batch meta:%p", raw); uDebug("tmq get raw batch meta:%p", raw);
} else if (TD_RES_TMQ_RAW(res)) {
raw->raw = rspObj->rawData;
raw->raw_len = rspObj->len;
raw->raw_type = rspObj->resType;
uDebug("tmq get raw raw:%p", raw);
} else { } else {
uError("tmq get raw error type:%d", *(int8_t*)res); uError("tmq get raw error type:%d", *(int8_t*)res);
return TSDB_CODE_TMQ_INVALID_MSG; return TSDB_CODE_TMQ_INVALID_MSG;
@ -2508,7 +2608,9 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
void tmq_free_raw(tmq_raw_data raw) { void tmq_free_raw(tmq_raw_data raw) {
uDebug("tmq free raw data type:%d", raw.raw_type); uDebug("tmq free raw data type:%d", raw.raw_type);
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { if (raw.raw_type == RES_TYPE__TMQ ||
raw.raw_type == RES_TYPE__TMQ_RAWDATA ||
raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw); taosMemoryFree(raw.raw);
} }
(void)memset(terrMsg, 0, ERR_MSG_LEN); (void)memset(terrMsg, 0, ERR_MSG_LEN);
@ -2559,6 +2661,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
return taosDeleteData(taos, buf, len); return taosDeleteData(taos, buf, len);
} else if (type == RES_TYPE__TMQ_METADATA) { } else if (type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRawMetaDataImpl(taos, buf, len); return tmqWriteRawMetaDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_RAWDATA) {
return tmqWriteRawRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ) { } else if (type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, buf, len); return tmqWriteRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_BATCH_META) { } else if (type == RES_TYPE__TMQ_BATCH_META) {

View File

@ -91,6 +91,7 @@ struct tmq_conf_t {
int8_t snapEnable; int8_t snapEnable;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t sessionTimeoutMs; int32_t sessionTimeoutMs;
@ -120,6 +121,7 @@ struct tmq_t {
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
int64_t consumerId; int64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
@ -195,6 +197,10 @@ typedef struct {
SMqDataRsp dataRsp; SMqDataRsp dataRsp;
SMqMetaRsp metaRsp; SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp; SMqBatchMetaRsp batchMetaRsp;
struct{
int32_t len;
void* rawData;
};
}; };
} SMqPollRspWrapper; } SMqPollRspWrapper;
@ -490,11 +496,17 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
} }
if (strcasecmp(key, "msg.consume.excluded") == 0) { if (strcasecmp(key, "msg.consume.excluded") == 0) {
int64_t tmp; int64_t tmp = 0;
code = taosStr2int64(value, &tmp); code = taosStr2int64(value, &tmp);
conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0; conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "msg.consume.rawdata") == 0) {
int64_t tmp = 0;
code = taosStr2int64(value, &tmp);
conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) { if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK; return TMQ_CONF_OK;
@ -811,7 +823,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
goto end; goto end;
} }
if (TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) || if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) { TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes; SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic; pTopicName = pRspObj->topic;
@ -1081,6 +1093,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo->fp = tmqHbCb; sendInfo->fp = tmqHbCb;
sendInfo->msgType = TDMT_MND_TMQ_HB; sendInfo->msgType = TDMT_MND_TMQ_HB;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
@ -1121,6 +1134,10 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp) DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp) DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;
taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFreeClear(pRsp->rawData);
} }
} }
@ -1709,6 +1726,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable; pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded; pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->rawData = conf->rawData;
pTmq->enableBatchMeta = conf->enableBatchMeta; pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN); tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) { if (taosGetFqdn(pTmq->fqdn) != 0) {
@ -2042,6 +2060,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
pRspWrapper->pollRsp.len = pMsg->len;
pRspWrapper->pollRsp.rawData = pMsg->pData;
pMsg->pData = NULL;
} else { // invalid rspType } else { // invalid rspType
tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR; code = TSDB_CODE_TSC_INTERNAL_ERROR;
@ -2098,6 +2120,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable; pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded; pReq->sourceExcluded = tmq->sourceExcluded;
pReq->rawData = tmq->rawData;
pReq->enableBatchMeta = tmq->enableBatchMeta; pReq->enableBatchMeta = tmq->enableBatchMeta;
} }
@ -2346,6 +2369,10 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
SMqDataRsp dataRsp; SMqDataRsp dataRsp;
SMqMetaRsp metaRsp; SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp; SMqBatchMetaRsp batchMetaRsp;
struct{
int32_t len;
void* rawData;
};
} MEMSIZE; } MEMSIZE;
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
@ -2418,13 +2445,16 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever, updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0); tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
if (pollRspWrapper->dataRsp.blockNum == 0) { if (pollRspWrapper->dataRsp.blockNum == 0 &&
pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
", total:%" PRId64 ",QID:0x%" PRIx64, ", total:%" PRId64 ",QID:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
@ -2435,12 +2465,18 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
goto END; goto END;
} }
pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA; pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
(pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
int64_t numOfRows = 0; int64_t numOfRows = 0;
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
tmq->totalRows += numOfRows; tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows;
} else {
pRspObj->rawData = pollRspWrapper->rawData;
pRspObj->len = pollRspWrapper->len;
}
pVg->emptyBlockReceiveTs = 0; pVg->emptyBlockReceiveTs = 0;
if (tmq->replayEnable) { if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
if (pVg->blockSleepForReplay > 0) { if (pVg->blockSleepForReplay > 0) {
@ -2656,6 +2692,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
return TMQ_RES_METADATA; return TMQ_RES_METADATA;
} else if (TD_RES_TMQ_BATCH_META(res)) { } else if (TD_RES_TMQ_BATCH_META(res)) {
return TMQ_RES_TABLE_META; return TMQ_RES_TABLE_META;
} else if (TD_RES_TMQ_RAW(res)) {
return TMQ_RES_RAWDATA;
} else { } else {
return TMQ_RES_INVALID; return TMQ_RES_INVALID;
} }
@ -2665,7 +2703,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return NULL; return NULL;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
char* tmp = strchr(((SMqRspObj*)res)->topic, '.'); char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
if (tmp == NULL) { if (tmp == NULL) {
@ -2682,7 +2720,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
return NULL; return NULL;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) { TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
char* tmp = strchr(((SMqRspObj*)res)->db, '.'); char* tmp = strchr(((SMqRspObj*)res)->db, '.');
if (tmp == NULL) { if (tmp == NULL) {
@ -2698,7 +2736,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) { TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
return ((SMqRspObj*)res)->vgId; return ((SMqRspObj*)res)->vgId;
} else { } else {
@ -2710,7 +2748,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
STqOffsetVal* pOffset = &pRspObj->dataRsp.reqOffset; STqOffsetVal* pOffset = &pRspObj->dataRsp.reqOffset;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
@ -2735,7 +2773,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return NULL; return NULL;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
SMqDataRsp* data = &pRspObj->dataRsp; SMqDataRsp* data = &pRspObj->dataRsp;
if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 || if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||

View File

@ -9205,6 +9205,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableReplay)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableReplay));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->rawData));
tEndEncode(&encoder); tEndEncode(&encoder);
@ -9258,6 +9259,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
pReq->enableBatchMeta = false; pReq->enableBatchMeta = false;
} }
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->rawData));
}
tEndDecode(&decoder); tEndDecode(&decoder);
_exit: _exit:
@ -11609,6 +11614,26 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq
_exit: _exit:
return code; return code;
} }
int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver){
int32_t code = 0;
int32_t lino = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
int32_t flags = 0;
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &flags));
flags |= TD_REQ_FROM_TAOX;
flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
SEncoder encoder = {0};
tEncoderInit(&encoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, flags));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, suid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, uid));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, sver));
_exit:
return code;
}
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
int32_t code = 0; int32_t code = 0;
@ -11656,14 +11681,21 @@ _exit:
return code; return code;
} }
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) { static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, void* rawData) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
int32_t flags; int32_t flags;
uint8_t version; uint8_t version;
uint8_t* dataAfterCreate = NULL;
uint8_t* dataStart = pCoder->data;
uint32_t posStart = pCoder->pos;
uint32_t posAfterCreate = 0;
TAOS_CHECK_EXIT(tStartDecode(pCoder)); TAOS_CHECK_EXIT(tStartDecode(pCoder));
uint32_t pos = pCoder->pos;
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags)); TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags));
uint32_t flagsLen = pCoder->pos - posStart;
pSubmitTbData->flags = flags & 0xff; pSubmitTbData->flags = flags & 0xff;
version = (flags >> 8) & 0xff; version = (flags >> 8) & 0xff;
@ -11675,6 +11707,8 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
} }
TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq)); TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq));
dataAfterCreate = pCoder->data;
posAfterCreate = pCoder->pos;
} }
// submit data // submit data
@ -11682,35 +11716,56 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid)); TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid));
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver)); TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver));
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (rawData != NULL){ // no need to decode data
uint64_t nColData; if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData));
pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); for (int32_t i = 0; i < nColData; ++i) {
if (pSubmitTbData->aCol == NULL) { SColData pColData = {0};
TAOS_CHECK_EXIT(terrno); TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, &pColData));
} }
} else {
uint64_t nRow = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow));
for (int32_t i = 0; i < nColData; ++i) { for (int32_t iRow = 0; iRow < nRow; ++iRow) {
TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1))); SRow *ppRow = NULL;
TAOS_CHECK_EXIT(tDecodeRow(pCoder, &ppRow));
}
} }
} else { } else {
uint64_t nRow; if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); uint64_t nColData = 0;
pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData));
if (pSubmitTbData->aRowP == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t iRow = 0; iRow < nRow; ++iRow) { pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData));
SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); if (pSubmitTbData->aCol == NULL) {
if (ppRow == NULL) {
TAOS_CHECK_EXIT(terrno); TAOS_CHECK_EXIT(terrno);
} }
TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow)); for (int32_t i = 0; i < nColData; ++i) {
TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1)));
}
} else {
uint64_t nRow = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow));
pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *));
if (pSubmitTbData->aRowP == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1);
if (ppRow == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow));
}
} }
} }
@ -11720,6 +11775,16 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
} }
tEndDecode(pCoder); tEndDecode(pCoder);
if (rawData != NULL){
if (dataAfterCreate != NULL){
TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen);
*(int32_t*)(dataAfterCreate - INT_BYTES - flagsLen) = pCoder->pos - posAfterCreate + flagsLen;
*(void**)rawData = dataAfterCreate - INT_BYTES - flagsLen;
}else{
*(void**)rawData = dataStart;
}
}
_exit: _exit:
return code; return code;
@ -11731,15 +11796,27 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
TAOS_CHECK_EXIT(tStartEncode(pCoder)); TAOS_CHECK_EXIT(tStartEncode(pCoder));
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData))); TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)));
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { if (pReq->raw){
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i))); for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
void* data = taosArrayGetP(pReq->aSubmitTbData, i);
if (pCoder->data != NULL){
TAOS_MEMCPY(pCoder->data + pCoder->pos, data, *(uint32_t*)data + INT_BYTES);
}
pCoder->pos += *(uint32_t*)data + INT_BYTES;
}
} else{
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)));
}
} }
tEndEncode(pCoder); tEndEncode(pCoder);
_exit: _exit:
return code; return code;
} }
int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) { int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
int32_t code = 0; int32_t code = 0;
memset(pReq, 0, sizeof(*pReq)); memset(pReq, 0, sizeof(*pReq));
@ -11763,7 +11840,8 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
} }
for (uint64_t i = 0; i < nSubmitTbData; i++) { for (uint64_t i = 0; i < nSubmitTbData; i++) {
if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1)) < 0) { if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1),
rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }
@ -11834,12 +11912,15 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return; if (pReq->aSubmitTbData == NULL) return;
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); if (!pReq->raw){
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData); int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) { for (int32_t i = 0; i < nSubmitTbData; i++) {
tDestroySubmitTbData(&aSubmitTbData[i], flag); tDestroySubmitTbData(&aSubmitTbData[i], flag);
}
} }
taosArrayDestroy(pReq->aSubmitTbData); taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL; pReq->aSubmitTbData = NULL;
} }

View File

@ -241,11 +241,10 @@ SSDataBlock *tqGetResultBlock(STqReader *pReader);
int64_t tqGetResultBlockTime(STqReader *pReader); int64_t tqGetResultBlockTime(STqReader *pReader);
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver, SArray* rawList);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int32_t tqRetrieveTaosxBlock(STqReader *pReader, SMqDataRsp* pRsp, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta);
int64_t *createTime);
int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished);
// sma // sma

View File

@ -117,7 +117,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId); int32_t type, int32_t vgId);
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -178,6 +178,7 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo
#define TQ_SUBSCRIBE_NAME "subscribe" #define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0" #define TQ_OFFSET_NAME "offset-ver0"
#define TQ_POLL_MAX_TIME 1000 #define TQ_POLL_MAX_TIME 1000
#define TQ_POLL_MAX_BYTES 1048576
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -352,7 +352,7 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
return -1; return terrno;
} }
tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id); tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
return 0; return 0;
@ -485,14 +485,14 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version; int64_t ver = pWalReader->pHead->head.version;
if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) { if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
return false; return false;
} }
pReader->nextBlk = 0; pReader->nextBlk = 0;
} }
} }
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
if (pReader == NULL) { if (pReader == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -504,7 +504,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen); tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit); int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (code != 0) { if (code != 0) {
@ -1046,7 +1046,45 @@ END:
return code; return code;
} }
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) { static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
int32_t code = 0;
int32_t lino = 0;
void* createReq = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
TSDB_CHECK_CODE(code, lino, END);
createReq = taosMemoryCalloc(1, len);
TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, END);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqDebug("build create table info msg success");
END:
if (code != 0){
tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
taosMemoryFree(createReq);
}
return code;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk); tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) { if (pSubmitTbData == NULL) {
@ -1062,8 +1100,9 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid; pReader->lastBlkUid = uid;
int64_t createTime = INT64_MAX;
tDeleteSchemaWrapper(pReader->pSchemaWrapper); tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &createTime);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
@ -1071,6 +1110,21 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
} }
if (fetchMeta != WITH_DATA &&
pSubmitTbData->pCreateTbReq != NULL &&
pSubmitTbData->ctimeMs - createTime <= 0) { // judge if table is already created to avoid sending crateTbReq
int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
if (code != 0) {
return code;
}
} else if (rawList != NULL){
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
return terrno;
}
pReader->pSchemaWrapper = NULL;
return 0;
}
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
return tqProcessColData(pReader, pSubmitTbData, blocks, schemas); return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
} else { } else {

View File

@ -14,6 +14,32 @@
*/ */
#include "tq.h" #include "tq.h"
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES;
buf = taosMemoryCalloc(1, dataStrLen);
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
pRetrieve->precision = precision;
pRetrieve->compressed = 0;
memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
END:
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t code = 0; int32_t code = 0;
@ -25,7 +51,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
TSDB_CHECK_NULL(buf, code, lino, END, terrno); TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = 1; pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
pRetrieve->precision = precision; pRetrieve->precision = precision;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
@ -290,45 +316,8 @@ END:
return code; return code;
} }
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
int32_t code = 0; const SMqPollReq* pRequest, SArray* rawList){
int32_t lino = 0;
void* createReq = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
TSDB_CHECK_CODE(code, lino, END);
createReq = taosMemoryCalloc(1, len);
TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, END);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqDebug("build create table info msg success");
END:
if (code != 0){
tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
taosMemoryFree(createReq);
}
return code;
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SArray* pBlocks = NULL; SArray* pBlocks = NULL;
@ -342,36 +331,46 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
pSchemas = taosArrayInit(0, sizeof(void*)); pSchemas = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno); TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbData = NULL;
int64_t createTime = INT64_MAX; code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
TSDB_CHECK_CODE(code, lino, END); TSDB_CHECK_CODE(code, lino, END);
bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0; bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)); code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
TSDB_CHECK_CODE(code, lino, END); TSDB_CHECK_CODE(code, lino, END);
} }
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL);
code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
TSDB_CHECK_CODE(code, lino, END);
}
}
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL);
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); if (taosArrayGetSize(pBlocks) == 0){
if (pBlock == NULL) { void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
continue; if (rawData == NULL) {
continue;
}
if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
} else {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
if (pBlock == NULL) {
continue;
}
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
} }
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
@ -391,29 +390,30 @@ END:
} }
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA);
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader; STqReader* pReader = pExec->pTqReader;
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); SArray *rawList = NULL;
if (pRequest->rawData){
rawList = taosArrayInit(0, POINTER_BYTES);
}
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
TSDB_CHECK_CODE(code, lino, END); TSDB_CHECK_CODE(code, lino, END);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) { while (tqNextBlockImpl(pReader, NULL)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
} }
} }
END: END:
taosArrayDestroy(rawList);
if (code != 0){ if (code != 0){
tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code)); tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
} }

View File

@ -131,7 +131,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} }
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version); pHandle->subKey, vgId, dataRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true; *pBlockReturned = true;
@ -222,6 +222,10 @@ end:
static void tDeleteCommon(void* parm) {} static void tDeleteCommon(void* parm) {}
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) { SRpcMsg* pMsg, STqOffsetVal* offset) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
@ -274,7 +278,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END; goto END;
} }
@ -287,7 +291,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END; goto END;
} }
@ -370,12 +374,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version, .ver = pHead->version,
}; };
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded)); TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
(pRequest->rawData != 0 && totalRows >= TQ_POLL_MAX_BYTES)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END; goto END;
} else { } else {
fetchVer++; fetchVer++;
@ -534,7 +540,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
type == TMQ_MSG_TYPE__WALINFO_RSP ||
type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
@ -558,7 +566,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
type == TMQ_MSG_TYPE__WALINFO_RSP ||
type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
code = tEncodeMqDataRsp(&encoder, pRsp); code = tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
code = tEncodeSTaosxRsp(&encoder, pRsp); code = tEncodeSTaosxRsp(&encoder, pRsp);

View File

@ -1873,7 +1873,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
len -= sizeof(SSubmitReq2Msg); len -= sizeof(SSubmitReq2Msg);
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, pReq, len); tDecoderInit(&dc, pReq, len);
if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) { if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
} }

View File

@ -3953,7 +3953,7 @@ FETCH_NEXT_BLOCK:
QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno); QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id); qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, NULL) <
0) { 0) {
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current,
totalBlocks, id); totalBlocks, id);

View File

@ -403,3 +403,18 @@ end:
} }
return code; return code;
} }
int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash) {
int32_t lino = 0;
int32_t code = 0;
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
TSDB_CHECK_CODE(code, lino, end);
end:
if (code != 0) {
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -483,7 +483,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
return code; return code;
} }
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList, static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList,
SVgroupDataCxt** pOutput) { SVgroupDataCxt** pOutput) {
SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt)); SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
if (NULL == pVgCxt) { if (NULL == pVgCxt) {
@ -495,7 +495,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
return terrno; return terrno;
} }
pVgCxt->vgId = pTableCxt->pMeta->vgId; pVgCxt->vgId = vgId;
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) { if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
@ -642,7 +642,7 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
if (NULL == pp) { if (NULL == pp) {
pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pp) { if (NULL == pp) {
code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt); code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
} else { } else {
pVgCxt = *(SVgroupDataCxt**)pp; pVgCxt = *(SVgroupDataCxt**)pp;
} }
@ -777,7 +777,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool
int32_t vgId = pTableCxt->pMeta->vgId; int32_t vgId = pTableCxt->pMeta->vgId;
void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pp) { if (NULL == pp) {
code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
} else { } else {
pVgCxt = *(SVgroupDataCxt**)pp; pVgCxt = *(SVgroupDataCxt**)pp;
} }
@ -1074,3 +1074,33 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
end: end:
return ret; return ret;
} }
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
SVgroupDataCxt* pVgCxt = NULL;
void** pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
if (NULL == pp) {
int code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
if (code != 0){
return code;
}
} else {
pVgCxt = *(SVgroupDataCxt**)pp;
}
if (NULL == pVgCxt->pData->aSubmitTbData) {
pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
pVgCxt->pData->raw = true;
if (NULL == pVgCxt->pData->aSubmitTbData) {
return terrno;
}
}
// push data to submit, rebuild empty data for next submit
if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
return terrno;
}
qDebug("add raw data to vgId:%d", pTableMeta->vgId);
return 0;
}

View File

@ -859,6 +859,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here")
// stream // stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")