diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index f56860dd4f..6e2b83dce7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,9 +25,9 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlog.h" -#include "tsimplehash.h" #include "tmsg.h" #include "tmsgcb.h" +#include "tsimplehash.h" typedef enum { JOB_TASK_STATUS_NULL = 0, @@ -69,16 +69,16 @@ typedef enum { #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) #define QUERY_MSG_MASK_AUDIT() (1 << 1) #define QUERY_MSG_MASK_VIEW() (1 << 2) -#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) -#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0) -#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0) +#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0) +#define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_AUDIT()) != 0) +#define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0) typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision col_id_t numOfColumns; // the number of columns int16_t numOfPKs; - int32_t rowSize; // row size of the schema + int32_t rowSize; // row size of the schema } STableComInfo; typedef struct SIndexMeta { @@ -119,8 +119,9 @@ typedef struct STableMeta { int32_t sversion; int32_t tversion; STableComInfo tableInfo; - SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of the schema content. - SSchema schema[]; + SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of + // the schema content. + SSchema schema[]; } STableMeta; #pragma pack(pop) @@ -196,9 +197,9 @@ typedef struct SBoundColInfo { } SBoundColInfo; typedef struct STableColsData { - char tbName[TSDB_TABLE_NAME_LEN]; - SArray* aCol; - bool getFromHash; + char tbName[TSDB_TABLE_NAME_LEN]; + SArray* aCol; + bool getFromHash; } STableColsData; typedef struct STableVgUid { @@ -207,15 +208,14 @@ typedef struct STableVgUid { } STableVgUid; typedef struct STableBufInfo { - void* pCurBuff; - SArray* pBufList; - int64_t buffUnit; - int64_t buffSize; - int64_t buffIdx; - int64_t buffOffset; + void* pCurBuff; + SArray* pBufList; + int64_t buffUnit; + int64_t buffSize; + int64_t buffIdx; + int64_t buffOffset; } STableBufInfo; - typedef struct STableDataCxt { STableMeta* pMeta; STSchema* pSchema; @@ -237,23 +237,22 @@ typedef struct SStbInterlaceInfo { void* pRequest; uint64_t requestId; int64_t requestSelf; - bool tbFromHash; + bool tbFromHash; SHashObj* pVgroupHash; SArray* pVgroupList; SSHashObj* pTableHash; int64_t tbRemainNum; STableBufInfo tbBuf; char firstName[TSDB_TABLE_NAME_LEN]; - STSchema *pTSchema; - STableDataCxt *pDataCtx; - void *boundTags; + STSchema* pTSchema; + STableDataCxt* pDataCtx; + void* boundTags; - bool tableColsReady; - SArray *pTableCols; - int32_t pTableColsIdx; + bool tableColsReady; + SArray* pTableCols; + int32_t pTableColsIdx; } SStbInterlaceInfo; - typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); @@ -308,6 +307,8 @@ void destroyAhandle(void* ahandle); int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* ctx); +int32_t asyncFreeConnById(void* pTransporter, int64_t pid); +; /** * Asynchronously send message to server, after the response received, the callback will be incured. * @@ -325,7 +326,7 @@ void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); -int32_t getAsofJoinReverseOp(EOperatorType op); +int32_t getAsofJoinReverseOp(EOperatorType op); int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); @@ -384,7 +385,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ - (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ + (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2688f5698b..676c14fee3 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -298,13 +298,13 @@ _end: pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; + (*ppRes) = NULL; return code; } static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = loadRemoteDataNext(pOperator, &pRes); + int32_t code = loadRemoteDataNext(pOperator, &pRes); return pRes; } @@ -391,7 +391,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = 0; int32_t lino = 0; SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -690,7 +690,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo int32_t lino = 0; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - code = blockDecode(pRes, pData, (const char**) pNextStart); + code = blockDecode(pRes, pData, (const char**)pNextStart); if (code) { return code; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 406e1e674e..c531d66b8e 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -236,6 +236,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } +int32_t asyncFreeConnById(void* pTransporter, int64_t pid) { + return rpcFreeConnById(pTransporter, pid); +} char* jobTaskStatusStr(int32_t status) { switch (status) {