From 7c5c9f624565946d1b08f893f05760b1d0da8207 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 22 Jan 2025 16:38:37 +0800 Subject: [PATCH] fix:[TS-5776]add raw type from consumer --- contrib/CMakeLists.txt | 1 + include/client/taos.h | 3 +- include/common/tcommon.h | 1 + include/common/tmsg.h | 9 +- include/libs/parser/parser.h | 3 + include/libs/qcom/query.h | 5 +- include/util/taoserror.h | 1 + source/client/inc/clientInt.h | 6 + source/client/src/clientMain.c | 39 ++++--- source/client/src/clientRawBlockWrite.c | 116 +++++++++++++++++++- source/client/src/clientTmq.c | 64 ++++++++--- source/common/src/msg/tmsg.c | 137 ++++++++++++++++++----- source/dnode/vnode/inc/vnode.h | 5 +- source/dnode/vnode/src/inc/tq.h | 3 +- source/dnode/vnode/src/tq/tqRead.c | 66 ++++++++++- source/dnode/vnode/src/tq/tqScan.c | 140 ++++++++++++------------ source/dnode/vnode/src/tq/tqUtil.c | 26 +++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/parser/src/parInsertSml.c | 15 +++ source/libs/parser/src/parInsertUtil.c | 38 ++++++- source/util/src/terror.c | 1 + 22 files changed, 521 insertions(+), 162 deletions(-) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 2304ad54aa..6afcdd22f2 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -74,6 +74,7 @@ endif() # jemalloc if(${JEMALLOC_ENABLED}) cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + MESSAGE("JEMALLOC_ENABLED is on") endif() # msvc regex diff --git a/include/client/taos.h b/include/client/taos.h index 17f97d3d3d..94a7884950 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -360,7 +360,8 @@ typedef enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, - TMQ_RES_METADATA = 3 + TMQ_RES_METADATA = 3, + TMQ_RES_RAWDATA = 4 } tmq_res_t; typedef struct tmq_topic_assignment { diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0450766535..08fae0952c 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -121,6 +121,7 @@ enum { TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, + TMQ_MSG_TYPE__POLL_RAW_DATA_RSP, }; static char* tmqMsgTypeStr[] = { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index aebe09b563..d14044a8bf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2311,6 +2311,10 @@ typedef struct SSysTableSchema { int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); +#define RETRIEVE_TABLE_RSP_VERSION 0 +#define RETRIEVE_TABLE_RSP_TMQ_VERSION 1 +#define RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION 2 + typedef struct { int64_t useconds; int8_t completed; // all results are returned to client @@ -4176,6 +4180,7 @@ typedef struct { STqOffsetVal reqOffset; int8_t enableReplay; int8_t sourceExcluded; + int8_t rawData; int8_t enableBatchMeta; } SMqPollReq; @@ -4506,6 +4511,7 @@ typedef struct { typedef struct { SArray* aSubmitTbData; // SArray + bool raw; } SSubmitReq2; typedef struct { @@ -4514,8 +4520,9 @@ typedef struct { char data[]; // SSubmitReq2 } SSubmitReq2Msg; +int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver); int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq); -int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq); +int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq, SArray* rawList); void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 95f522f504..f4bf5fafd4 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -180,8 +180,11 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int32_t msgBufLen, void* charsetCxt); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); +int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash); +int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data); int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields, int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw); +int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5b28eadc4f..02cb96e4c7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -222,7 +222,10 @@ typedef struct STableDataCxt { STSchema* pSchema; SBoundColInfo boundColsInfo; SArray* pValues; - SSubmitTbData* pData; + union { + SSubmitTbData* pData; + void* raw; + }; SRowKey lastKey; bool ordered; bool duplicateTs; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 464dffa937..8488e2800a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1016,6 +1016,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) #define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) +#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 2543a1f3ec..ae88883739 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,6 +45,7 @@ enum { RES_TYPE__TMQ_META, RES_TYPE__TMQ_METADATA, RES_TYPE__TMQ_BATCH_META, + RES_TYPE__TMQ_RAWDATA, }; #define SHOW_VARIABLES_RESULT_COLS 5 @@ -55,6 +56,7 @@ enum { #define SHOW_VARIABLES_RESULT_FIELD5_LEN (TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE) #define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY) +#define TD_RES_TMQ_RAW(res) (*(int8_t*)(res) == RES_TYPE__TMQ_RAWDATA) #define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ) #define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META) #define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA) @@ -251,6 +253,10 @@ typedef struct { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; SMqBatchMetaRsp batchMetaRsp; + struct{ + int32_t len; + void* rawData; + }; }; } SMqRspObj; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 83aff351dd..eb1bffeee3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -253,7 +253,7 @@ void taos_cleanup(void) { taosCloseRef(id); nodesDestroyAllocatorSet(); - // cleanupAppInfo(); + cleanupAppInfo(); rpcCleanup(); tscDebug("rpc cleanup"); @@ -502,7 +502,7 @@ void taos_close(TAOS *taos) { } int taos_errno(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return terrno; } @@ -514,7 +514,7 @@ int taos_errno(TAOS_RES *res) { } const char *taos_errstr(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return (const char *)tstrerror(terrno); } @@ -554,6 +554,9 @@ void taos_free_result(TAOS_RES *res) { tDeleteMqMetaRsp(&pRsp->metaRsp); } else if (TD_RES_TMQ_BATCH_META(res)) { tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp); + } else if (TD_RES_TMQ_RAW(res)) { + taosMemoryFree(pRsp->rawData); + doFreeReqResultInfo(&pRsp->resInfo); } taosMemoryFree(pRsp); } @@ -572,7 +575,7 @@ void taos_kill_query(TAOS *taos) { } int taos_field_count(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -583,7 +586,7 @@ int taos_field_count(TAOS_RES *res) { int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); } TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { - if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (taos_num_fields(res) == 0 || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return NULL; } @@ -643,7 +646,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } else { tscError("invalid result passed to taos_fetch_row"); - terrno = TSDB_CODE_TSC_INTERNAL_ERROR; + terrno = TSDB_CODE_TMQ_INVALID_DATA; return NULL; } } @@ -764,7 +767,7 @@ int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD } int *taos_fetch_lengths(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return NULL; } @@ -773,7 +776,7 @@ int *taos_fetch_lengths(TAOS_RES *res) { } TAOS_ROW *taos_result_block(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { terrno = TSDB_CODE_INVALID_PARA; return NULL; } @@ -841,7 +844,7 @@ const char *taos_get_client_info() { return td_version; } // return int32_t int taos_affected_rows(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -853,7 +856,7 @@ int taos_affected_rows(TAOS_RES *res) { // return int64_t int64_t taos_affected_rows64(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -864,7 +867,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_TIME_PRECISION_MILLI; } @@ -904,7 +907,7 @@ int taos_select_db(TAOS *taos, const char *db) { } void taos_stop_query(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return; } @@ -913,7 +916,7 @@ void taos_stop_query(TAOS_RES *res) { } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return true; } SReqResultInfo *pResultInfo = tscGetCurResInfo(res); @@ -938,7 +941,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -973,7 +976,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return 0; } else { tscError("taos_fetch_block_s invalid res type"); - return -1; + return TSDB_CODE_TMQ_INVALID_DATA; } } @@ -981,7 +984,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { *numOfRows = 0; *pData = NULL; - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -1018,7 +1021,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { - if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return 0; } @@ -1038,7 +1041,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) { if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) || - TD_RES_TMQ_BATCH_META(res)) { + TD_RES_TMQ_RAW(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index c200d38a56..689adf51e7 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1844,9 +1844,10 @@ static void* getRawDataFromRes(void* pRetrieve) { } void* rawData = NULL; // deal with compatibility - if (*(int64_t*)pRetrieve == 0) { + if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) { rawData = ((SRetrieveTableRsp*)pRetrieve)->data; - } else if (*(int64_t*)pRetrieve == 1) { + } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION || + *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) { rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; } return rawData; @@ -1947,7 +1948,10 @@ static int32_t initRawCacheHash() { } static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) { - if (rawData == NULL || pTableMeta == NULL || pSW == NULL) { + if (rawData == NULL){ + return false; + } + if (pTableMeta == NULL || pSW == NULL) { uError("invalid parameter in %s", __func__); return false; } @@ -1965,6 +1969,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) { return true; } + for (int i = 0; i < pSW->nCols; i++) { int j = 0; for (; j < pTableMeta->tableInfo.numOfColumns; j++) { @@ -1972,7 +1977,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe char* fieldName = pSW->pSchema[i].name; if (strcmp(pColSchema->name, fieldName) == 0) { - if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + if (checkSchema(pColSchema, fields, NULL, 0) != 0){ return true; } break; @@ -2069,7 +2074,7 @@ static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) { if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL || - pMeta == NULL || pSW == NULL || rawData == NULL) { + pMeta == NULL || pSW == NULL) { uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } @@ -2298,6 +2303,96 @@ end: return code; } +static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) { + if (taos == NULL || data == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } + int32_t code = TSDB_CODE_SUCCESS; + SQuery* pQuery = NULL; + SHashObj* pVgroupHash = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; + + SRequestObj* pRequest = NULL; + SCatalog* pCatalog = NULL; + SRequestConnInfo conn = {0}; + + RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); + uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); + RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj)); + + SHashObj* pVgHash = NULL; + SHashObj* pNameHash = NULL; + SHashObj* pMetaHash = NULL; + RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos)); + int retry = 0; + while (1) { + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); + uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot; + pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + RAW_NULL_CHECK(pVgroupHash); + pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES); + RAW_NULL_CHECK(pStmt->pVgDataBlocks); + + while (++rspObj.resIter < rspObj.dataRsp.blockNum) { + if (!rspObj.dataRsp.withSchema) { + goto end; + } + + const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); + RAW_NULL_CHECK(tbName); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); + RAW_NULL_CHECK(pSW); + void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); + RAW_NULL_CHECK(pRetrieve); + void* rawData = getRawDataFromRes(pRetrieve); + RAW_NULL_CHECK(rawData); + + uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); + tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); + + // find schema data info + STableMeta* pTableMeta = NULL; + RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, + &pTableMeta, pSW, NULL, retry)); + char err[ERR_MSG_LEN] = {0}; + code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData); + if (code != TSDB_CODE_SUCCESS) { + SET_ERROR_MSG("table:%s, err:%s", pName.tname, err); + goto end; + } + } + taosHashCleanup(pVgroupHash); + pVgroupHash = NULL; + + RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash)); + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) { + uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code)); + qDestroyQuery(pQuery); + pQuery = NULL; + rspObj.resIter = -1; + continue; + } + break; + } + + end: + uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + tDeleteSTaosxRsp(&rspObj.dataRsp); + tDecoderClear(&decoder); + qDestroyQuery(pQuery); + taosHashCleanup(pVgroupHash); + destroyRequest(pRequest); + return code; +} + static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { if (pMetaRsp == NULL || meta == NULL) { uError("invalid parameter in %s", __func__); @@ -2499,6 +2594,11 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw_len = rspObj->batchMetaRsp.metaBuffLen; raw->raw_type = rspObj->resType; uDebug("tmq get raw batch meta:%p", raw); + } else if (TD_RES_TMQ_RAW(res)) { + raw->raw = rspObj->rawData; + raw->raw_len = rspObj->len; + raw->raw_type = rspObj->resType; + uDebug("tmq get raw raw:%p", raw); } else { uError("tmq get raw error type:%d", *(int8_t*)res); return TSDB_CODE_TMQ_INVALID_MSG; @@ -2508,7 +2608,9 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { void tmq_free_raw(tmq_raw_data raw) { uDebug("tmq free raw data type:%d", raw.raw_type); - if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { + if (raw.raw_type == RES_TYPE__TMQ || + raw.raw_type == RES_TYPE__TMQ_RAWDATA || + raw.raw_type == RES_TYPE__TMQ_METADATA) { taosMemoryFree(raw.raw); } (void)memset(terrMsg, 0, ERR_MSG_LEN); @@ -2559,6 +2661,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) return taosDeleteData(taos, buf, len); } else if (type == RES_TYPE__TMQ_METADATA) { return tmqWriteRawMetaDataImpl(taos, buf, len); + } else if (type == RES_TYPE__TMQ_RAWDATA) { + return tmqWriteRawRawDataImpl(taos, buf, len); } else if (type == RES_TYPE__TMQ) { return tmqWriteRawDataImpl(taos, buf, len); } else if (type == RES_TYPE__TMQ_BATCH_META) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f4426fc94a..e611e7e417 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -91,6 +91,7 @@ struct tmq_conf_t { int8_t snapEnable; int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit + int8_t rawData; // fetch raw data uint16_t port; int32_t autoCommitInterval; int32_t sessionTimeoutMs; @@ -120,6 +121,7 @@ struct tmq_t { int8_t resetOffsetCfg; int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit + int8_t rawData; // fetch raw data int64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; @@ -195,6 +197,10 @@ typedef struct { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; SMqBatchMetaRsp batchMetaRsp; + struct{ + int32_t len; + void* rawData; + }; }; } SMqPollRspWrapper; @@ -490,11 +496,17 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } if (strcasecmp(key, "msg.consume.excluded") == 0) { - int64_t tmp; + int64_t tmp = 0; code = taosStr2int64(value, &tmp); conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0; return TMQ_CONF_OK; } + if (strcasecmp(key, "msg.consume.rawdata") == 0) { + int64_t tmp = 0; + code = taosStr2int64(value, &tmp); + conf->rawData = (0 == code && tmp != 0) ? 1 : 0; + return TMQ_CONF_OK; + } if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; @@ -811,7 +823,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c goto end; } - if (TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) || + if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) || TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) { SMqRspObj* pRspObj = (SMqRspObj*)pRes; pTopicName = pRspObj->topic; @@ -1081,6 +1093,7 @@ void tmqSendHbReq(void* param, void* tmrId) { sendInfo->fp = tmqHbCb; sendInfo->msgType = TDMT_MND_TMQ_HB; + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); @@ -1121,6 +1134,10 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp) } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp) + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { + SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp; + taosMemoryFreeClear(pRsp->pEpset); + taosMemoryFreeClear(pRsp->rawData); } } @@ -1709,6 +1726,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; + pTmq->rawData = conf->rawData; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { @@ -2042,6 +2060,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) + } else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { + pRspWrapper->pollRsp.len = pMsg->len; + pRspWrapper->pollRsp.rawData = pMsg->pData; + pMsg->pData = NULL; } else { // invalid rspType tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); code = TSDB_CODE_TSC_INTERNAL_ERROR; @@ -2098,6 +2120,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->reqId = generateRequestId(); pReq->enableReplay = tmq->replayEnable; pReq->sourceExcluded = tmq->sourceExcluded; + pReq->rawData = tmq->rawData; pReq->enableBatchMeta = tmq->enableBatchMeta; } @@ -2346,6 +2369,10 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ SMqDataRsp dataRsp; SMqMetaRsp metaRsp; SMqBatchMetaRsp batchMetaRsp; + struct{ + int32_t len; + void* rawData; + }; } MEMSIZE; SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); @@ -2418,13 +2445,16 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ pVg->epSet = *pollRspWrapper->pEpset; } - if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { + if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || + pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP || + pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0); char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset); - if (pollRspWrapper->dataRsp.blockNum == 0) { + if (pollRspWrapper->dataRsp.blockNum == 0 && + pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); @@ -2435,12 +2465,18 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId); goto END; } - pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA; + pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA : + (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA); int64_t numOfRows = 0; - tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); - tmq->totalRows += numOfRows; + if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){ + tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); + tmq->totalRows += numOfRows; + } else { + pRspObj->rawData = pollRspWrapper->rawData; + pRspObj->len = pollRspWrapper->len; + } pVg->emptyBlockReceiveTs = 0; - if (tmq->replayEnable) { + if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; if (pVg->blockSleepForReplay > 0) { @@ -2656,6 +2692,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { return TMQ_RES_METADATA; } else if (TD_RES_TMQ_BATCH_META(res)) { return TMQ_RES_TABLE_META; + } else if (TD_RES_TMQ_RAW(res)) { + return TMQ_RES_RAWDATA; } else { return TMQ_RES_INVALID; } @@ -2665,7 +2703,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || + if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { char* tmp = strchr(((SMqRspObj*)res)->topic, '.'); if (tmp == NULL) { @@ -2682,7 +2720,7 @@ const char* tmq_get_db_name(TAOS_RES* res) { return NULL; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || + if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) { char* tmp = strchr(((SMqRspObj*)res)->db, '.'); if (tmp == NULL) { @@ -2698,7 +2736,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (res == NULL) { return TSDB_CODE_INVALID_PARA; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || + if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) { return ((SMqRspObj*)res)->vgId; } else { @@ -2710,7 +2748,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { if (res == NULL) { return TSDB_CODE_INVALID_PARA; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; STqOffsetVal* pOffset = &pRspObj->dataRsp.reqOffset; if (pOffset->type == TMQ_OFFSET__LOG) { @@ -2735,7 +2773,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) { SMqRspObj* pRspObj = (SMqRspObj*)res; SMqDataRsp* data = &pRspObj->dataRsp; if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 || diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index a3989012f6..1b6c7add3f 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -9205,6 +9205,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableReplay)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->rawData)); tEndEncode(&encoder); @@ -9258,6 +9259,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { pReq->enableBatchMeta = false; } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->rawData)); + } + tEndDecode(&decoder); _exit: @@ -11609,6 +11614,26 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq _exit: return code; } +int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver){ + int32_t code = 0; + int32_t lino = 0; + SDecoder decoder = {0}; + tDecoderInit(&decoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data); + + int32_t flags = 0; + TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &flags)); + flags |= TD_REQ_FROM_TAOX; + flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE; + + SEncoder encoder = {0}; + tEncoderInit(&encoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data); + TAOS_CHECK_EXIT(tEncodeI32v(&encoder, flags)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, suid)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, uid)); + TAOS_CHECK_EXIT(tEncodeI32v(&encoder, sver)); + _exit: + return code; +} static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { int32_t code = 0; @@ -11656,14 +11681,21 @@ _exit: return code; } -static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) { +static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, void* rawData) { int32_t code = 0; int32_t lino; int32_t flags; uint8_t version; + uint8_t* dataAfterCreate = NULL; + uint8_t* dataStart = pCoder->data; + uint32_t posStart = pCoder->pos; + uint32_t posAfterCreate = 0; + TAOS_CHECK_EXIT(tStartDecode(pCoder)); + uint32_t pos = pCoder->pos; TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags)); + uint32_t flagsLen = pCoder->pos - posStart; pSubmitTbData->flags = flags & 0xff; version = (flags >> 8) & 0xff; @@ -11675,6 +11707,8 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa } TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq)); + dataAfterCreate = pCoder->data; + posAfterCreate = pCoder->pos; } // submit data @@ -11682,35 +11716,56 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid)); TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver)); - if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { - uint64_t nColData; + if (rawData != NULL){ // no need to decode data + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData = 0; - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); - pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); - if (pSubmitTbData->aCol == NULL) { - TAOS_CHECK_EXIT(terrno); - } + for (int32_t i = 0; i < nColData; ++i) { + SColData pColData = {0}; + TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, &pColData)); + } + } else { + uint64_t nRow = 0; + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); - for (int32_t i = 0; i < nColData; ++i) { - TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1))); + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow *ppRow = NULL; + TAOS_CHECK_EXIT(tDecodeRow(pCoder, &ppRow)); + } } } else { - uint64_t nRow; - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData = 0; - pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); - if (pSubmitTbData->aRowP == NULL) { - TAOS_CHECK_EXIT(terrno); - } + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); - for (int32_t iRow = 0; iRow < nRow; ++iRow) { - SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); - if (ppRow == NULL) { + pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); + if (pSubmitTbData->aCol == NULL) { TAOS_CHECK_EXIT(terrno); } - TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow)); + for (int32_t i = 0; i < nColData; ++i) { + TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1))); + } + } else { + uint64_t nRow = 0; + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); + + pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); + if (pSubmitTbData->aRowP == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); + if (ppRow == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow)); + } } } @@ -11720,6 +11775,16 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa } tEndDecode(pCoder); + if (rawData != NULL){ + if (dataAfterCreate != NULL){ + TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen); + *(int32_t*)(dataAfterCreate - INT_BYTES - flagsLen) = pCoder->pos - posAfterCreate + flagsLen; + *(void**)rawData = dataAfterCreate - INT_BYTES - flagsLen; + }else{ + *(void**)rawData = dataStart; + } + } + _exit: return code; @@ -11731,15 +11796,27 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) { TAOS_CHECK_EXIT(tStartEncode(pCoder)); TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData))); - for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { - TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i))); + if (pReq->raw){ + for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { + void* data = taosArrayGetP(pReq->aSubmitTbData, i); + if (pCoder->data != NULL){ + TAOS_MEMCPY(pCoder->data + pCoder->pos, data, *(uint32_t*)data + INT_BYTES); + + } + pCoder->pos += *(uint32_t*)data + INT_BYTES; + } + } else{ + for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { + TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i))); + } } + tEndEncode(pCoder); _exit: return code; } -int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) { +int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) { int32_t code = 0; memset(pReq, 0, sizeof(*pReq)); @@ -11763,7 +11840,8 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) { } for (uint64_t i = 0; i < nSubmitTbData; i++) { - if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1)) < 0) { + if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1), + rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } @@ -11834,12 +11912,15 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { if (pReq->aSubmitTbData == NULL) return; - int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); - SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData); + if (!pReq->raw){ + int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); + SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData); - for (int32_t i = 0; i < nSubmitTbData; i++) { - tDestroySubmitTbData(&aSubmitTbData[i], flag); + for (int32_t i = 0; i < nSubmitTbData; i++) { + tDestroySubmitTbData(&aSubmitTbData[i], flag); + } } + taosArrayDestroy(pReq->aSubmitTbData); pReq->aSubmitTbData = NULL; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b33bdb0976..5a6489d182 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,11 +241,10 @@ SSDataBlock *tqGetResultBlock(STqReader *pReader); int64_t tqGetResultBlockTime(STqReader *pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); +int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver, SArray* rawList); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, - int64_t *createTime); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SMqDataRsp* pRsp, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta); int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); // sma diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 12a803d1d8..57e238b499 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,7 +117,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); @@ -178,6 +178,7 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo #define TQ_SUBSCRIBE_NAME "subscribe" #define TQ_OFFSET_NAME "offset-ver0" #define TQ_POLL_MAX_TIME 1000 +#define TQ_POLL_MAX_BYTES 1048576 #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 937ca80bcc..6452f7bb5c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -352,7 +352,7 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { return TSDB_CODE_INVALID_PARA; } if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { - return -1; + return terrno; } tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id); return 0; @@ -485,14 +485,14 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pWalReader->pHead->head.version; - if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) { + if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) { return false; } pReader->nextBlk = 0; } } -int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { +int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) { if (pReader == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -504,7 +504,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i SDecoder decoder = {0}; tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen); - int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit); + int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList); tDecoderClear(&decoder); if (code != 0) { @@ -1046,7 +1046,45 @@ END: return code; } -int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) { +static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ + int32_t code = 0; + int32_t lino = 0; + void* createReq = NULL; + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA); + + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno); + } + + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code); + TSDB_CHECK_CODE(code, lino, END); + createReq = taosMemoryCalloc(1, len); + TSDB_CHECK_NULL(createReq, code, lino, END, terrno); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq); + tEncoderClear(&encoder); + TSDB_CHECK_CODE(code, lino, END); + TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno); + pRsp->createTableNum++; + tqDebug("build create table info msg success"); + + END: + if (code != 0){ + tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code)); + taosMemoryFree(createReq); + } + return code; +} + +int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) { tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pSubmitTbData == NULL) { @@ -1062,8 +1100,9 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas int64_t uid = pSubmitTbData->uid; pReader->lastBlkUid = uid; + int64_t createTime = INT64_MAX; tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &createTime); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); @@ -1071,6 +1110,21 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; } + if (fetchMeta != WITH_DATA && + pSubmitTbData->pCreateTbReq != NULL && + pSubmitTbData->ctimeMs - createTime <= 0) { // judge if table is already created to avoid sending crateTbReq + int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq); + if (code != 0) { + return code; + } + } else if (rawList != NULL){ + if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){ + return terrno; + } + pReader->pSchemaWrapper = NULL; + return 0; + } + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { return tqProcessColData(pReader, pSubmitTbData, blocks, schemas); } else { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index a65b118aea..9501ca4099 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -14,6 +14,32 @@ */ #include "tq.h" +static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) { + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + void* buf = NULL; + + int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES; + buf = taosMemoryCalloc(1, dataStrLen); + TSDB_CHECK_NULL(buf, code, lino, END, terrno); + + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; + pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION; + pRetrieve->precision = precision; + pRetrieve->compressed = 0; + + memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); + + tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf); + END: + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code)); + } + return code; +} static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t code = 0; @@ -25,7 +51,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, TSDB_CHECK_NULL(buf, code, lino, END, terrno); SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; - pRetrieve->version = 1; + pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION; pRetrieve->precision = precision; pRetrieve->compressed = 0; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); @@ -290,45 +316,8 @@ END: return code; } -static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ - int32_t code = 0; - int32_t lino = 0; - void* createReq = NULL; - TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA); - - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno); - } - - uint32_t len = 0; - tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code); - TSDB_CHECK_CODE(code, lino, END); - createReq = taosMemoryCalloc(1, len); - TSDB_CHECK_NULL(createReq, code, lino, END, terrno); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, createReq, len); - code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq); - tEncoderClear(&encoder); - TSDB_CHECK_CODE(code, lino, END); - TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno); - TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno); - pRsp->createTableNum++; - tqDebug("build create table info msg success"); - -END: - if (code != 0){ - tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code)); - taosMemoryFree(createReq); - } - return code; -} - -static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ +static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, + const SMqPollReq* pRequest, SArray* rawList){ int32_t code = 0; int32_t lino = 0; SArray* pBlocks = NULL; @@ -342,36 +331,46 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int pSchemas = taosArrayInit(0, sizeof(void*)); TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno); - SSubmitTbData* pSubmitTbDataRet = NULL; - int64_t createTime = INT64_MAX; - code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime); + SSubmitTbData* pSubmitTbData = NULL; + code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta); TSDB_CHECK_CODE(code, lino, END); - bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0; + bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0; TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); + int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks); + if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; - code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)); + code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum); TSDB_CHECK_CODE(code, lino, END); } - if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { - if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq - code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq); - TSDB_CHECK_CODE(code, lino, END); - } - } - tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL); + + tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL); TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); - for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { - SSDataBlock* pBlock = taosArrayGet(pBlocks, i); - if (pBlock == NULL) { - continue; + for (int32_t i = 0; i < blockNum; i++) { + if (taosArrayGetSize(pBlocks) == 0){ + void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1); + if (rawData == NULL) { + continue; + } + if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){ + tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); + continue; + } + *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually + } else { + SSDataBlock* pBlock = taosArrayGet(pBlocks, i); + if (pBlock == NULL) { + continue; + } + + if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){ + tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); + continue; + } + *totalRows += pBlock->info.rows; + blockDataFreeRes(pBlock); } - if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){ - tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); - continue; - } - *totalRows += pBlock->info.rows; - blockDataFreeRes(pBlock); + SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); @@ -391,29 +390,30 @@ END: } } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) { int32_t code = 0; int32_t lino = 0; - TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA); STqExecHandle* pExec = &pHandle->execHandle; STqReader* pReader = pExec->pTqReader; - code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); + SArray *rawList = NULL; + if (pRequest->rawData){ + rawList = taosArrayInit(0, POINTER_BYTES); + } + code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList); TSDB_CHECK_CODE(code, lino, END); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextBlockImpl(pReader, NULL)) { - tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); } } END: + taosArrayDestroy(rawList); if (code != 0){ tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code)); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 197a45cdb9..d619e0534d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -131,7 +131,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); - code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); tDeleteMqDataRsp(&dataRsp); *pBlockReturned = true; @@ -222,6 +222,10 @@ end: static void tDeleteCommon(void* parm) {} +#define POLL_RSP_TYPE(pRequest,taosxRsp) \ +taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \ +(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP) + static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int32_t vgId = TD_VID(pTq->pVnode); @@ -274,7 +278,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + POLL_RSP_TYPE(pRequest, taosxRsp), vgId); goto END; } @@ -287,7 +291,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + POLL_RSP_TYPE(pRequest, taosxRsp), vgId); goto END; } @@ -370,12 +374,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded)); + TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest)); - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { + if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) || + (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) || + (pRequest->rawData != 0 && totalRows >= TQ_POLL_MAX_BYTES)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + POLL_RSP_TYPE(pRequest, taosxRsp), vgId); goto END; } else { fetchVer++; @@ -534,7 +540,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* int32_t len = 0; int32_t code = 0; - if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { + if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || + type == TMQ_MSG_TYPE__WALINFO_RSP || + type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); @@ -558,7 +566,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); - if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { + if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || + type == TMQ_MSG_TYPE__WALINFO_RSP || + type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { code = tEncodeMqDataRsp(&encoder, pRsp); } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { code = tEncodeSTaosxRsp(&encoder, pRsp); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c7b0caf286..3fb7b597ce 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1873,7 +1873,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in len -= sizeof(SSubmitReq2Msg); SDecoder dc = {0}; tDecoderInit(&dc, pReq, len); - if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) { + if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1060cbcffe..4f647a2e9c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3953,7 +3953,7 @@ FETCH_NEXT_BLOCK: QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno); qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id); - if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < + if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, NULL) < 0) { qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 676aed4464..bd463bfd9d 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -403,3 +403,18 @@ end: } return code; } + +int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash) { + int32_t lino = 0; + int32_t code = 0; + + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot; + code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); + TSDB_CHECK_CODE(code, lino, end); + + end: + if (code != 0) { + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index ed1f498a32..8ec9032a7c 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -483,7 +483,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx return code; } -static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList, +static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) { SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt)); if (NULL == pVgCxt) { @@ -495,7 +495,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa return terrno; } - pVgCxt->vgId = pTableCxt->pMeta->vgId; + pVgCxt->vgId = vgId; int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); if (TSDB_CODE_SUCCESS == code) { if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) { @@ -642,7 +642,7 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, if (NULL == pp) { pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); if (NULL == pp) { - code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt); + code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt); } else { pVgCxt = *(SVgroupDataCxt**)pp; } @@ -777,7 +777,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool int32_t vgId = pTableCxt->pMeta->vgId; void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); if (NULL == pp) { - code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); + code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt); } else { pVgCxt = *(SVgroupDataCxt**)pp; } @@ -1074,3 +1074,33 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate end: return ret; } + +int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) { + transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion); + SVgroupDataCxt* pVgCxt = NULL; + void** pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId)); + if (NULL == pp) { + int code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt); + if (code != 0){ + return code; + } + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + if (NULL == pVgCxt->pData->aSubmitTbData) { + pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES); + pVgCxt->pData->raw = true; + if (NULL == pVgCxt->pData->aSubmitTbData) { + return terrno; + } + } + + // push data to submit, rebuild empty data for next submit + if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) { + return terrno; + } + + qDebug("add raw data to vgId:%d", pTableMeta->vgId); + + return 0; +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ba2d471ccf..842d42a002 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -859,6 +859,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")