From bc6a466b36863921a536a1475317d56dde680497 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 12 Apr 2022 10:45:34 +0800 Subject: [PATCH 1/4] refactor TAOS_RES process --- include/common/tdatablock.h | 11 +- source/client/inc/clientInt.h | 104 +++++++---- source/client/src/clientEnv.c | 1 + source/client/src/clientImpl.c | 5 +- source/client/src/clientMain.c | 216 ++++++++++++++-------- source/client/src/tmq.c | 40 ---- source/libs/executor/src/dataDispatcher.c | 4 +- 7 files changed, 219 insertions(+), 162 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b2b8cff19a..5e5a8826e5 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -136,7 +136,8 @@ static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uin } static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); + ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || + pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; *(int16_t*)p = *(int16_t*)v; } @@ -210,15 +211,19 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); void blockDebugShowData(const SArray* dataBlocks); +static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) { + return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows); +} + static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, - int8_t compressed) { + int8_t compressed) { int32_t colSize = colDataGetLength(pColRes, numOfRows); return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, - int8_t needCompress) { + int8_t needCompress) { int32_t* colSizes = (int32_t*)data; data += numOfCols * sizeof(int32_t); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ae1b34a3bd..5c3c716ef1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -75,12 +75,12 @@ typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq typedef struct { int8_t inited; // ctl - int8_t threadStop; - TdThread thread; + int8_t threadStop; + TdThread thread; TdThreadMutex lock; // used when app init and cleanup - SArray* appHbMgrs; // SArray one for each cluster - FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; - FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; + SArray* appHbMgrs; // SArray one for each cluster + FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; + FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; typedef struct SQueryExecMetric { @@ -118,42 +118,42 @@ struct SAppInstInfo { }; typedef struct SAppInfo { - int64_t startTime; - char appName[TSDB_APP_NAME_LEN]; - char* ep; - int32_t pid; - int32_t numOfThreads; - SHashObj* pInstMap; + int64_t startTime; + char appName[TSDB_APP_NAME_LEN]; + char* ep; + int32_t pid; + int32_t numOfThreads; + SHashObj* pInstMap; TdThreadMutex mutex; } SAppInfo; typedef struct STscObj { - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_DB_FNAME_LEN]; - char ver[128]; - int32_t acctId; - uint32_t connId; - int32_t connType; - uint64_t id; // ref ID returned by taosAddRef - TdThreadMutex mutex; // used to protect the operation on db - int32_t numOfReqs; // number of sqlObj bound to this connection - SAppInstInfo* pAppInfo; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + char db[TSDB_DB_FNAME_LEN]; + char ver[128]; + int32_t acctId; + uint32_t connId; + int32_t connType; + uint64_t id; // ref ID returned by taosAddRef + TdThreadMutex mutex; // used to protect the operation on db + int32_t numOfReqs; // number of sqlObj bound to this connection + SAppInstInfo* pAppInfo; } STscObj; typedef struct SResultColumn { union { - char* nullbitmap; // bitmap, one bit for each item in the list - int32_t* offset; + char* nullbitmap; // bitmap, one bit for each item in the list + int32_t* offset; }; - char* pData; + char* pData; } SResultColumn; typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; - TAOS_FIELD* fields; // todo, column names are not needed. - TAOS_FIELD* userFields; // the fields info that return to user + TAOS_FIELD* fields; // todo, column names are not needed. + TAOS_FIELD* userFields; // the fields info that return to user uint32_t numOfCols; int32_t* length; char** convertBuf; @@ -180,13 +180,31 @@ typedef struct SRequestSendRecvBody { SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. - struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. + struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; #define ERROR_MSG_BUF_DEFAULT_SIZE 512 +enum { + RES_TYPE__QUERY = 1, + RES_TYPE__TMQ, +}; + +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) + +struct tmq_message_t { + int8_t resType; + SMqPollRsp msg; + char* topic; + void* vg; + SArray* res; // SArray + int32_t resIter; +}; + typedef struct SRequestObj { + int8_t resType; // query or tmq uint64_t requestId; int32_t type; // request type STscObj* pTscObj; @@ -203,6 +221,25 @@ typedef struct SRequestObj { SRequestSendRecvBody body; } SRequestObj; +static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { + tmq_message_t* msg = (tmq_message_t*)res; + int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; + return (SReqResultInfo*)taosArrayGet(msg->res, resIter); +} + +static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) { + tmq_message_t* msg = (tmq_message_t*)res; + if (++msg->resIter < taosArrayGetSize(msg->res)) { + return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter); + } + return NULL; +} + +static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { + if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo); + return tmqGetCurResInfo(res); +} + extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; @@ -238,14 +275,17 @@ void initMsgHandleFp(); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port); -void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr); +int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); -int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); +void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr); +void doSetOneRowPtr(SReqResultInfo* pResultInfo); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); // --- heartbeat // global, called by mgmt diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 769870ada8..307f1ce416 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -149,6 +149,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty return NULL; } + pRequest->resType = RES_TYPE__QUERY; pRequest->pDb = getDbOfConnection(pObj); pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampMs(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9938a2e1b9..f19e943300 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -13,7 +13,6 @@ static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -42,7 +41,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); + TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port) { @@ -590,7 +589,7 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c return taos_connect(ipStr, userStr, passStr, dbStr, port); } -static void doSetOneRowPtr(SReqResultInfo* pResultInfo) { +void doSetOneRowPtr(SReqResultInfo* pResultInfo) { for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { SResultColumn* pCol = &pResultInfo->pCol[i]; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e10cf5179e..99efecdb1b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -71,7 +71,7 @@ void taos_cleanup(void) { tscInfo("all local resources released"); } -setConfRet taos_set_config(const char *config) { +setConfRet taos_set_config(const char *config) { // TODO setConfRet ret = {SET_CONF_RET_SUCC, {0}}; return ret; @@ -133,8 +133,7 @@ int taos_field_count(TAOS_RES *res) { return 0; } - SRequestObj *pRequest = (SRequestObj *)res; - SReqResultInfo *pResInfo = &pRequest->body.resInfo; + SReqResultInfo *pResInfo = tscGetCurResInfo(res); return pResInfo->numOfCols; } @@ -145,7 +144,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { return NULL; } - SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo); + SReqResultInfo *pResInfo = tscGetCurResInfo(res); return pResInfo->userFields; } @@ -162,13 +161,36 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - SRequestObj *pRequest = (SRequestObj *)res; - if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || - pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { - return NULL; - } + if (TD_RES_QUERY(res)) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return NULL; + } - return doFetchRow(pRequest, true); + return doFetchRow(pRequest, true); + + } else if (TD_RES_TMQ(res)) { + tmq_message_t *msg = ((tmq_message_t *)res); + SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); + + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + + if (pResultInfo->row == NULL) { + msg->resIter++; + pResultInfo = taosArrayGet(msg->res, msg->resIter); + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + } + + return pResultInfo->row; + + } else { + // assert to avoid uninitialization error + ASSERT(0); + } + return NULL; } int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { @@ -260,12 +282,12 @@ int *taos_fetch_lengths(TAOS_RES *res) { return NULL; } - return ((SRequestObj *)res)->body.resInfo.length; + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + return pResInfo->length; } TAOS_ROW *taos_result_block(TAOS_RES *res) { - SRequestObj* pRequest = (SRequestObj*) res; - if (pRequest == NULL) { + if (res == NULL) { terrno = TSDB_CODE_INVALID_PARA; return NULL; } @@ -274,7 +296,8 @@ TAOS_ROW *taos_result_block(TAOS_RES *res) { return NULL; } - return &pRequest->body.resInfo.row; + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + return &pResInfo->row; } // todo intergrate with tDataTypes @@ -313,7 +336,7 @@ const char *taos_data_type(int type) { const char *taos_get_client_info() { return version; } int taos_affected_rows(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ(res)) { return 0; } @@ -323,12 +346,17 @@ int taos_affected_rows(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { - SRequestObj* pRequest = (SRequestObj*) res; - if (pRequest == NULL) { + if (res == NULL) { return TSDB_TIME_PRECISION_MILLI; } - - return pRequest->body.resInfo.precision; + if (TD_RES_QUERY(res)) { + SRequestObj *pRequest = (SRequestObj *)res; + return pRequest->body.resInfo.precision; + } else if (TD_RES_TMQ(res)) { + SReqResultInfo *info = tmqGetCurResInfo(res); + return info->precision; + } + return TSDB_TIME_PRECISION_MILLI; } int taos_select_db(TAOS *taos, const char *db) { @@ -370,90 +398,115 @@ void taos_stop_query(TAOS_RES *res) { } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { - SRequestObj *pRequestObj = res; - SReqResultInfo *pResultInfo = &pRequestObj->body.resInfo; + SReqResultInfo *pResultInfo = tscGetCurResInfo(res); if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) { return true; } - SResultColumn *pCol = &pRequestObj->body.resInfo.pCol[col]; + SResultColumn *pCol = &pResultInfo->pCol[col]; return colDataIsNull_f(pCol->nullbitmap, row); } -bool taos_is_update_query(TAOS_RES *res) { - return taos_num_fields(res) == 0; -} +bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int32_t numOfRows = 0; - /*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows); + /*int32_t code = */ taos_fetch_block_s(res, &numOfRows, rows); return numOfRows; } -int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { - SRequestObj *pRequest = (SRequestObj *)res; - if (pRequest == NULL) { +int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { + if (res == NULL) { return 0; } + if (TD_RES_QUERY(res)) { + SRequestObj *pRequest = (SRequestObj *)res; - (*rows) = NULL; - (*numOfRows) = 0; + (*rows) = NULL; + (*numOfRows) = 0; - if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || - pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return 0; + } + + doFetchRow(pRequest, false); + + // TODO refactor + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + pResultInfo->current = pResultInfo->numOfRows; + + (*rows) = pResultInfo->row; + (*numOfRows) = pResultInfo->numOfRows; + return pRequest->code; + } else if (TD_RES_TMQ(res)) { + SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); + if (pResultInfo == NULL) return -1; + + pResultInfo->current = pResultInfo->numOfRows; + (*rows) = pResultInfo->row; + (*numOfRows) = pResultInfo->numOfRows; return 0; + } else { + ASSERT(0); + return -1; } - - doFetchRow(pRequest, false); - - // TODO refactor - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - pResultInfo->current = pResultInfo->numOfRows; - - (*rows) = pResultInfo->row; - (*numOfRows) = pResultInfo->numOfRows; - return pRequest->code; } -int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { - SRequestObj *pRequest = (SRequestObj *)res; - if (pRequest == NULL) { +int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { + if (res == NULL) { return 0; } + if (TD_RES_QUERY(res)) { + SRequestObj *pRequest = (SRequestObj *)res; + + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return 0; + } + + doFetchRow(pRequest, false); + + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + + pResultInfo->current = pResultInfo->numOfRows; + (*numOfRows) = pResultInfo->numOfRows; + (*pData) = (void *)pResultInfo->pData; - if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || - pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { return 0; + + } else if (TD_RES_TMQ(res)) { + SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); + if (pResultInfo == NULL) return -1; + + pResultInfo->current = pResultInfo->numOfRows; + (*numOfRows) = pResultInfo->numOfRows; + (*pData) = (void *)pResultInfo->pData; + return 0; + + } else { + ASSERT(0); + return -1; } - - doFetchRow(pRequest, false); - - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - - pResultInfo->current = pResultInfo->numOfRows; - (*numOfRows) = pResultInfo->numOfRows; - (*pData) = (void*) pResultInfo->pData; - - return 0; } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { - SRequestObj *pRequest = (SRequestObj *)res; - if (pRequest == NULL) { + if (res == NULL) { return 0; } - int32_t numOfFields = taos_num_fields(pRequest); + int32_t numOfFields = taos_num_fields(res); if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { return 0; } - TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex]; + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; if (!IS_VAR_DATA_TYPE(pField->type)) { return 0; } - return pRequest->body.resInfo.pCol[columnIndex].offset; + return pResInfo->pCol[columnIndex].offset; } int taos_validate_sql(TAOS *taos, const char *sql) { return true; } @@ -483,18 +536,19 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { // TODO } -TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { - // TODO - return NULL; +TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, + void *param, int interval) { + // TODO + return NULL; } TAOS_RES *taos_consume(TAOS_SUB *tsub) { - // TODO - return NULL; + // TODO + return NULL; } void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { - // TODO + // TODO } int taos_load_table_info(TAOS *taos, const char *tableNameList) { @@ -553,26 +607,26 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { } int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { - // TODO - return -1; + // TODO + return -1; } int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { - // TODO - return -1; + // TODO + return -1; } -int taos_stmt_add_batch(TAOS_STMT* stmt) { - // TODO - return -1; +int taos_stmt_add_batch(TAOS_STMT *stmt) { + // TODO + return -1; } TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { - // TODO - return NULL; + // TODO + return NULL; } -int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) { - // TODO - return -1; +int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { + // TODO + return -1; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 955e25fd71..84e3515539 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -17,27 +17,13 @@ #include "clientLog.h" #include "parser.h" #include "planner.h" -#include "scheduler.h" #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" #include "tmsgtype.h" -#include "tpagedbuf.h" #include "tqueue.h" #include "tref.h" -typedef struct { - int32_t curBlock; - int32_t curRow; - void** uData; -} SMqRowIter; - -struct tmq_message_t { - SMqPollRsp msg; - void* vg; - SMqRowIter iter; -}; - struct tmq_list_t { SArray container; }; @@ -1566,30 +1552,4 @@ const char* tmq_err2str(tmq_resp_err_t err) { return "fail"; } -TAOS_ROW tmq_get_row(tmq_message_t* message) { - SMqPollRsp* rsp = &message->msg; - while (1) { - if (message->iter.curBlock < taosArrayGetSize(rsp->pBlockData)) { - SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->iter.curBlock); - if (message->iter.curRow < pBlock->info.rows) { - for (int i = 0; i < pBlock->info.numOfCols; i++) { - SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i); - if (colDataIsNull_s(pData, message->iter.curRow)) - message->iter.uData[i] = NULL; - else { - message->iter.uData[i] = colDataGetData(pData, message->iter.curRow); - } - } - message->iter.curRow++; - return message->iter.uData; - } else { - message->iter.curBlock++; - message->iter.curRow = 0; - continue; - } - } - return NULL; - } -} - char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 1edebcd7db..626cb1b5f0 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -92,9 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - // NOTE: there are four bytes of an integer more than the required buffer space. - // struct size + data payload + length for each column + bitmap length - pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + blockDataGetSize(pInput->pData); + pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData); pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { From ad9731bd6f2afd091883ddd52f9cf09900d09227 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 12 Apr 2022 11:00:09 +0800 Subject: [PATCH 2/4] merge from 3.0 --- source/client/inc/clientInt.h | 5 +- source/client/src/clientImpl.c | 84 +++++++++++++++++----------------- source/client/src/clientMain.c | 6 +-- source/client/src/tmq.c | 17 ++++--- 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index aa27e3bafc..604e6a49ae 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -282,9 +282,10 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); // --- heartbeat // global, called by mgmt diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a348c252d0..71c5df6a09 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -41,7 +41,6 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port) { @@ -173,7 +172,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; - int32_t code = qExecCommand(pQuery->pRoot, &pRsp); + int32_t code = qExecCommand(pQuery->pRoot, &pRsp); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false); } @@ -189,7 +188,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg; pRequest->type = pMsgInfo->msgType; pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL}; - pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management + pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); @@ -210,14 +209,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; - SPlanContext cxt = { - .queryId = pRequest->requestId, - .acctId = pRequest->pTscObj->acctId, - .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), - .pAstRoot = pQuery->pRoot, - .showRewrite = pQuery->showRewrite - }; - int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); + SPlanContext cxt = {.queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite}; + int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); if (code != 0) { return code; } @@ -233,10 +230,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; + pResInfo->fields[i].type = pSchema[i].type; pResInfo->userFields[i].bytes = pSchema[i].bytes; - pResInfo->userFields[i].type = pSchema[i].type; + pResInfo->userFields[i].type = pSchema[i].type; if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; @@ -253,7 +250,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, + pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); @@ -273,14 +271,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } pRequest->code = res.code; - terrno = res.code; + terrno = res.code; return pRequest->code; } SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; - SQuery* pQuery = NULL; - SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + SQuery* pQuery = NULL; + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (TSDB_CODE_SUCCESS == code) { @@ -319,15 +317,15 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { - SCatalog *pCatalog = NULL; - int32_t code = 0; - int32_t dbNum = taosArrayGetSize(pRequest->dbList); - int32_t tblNum = taosArrayGetSize(pRequest->tableList); + SCatalog* pCatalog = NULL; + int32_t code = 0; + int32_t dbNum = taosArrayGetSize(pRequest->dbList); + int32_t tblNum = taosArrayGetSize(pRequest->tableList); if (dbNum <= 0 && tblNum <= 0) { return TSDB_CODE_QRY_APP_ERROR; } - + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { return code; @@ -336,8 +334,8 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); for (int32_t i = 0; i < dbNum; ++i) { - char *dbFName = taosArrayGet(pRequest->dbList, i); - + char* dbFName = taosArrayGet(pRequest->dbList, i); + code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); if (code != TSDB_CODE_SUCCESS) { return code; @@ -345,7 +343,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { } for (int32_t i = 0; i < tblNum; ++i) { - SName *tableName = taosArrayGet(pRequest->tableList, i); + SName* tableName = taosArrayGet(pRequest->tableList, i); code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1); if (code != TSDB_CODE_SUCCESS) { @@ -356,11 +354,10 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { return code; } - SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; - int32_t retryNum = 0; - int32_t code = 0; + int32_t retryNum = 0; + int32_t code = 0; while (retryNum++ < REQUEST_MAX_TRY_TIMES) { pRequest = execQueryImpl(pTscObj, sql, sqlLen); @@ -376,7 +373,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { destroyRequest(pRequest); } - + return pRequest; } @@ -508,7 +505,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { - return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; + return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || + msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { @@ -535,10 +533,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId); + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } else { tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId); + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -721,8 +719,8 @@ _return: static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn)); + pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn)); pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); @@ -769,7 +767,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int return TSDB_CODE_SUCCESS; } -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) { +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { return TSDB_CODE_SUCCESS; @@ -840,15 +839,16 @@ void resetConnectDB(STscObj* pTscObj) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*)pRsp; - pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; - pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; + pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->current = 0; + pResultInfo->completed = (pRsp->completed == 1); pResultInfo->payloadLen = htonl(pRsp->compLen); - pResultInfo->precision = pRsp->precision; + pResultInfo->precision = pRsp->precision; // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4); + return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, + convertUcs4); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 99efecdb1b..577049ebc8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRow(pRequest, true); + return doFetchRow(pRequest, true, false); } else if (TD_RES_TMQ(res)) { tmq_message_t *msg = ((tmq_message_t *)res); @@ -430,7 +430,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, false); // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, false); SReqResultInfo *pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 84e3515539..b3e0cc6fd9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -835,7 +835,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (msgEpoch < tmqEpoch) { /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/ /*tsem_post(&tmq->rspSem);*/ - tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, + tmqEpoch); return 0; } @@ -872,8 +873,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); - pRsp->iter.curBlock = 0; - pRsp->iter.curRow = 0; + /*pRsp->iter.curBlock = 0;*/ + /*pRsp->iter.curRow = 0;*/ // TODO: alloc mem /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ @@ -885,8 +886,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } #endif - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, pRsp->msg.reqOffset, - pRsp->msg.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, + pRsp->msg.reqOffset, pRsp->msg.rspOffset); pRsp->vg = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); @@ -907,7 +908,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); + tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, + topicNumGet); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) { return false; @@ -1275,7 +1277,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, + pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; From 285dc2175529ecf71b74eec19ee78da58e9f8d9f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 12 Apr 2022 11:04:36 +0800 Subject: [PATCH 3/4] merge from 3.0 --- source/client/src/clientMain.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 577049ebc8..080f666cdd 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRow(pRequest, true, false); + return doFetchRow(pRequest, true, true); } else if (TD_RES_TMQ(res)) { tmq_message_t *msg = ((tmq_message_t *)res); @@ -430,7 +430,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return 0; } - doFetchRow(pRequest, false, false); + doFetchRow(pRequest, false, true); // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; From cf33a822b9ad0c9a4f9c3eb2f1e6e175f30646ef Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 12 Apr 2022 13:39:52 +0800 Subject: [PATCH 4/4] fix type convert --- example/src/tmq.c | 5 +++-- source/client/inc/clientInt.h | 21 ++++++++++----------- source/client/src/clientMain.c | 2 +- source/client/src/tmq.c | 8 ++++++++ 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index efb4d1830e..ca80c8fe5a 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printf("subscribe err\n"); return; } - /*int32_t cnt = 0;*/ + int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { - /*cnt++;*/ + cnt++; + printf("get data\n"); msg_process(tmqmessage); tmq_message_destroy(tmqmessage); /*} else {*/ diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 604e6a49ae..185b5824d9 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -194,14 +194,13 @@ enum { #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) -struct tmq_message_t { - int8_t resType; - SMqPollRsp msg; - char* topic; - void* vg; - SArray* res; // SArray - int32_t resIter; -}; +typedef struct SMqRspObj { + int8_t resType; + char* topic; + void* vg; + SArray* res; // SArray + int32_t resIter; +} SMqRspObj; typedef struct SRequestObj { int8_t resType; // query or tmq @@ -222,13 +221,13 @@ typedef struct SRequestObj { } SRequestObj; static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { - tmq_message_t* msg = (tmq_message_t*)res; - int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; + SMqRspObj* msg = (SMqRspObj*)res; + int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; return (SReqResultInfo*)taosArrayGet(msg->res, resIter); } static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) { - tmq_message_t* msg = (tmq_message_t*)res; + SMqRspObj* msg = (SMqRspObj*)res; if (++msg->resIter < taosArrayGetSize(msg->res)) { return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 080f666cdd..040ddde630 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -171,7 +171,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doFetchRow(pRequest, true, true); } else if (TD_RES_TMQ(res)) { - tmq_message_t *msg = ((tmq_message_t *)res); + SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); doSetOneRowPtr(pResultInfo); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b3e0cc6fd9..dbe78782f5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -24,6 +24,14 @@ #include "tqueue.h" #include "tref.h" +struct tmq_message_t { + SMqPollRsp msg; + char* topic; + void* vg; + SArray* res; // SArray + int32_t resIter; +}; + struct tmq_list_t { SArray container; };