enh:queryUtil

This commit is contained in:
factosea 2024-11-12 16:47:13 +08:00
parent c1f8e59c1b
commit e3499f2377
1 changed files with 22 additions and 2 deletions

View File

@ -59,6 +59,9 @@ const SSchema* tGetTbnameColumnSchema() {
} }
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
if (!pSchema) {
return false;
}
int32_t rowLen = 0; int32_t rowLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
@ -100,7 +103,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
} }
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) { bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
if (!VALIDNUMOFCOLS(numOfCols)) { if (!pSchema || !VALIDNUMOFCOLS(numOfCols)) {
return false; return false;
} }
@ -127,6 +130,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
static STaskQueue taskQueue = {0}; static STaskQueue taskQueue = {0};
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
if(!pSchedMsg || !pSchedMsg->ahandle) return;
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
(void)execFn(pSchedMsg->thandle); (void)execFn(pSchedMsg->thandle);
taosFreeQitem(pSchedMsg); taosFreeQitem(pSchedMsg);
@ -205,7 +209,12 @@ void destroyAhandle(void *ahandle) {
} }
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) { bool persistHandle, void* rpcCtx) {
QUERY_PARAM_CHECK(pTransporter);
QUERY_PARAM_CHECK(epSet);
QUERY_PARAM_CHECK(pTransporterId);
QUERY_PARAM_CHECK(pInfo);
char* pMsg = rpcMallocCont(pInfo->msgInfo.len); char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
@ -236,6 +245,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
} }
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) { int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
QUERY_PARAM_CHECK(pTransporter);
return rpcFreeConnById(pTransporter, pid); return rpcFreeConnById(pTransporter, pid);
} }
@ -314,6 +324,8 @@ void destroyQueryExecRes(SExecResult* pRes) {
} }
// clang-format on // clang-format on
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) { int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
QUERY_PARAM_CHECK(str);
QUERY_PARAM_CHECK(buf);
int32_t n = 0; int32_t n = 0;
switch (type) { switch (type) {
@ -420,6 +432,10 @@ int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_
} }
void parseTagDatatoJson(void* p, char** jsonStr) { void parseTagDatatoJson(void* p, char** jsonStr) {
if (!p || !jsonStr) {
qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
return;
}
char* string = NULL; char* string = NULL;
SArray* pTagVals = NULL; SArray* pTagVals = NULL;
cJSON* json = NULL; cJSON* json = NULL;
@ -520,6 +536,7 @@ end:
} }
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
QUERY_PARAM_CHECK(pDst);
if (NULL == pSrc) { if (NULL == pSrc) {
*pDst = NULL; *pDst = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -553,6 +570,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
} }
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) { void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
if(!pMeta || !pName || !pType) return;
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) { for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pName, pMeta->schema[i].name)) { if (0 == strcmp(pName, pMeta->schema[i].name)) {
@ -576,6 +594,7 @@ void freeVgInfo(SDBVgInfo* vgInfo) {
} }
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
QUERY_PARAM_CHECK(pDst);
if (NULL == pSrc) { if (NULL == pSrc) {
*pDst = NULL; *pDst = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -617,6 +636,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
} }
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) { int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
QUERY_PARAM_CHECK(pDst);
if (NULL == pSrc) { if (NULL == pSrc) {
*pDst = NULL; *pDst = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;