refact code

This commit is contained in:
yihaoDeng 2024-08-01 09:29:16 +08:00 committed by wangjiaming0909
parent e063a8092c
commit f2a3951c23
3 changed files with 35 additions and 31 deletions

View File

@ -25,9 +25,9 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
#include "tsimplehash.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tsimplehash.h"
typedef enum { typedef enum {
JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NULL = 0,
@ -69,16 +69,16 @@ typedef enum {
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define QUERY_MSG_MASK_AUDIT() (1 << 1) #define QUERY_MSG_MASK_AUDIT() (1 << 1)
#define QUERY_MSG_MASK_VIEW() (1 << 2) #define QUERY_MSG_MASK_VIEW() (1 << 2)
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) #define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0) #define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_AUDIT()) != 0)
#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0) #define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0)
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
col_id_t numOfColumns; // the number of columns col_id_t numOfColumns; // the number of columns
int16_t numOfPKs; int16_t numOfPKs;
int32_t rowSize; // row size of the schema int32_t rowSize; // row size of the schema
} STableComInfo; } STableComInfo;
typedef struct SIndexMeta { typedef struct SIndexMeta {
@ -119,8 +119,9 @@ typedef struct STableMeta {
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
STableComInfo tableInfo; STableComInfo tableInfo;
SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of the schema content. SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of
SSchema schema[]; // the schema content.
SSchema schema[];
} STableMeta; } STableMeta;
#pragma pack(pop) #pragma pack(pop)
@ -196,9 +197,9 @@ typedef struct SBoundColInfo {
} SBoundColInfo; } SBoundColInfo;
typedef struct STableColsData { typedef struct STableColsData {
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
SArray* aCol; SArray* aCol;
bool getFromHash; bool getFromHash;
} STableColsData; } STableColsData;
typedef struct STableVgUid { typedef struct STableVgUid {
@ -207,15 +208,14 @@ typedef struct STableVgUid {
} STableVgUid; } STableVgUid;
typedef struct STableBufInfo { typedef struct STableBufInfo {
void* pCurBuff; void* pCurBuff;
SArray* pBufList; SArray* pBufList;
int64_t buffUnit; int64_t buffUnit;
int64_t buffSize; int64_t buffSize;
int64_t buffIdx; int64_t buffIdx;
int64_t buffOffset; int64_t buffOffset;
} STableBufInfo; } STableBufInfo;
typedef struct STableDataCxt { typedef struct STableDataCxt {
STableMeta* pMeta; STableMeta* pMeta;
STSchema* pSchema; STSchema* pSchema;
@ -237,23 +237,22 @@ typedef struct SStbInterlaceInfo {
void* pRequest; void* pRequest;
uint64_t requestId; uint64_t requestId;
int64_t requestSelf; int64_t requestSelf;
bool tbFromHash; bool tbFromHash;
SHashObj* pVgroupHash; SHashObj* pVgroupHash;
SArray* pVgroupList; SArray* pVgroupList;
SSHashObj* pTableHash; SSHashObj* pTableHash;
int64_t tbRemainNum; int64_t tbRemainNum;
STableBufInfo tbBuf; STableBufInfo tbBuf;
char firstName[TSDB_TABLE_NAME_LEN]; char firstName[TSDB_TABLE_NAME_LEN];
STSchema *pTSchema; STSchema* pTSchema;
STableDataCxt *pDataCtx; STableDataCxt* pDataCtx;
void *boundTags; void* boundTags;
bool tableColsReady; bool tableColsReady;
SArray *pTableCols; SArray* pTableCols;
int32_t pTableColsIdx; int32_t pTableColsIdx;
} SStbInterlaceInfo; } SStbInterlaceInfo;
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param); typedef int32_t (*__async_exec_fn_t)(void* param);
@ -308,6 +307,8 @@ void destroyAhandle(void* ahandle);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* ctx); bool persistHandle, void* ctx);
int32_t asyncFreeConnById(void* pTransporter, int64_t pid);
;
/** /**
* Asynchronously send message to server, after the response received, the callback will be incured. * Asynchronously send message to server, after the response received, the callback will be incured.
* *
@ -325,7 +326,7 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema(); const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t getAsofJoinReverseOp(EOperatorType op); int32_t getAsofJoinReverseOp(EOperatorType op);
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta); int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
@ -384,7 +385,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \
(_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA) (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \

View File

@ -298,13 +298,13 @@ _end:
pTaskInfo->code = code; pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
(*ppRes) = NULL; (*ppRes) = NULL;
return code; return code;
} }
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = loadRemoteDataNext(pOperator, &pRes); int32_t code = loadRemoteDataNext(pOperator, &pRes);
return pRes; return pRes;
} }
@ -391,7 +391,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
SOperatorInfo** pOptrInfo) { SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo); QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -690,7 +690,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
int32_t lino = 0; int32_t lino = 0;
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes); blockDataCleanup(pRes);
code = blockDecode(pRes, pData, (const char**) pNextStart); code = blockDecode(pRes, pData, (const char**)pNextStart);
if (code) { if (code) {
return code; return code;
} }

View File

@ -236,6 +236,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
} }
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
return rpcFreeConnById(pTransporter, pid);
}
char* jobTaskStatusStr(int32_t status) { char* jobTaskStatusStr(int32_t status) {
switch (status) { switch (status) {