From ef0365ebeb61c80b9b6e0f1ed971c57ee0ebf35f Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sun, 24 Apr 2022 16:27:09 +0800 Subject: [PATCH 1/5] feature/qnode --- source/libs/qworker/src/qworkerMsg.c | 2 +- source/libs/scalar/inc/sclInt.h | 3 + source/libs/scalar/src/scalar.c | 98 +++++++++++++++++++++++----- 3 files changed, 86 insertions(+), 17 deletions(-) diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 6723eb21c1..e0bcfabdd3 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -339,7 +339,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo .ahandle = pConn->ahandle, .msgType = TDMT_VND_QUERY_HEARTBEAT, .pCont = msg, - .contLen = sizeof(SSchedulerHbReq), + .contLen = msgSize, .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, }; diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 3d59ffd93d..399fb8f47d 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -32,6 +32,9 @@ typedef struct SScalarCtx { #define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DEFAULT_OP_NUM 10 +#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type)) +#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList) + #define sclFatal(...) qFatal(__VA_ARGS__) #define sclError(...) qError(__VA_ARGS__) #define sclWarn(...) qWarn(__VA_ARGS__) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 145afbe984..4783a2701e 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -244,23 +244,55 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { int32_t code = 0; - SScalarParam *paramList = taosMemoryCalloc(pParamList->length, sizeof(SScalarParam)); + int32_t paramNum = 0; + if (NULL == pParamList) { + if (ctx->pBlockList) { + SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0); + *rowNum = pBlock->info.rows; + } else { + *rowNum = 1; + } + + paramNum = 1; + } else { + paramNum = pParamList->length; + } + + SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam)); if (NULL == paramList) { - sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam))); + sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam))); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SListCell *cell = pParamList->pHead; - for (int32_t i = 0; i < pParamList->length; ++i) { - if (NULL == cell || NULL == cell->pNode) { - sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); - SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + if (pParamList) { + SNode *tnode = NULL; + int32_t i = 0; + if (SCL_IS_CONST_CALC(ctx)) { + WHERE_EACH (tnode, pParamList) { + if (!SCL_IS_CONST_NODE(tnode)) { + continue; + } else { + SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); + ERASE_NODE(pParamList); + } + + ++i; + WHERE_NEXT; + } + } else { + FOREACH(tnode, pParamList) { + SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); + ++i; + } } + } else { + paramList[0].numOfRows = *rowNum; + } - SCL_ERR_JRET(sclInitParam(cell->pNode, ¶mList[i], ctx, rowNum)); - cell = cell->pNext; + if (0 == *rowNum) { + taosMemoryFreeClear(paramList); } *pParams = paramList; @@ -299,11 +331,6 @@ _return: } int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { - if (NULL == node->pParameterList || node->pParameterList->length <= 0) { - sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); - SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - SScalarFuncExecFuncs ffpSet = {0}; int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); if (code) { @@ -357,6 +384,10 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o int32_t rowNum = 0; int32_t code = 0; SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + if (NULL == params) { + output->numOfRows = 0; + return TSDB_CODE_SUCCESS; + } int32_t type = node->node.resType.type; output->numOfRows = rowNum; @@ -369,14 +400,20 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } bool value = false; + bool complete = false; for (int32_t i = 0; i < rowNum; ++i) { for (int32_t m = 0; m < node->pParameterList->length; ++m) { + if (NULL == params[m].columnData) { + continue; + } char* p = colDataGetData(params[m].columnData, i); GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p); if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) { + complete = true; break; } else if (LOGIC_COND_TYPE_OR == node->condType && value) { + complete = true; break; } else if (LOGIC_COND_TYPE_NOT == node->condType) { value = !value; @@ -386,7 +423,13 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o colDataAppend(output->columnData, i, (char*) &value, false); } + if (SCL_IS_CONST_CALC(ctx) && (false == complete)) { + sclFreeParam(output); + output->numOfRows = 0; + } + _return: + for (int32_t i = 0; i < node->pParameterList->length; ++i) { // sclFreeParamNoData(params + i); } @@ -426,6 +469,17 @@ _return: EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; + SNode* tnode = NULL; + if (fmIsUserDefinedFunc(node->funcId)) { + return DEAL_RES_CONTINUE; + } + + FOREACH(tnode, node->pParameterList) { + if (!SCL_IS_CONST_NODE(tnode)) { + return DEAL_RES_CONTINUE; + } + } + SScalarParam output = {0}; ctx->code = sclExecFunction(node, ctx, &output); @@ -470,6 +524,10 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) { return DEAL_RES_ERROR; } + if (0 == output.numOfRows) { + return DEAL_RES_CONTINUE; + } + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); @@ -498,6 +556,14 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)*pNode; + if (!SCL_IS_CONST_NODE(node->pLeft)) { + return DEAL_RES_CONTINUE; + } + + if (!SCL_IS_CONST_NODE(node->pRight)) { + return DEAL_RES_CONTINUE; + } + SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { @@ -530,7 +596,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { } EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { - if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) { + if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) { return DEAL_RES_CONTINUE; } From bb33139572f045ef2ad900d1d592e943072ef317 Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 24 Apr 2022 20:11:34 +0800 Subject: [PATCH 2/5] feature/qnode --- include/libs/catalog/catalog.h | 2 +- source/common/src/tmsg.c | 11 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndQnode.c | 5 +- source/dnode/qnode/src/qnode.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/catalog/src/catalog.c | 9 +- source/libs/qcom/src/querymsg.c | 16 +-- source/libs/scalar/inc/sclInt.h | 2 +- source/libs/scalar/src/scalar.c | 36 +++--- .../libs/scalar/test/scalar/scalarTests.cpp | 116 ++++++++++++++++++ 11 files changed, 161 insertions(+), 41 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b88afcb39d..30d1bd0a51 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -51,7 +51,7 @@ typedef struct SMetaData { SArray *pTableMeta; // STableMeta array SArray *pVgroupInfo; // SVgroupInfo list SArray *pUdfList; // udf info list - SArray *pEpSetList; // qnode epset list, SArray + SArray *pQnodeList; // qnode list, SArray } SMetaData; typedef struct SCatalogCfg { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index eb7c9a763f..fd4b071e64 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2091,10 +2091,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp if (tStartDecode(&decoder) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; - pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr)); - if (NULL == pRsp->addrsList) return -1; + if (NULL == pRsp->addrsList) { + pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr)); + if (NULL == pRsp->addrsList) return -1; + } + for (int32_t i = 0; i < num; ++i) { - if (tDecodeSQueryNodeAddr(&decoder, TARRAY_GET_ELEM(pRsp->addrsList, i)) < 0) return -1; + SQueryNodeAddr addr = {0}; + if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1; + taosArrayPush(pRsp->addrsList, &addr); } tEndDecode(&decoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 159e14a3d5..7c874f63e1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -177,6 +177,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 1c03ca30f4..cec5933ba3 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -451,8 +451,9 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) { goto _OVER; } + void *pIter = NULL; while (1) { - void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj); + pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj); if (pIter == NULL) break; SQueryNodeAddr nodeAddr = {0}; @@ -472,7 +473,7 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) { } int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp); - void *pRsp = taosMemoryMalloc(rspLen); + void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 7b299f1f3c..e3d7621fb0 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -43,7 +43,7 @@ void qndClose(SQnode *pQnode) { int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { - qTrace("message in query queue is processing"); + qTrace("message in qnode query queue is processing"); SReadHandle handle = {0}; switch (pMsg->msgType) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index aa90a04478..e3c504c93d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -135,7 +135,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { - vTrace("message in query queue is processing"); + vTrace("message in vnode query queue is processing"); SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; switch (pMsg->msgType) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 410527c9e6..272c370ca3 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -494,7 +494,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) { +int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray *out) { char *msg = NULL; int32_t msgLen = 0; @@ -526,7 +526,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt CTG_ERR_RET(code); } - ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out)); + ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); return TSDB_CODE_SUCCESS; } @@ -2778,7 +2778,8 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, } if (pReq->qNodeRequired) { - CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList)); + pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr)); + CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, pRsp->pQnodeList)); } CTG_API_LEAVE(TSDB_CODE_SUCCESS); @@ -2807,7 +2808,7 @@ int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList)); + CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, pQnodeList)); _return: diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index de805d2c77..fd7c831399 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -342,28 +342,20 @@ PROCESS_META_OVER: int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) { SQnodeListRsp out = {0}; - int32_t code = -1; + int32_t code = 0; if (NULL == output || NULL == msg || msgSize <= 0) { code = TSDB_CODE_TSC_INVALID_INPUT; - goto PROCESS_QLIST_OVER; + return code; } + out.addrsList = (SArray *)output; if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) { qError("invalid qnode list rsp msg, msgSize:%d", msgSize); code = TSDB_CODE_INVALID_MSG; - goto PROCESS_QLIST_OVER; + return code; } -PROCESS_QLIST_OVER: - - if (code != 0) { - tFreeSQnodeListRsp(&out); - out.addrsList = NULL; - } - - *(SArray **)output = out.addrsList; - return code; } diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 399fb8f47d..9142cc41c4 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -32,7 +32,7 @@ typedef struct SScalarCtx { #define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DEFAULT_OP_NUM 10 -#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type)) +#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type)) #define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList) #define sclFatal(...) qFatal(__VA_ARGS__) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 4783a2701e..6b2e7c17d3 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -244,9 +244,8 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { int32_t code = 0; - int32_t paramNum = 0; if (NULL == pParamList) { if (ctx->pBlockList) { SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0); @@ -255,14 +254,14 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC *rowNum = 1; } - paramNum = 1; + *paramNum = 1; } else { - paramNum = pParamList->length; + *paramNum = pParamList->length; } - SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam)); + SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam)); if (NULL == paramList) { - sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam))); + sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam))); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -272,14 +271,13 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC if (SCL_IS_CONST_CALC(ctx)) { WHERE_EACH (tnode, pParamList) { if (!SCL_IS_CONST_NODE(tnode)) { - continue; + WHERE_NEXT; } else { SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); ERASE_NODE(pParamList); } ++i; - WHERE_NEXT; } } else { FOREACH(tnode, pParamList) { @@ -340,7 +338,8 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp SScalarParam *params = NULL; int32_t rowNum = 0; - SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + int32_t paramNum = 0; + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); output->columnData = createColumnInfoData(&node->node.resType, rowNum); if (output->columnData == NULL) { @@ -348,7 +347,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - code = (*ffpSet.process)(params, node->pParameterList->length, output); + code = (*ffpSet.process)(params, paramNum, output); if (code) { sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); SCL_ERR_JRET(code); @@ -356,7 +355,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp _return: - for (int32_t i = 0; i < node->pParameterList->length; ++i) { + for (int32_t i = 0; i < paramNum; ++i) { // sclFreeParamNoData(params + i); } @@ -382,8 +381,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o SScalarParam *params = NULL; int32_t rowNum = 0; + int32_t paramNum = 0; int32_t code = 0; - SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); if (NULL == params) { output->numOfRows = 0; return TSDB_CODE_SUCCESS; @@ -400,10 +400,12 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } bool value = false; - bool complete = false; + bool complete = true; for (int32_t i = 0; i < rowNum; ++i) { - for (int32_t m = 0; m < node->pParameterList->length; ++m) { + complete = true; + for (int32_t m = 0; m < paramNum; ++m) { if (NULL == params[m].columnData) { + complete = false; continue; } char* p = colDataGetData(params[m].columnData, i); @@ -420,7 +422,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } } - colDataAppend(output->columnData, i, (char*) &value, false); + if (complete) { + colDataAppend(output->columnData, i, (char*) &value, false); + } } if (SCL_IS_CONST_CALC(ctx) && (false == complete)) { @@ -430,7 +434,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o _return: - for (int32_t i = 0; i < node->pParameterList->length; ++i) { + for (int32_t i = 0; i < paramNum; ++i) { // sclFreeParamNoData(params + i); } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 04ab26d8a4..ed52e48f6d 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -137,6 +137,11 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in rnode->node.resType.bytes = dataBytes; rnode->dataBlockId = 0; + if (NULL == block) { + *pNode = (SNode *)rnode; + return; + } + if (NULL == *block) { SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); res->info.numOfCols = 3; @@ -889,6 +894,8 @@ TEST(constantTest, int_greater_int_is_true2) { } TEST(constantTest, greater_and_lower) { + scltInitLogFile(); + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; bool eRes[5] = {false, false, true, true, true}; int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; @@ -913,6 +920,115 @@ TEST(constantTest, greater_and_lower) { nodesDestroyNode(res); } +TEST(constantTest, column_and_value1) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode *v = (SLogicConditionNode *)res; + ASSERT_EQ(v->condType, LOGIC_COND_TYPE_AND); + ASSERT_EQ(v->pParameterList->length, 1); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value2) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE); + SValueNode *v = (SValueNode *)res; + ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL); + ASSERT_EQ(v->datum.b, false); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value3) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE); + SValueNode *v = (SValueNode *)res; + ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL); + ASSERT_EQ(v->datum.b, true); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value4) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode *v = (SLogicConditionNode *)res; + ASSERT_EQ(v->condType, LOGIC_COND_TYPE_OR); + ASSERT_EQ(v->pParameterList->length, 1); + nodesDestroyNode(res); +} + + void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){ char keyVar[32] = {0}; memcpy(varDataVal(keyVar), key, strlen(key)); From cd1bcb68c7ffd0ff7d969576dd282620e8b833cb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 24 Apr 2022 21:32:10 +0800 Subject: [PATCH 3/5] fix(tools): repair nettest in shell and dnode --- source/dnode/mgmt/interface/src/dmInt.c | 8 ++++++-- tools/shell/src/shellNettest.c | 10 +++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/interface/src/dmInt.c b/source/dnode/mgmt/interface/src/dmInt.c index 491eff5e17..2d15a7a008 100644 --- a/source/dnode/mgmt/interface/src/dmInt.c +++ b/source/dnode/mgmt/interface/src/dmInt.c @@ -173,9 +173,13 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { void dmProcessNettestReq(SDnode *pDnode, SRpcMsg *pRpc) { dDebug("net test req is received"); - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = 0}; + SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0}; rsp.pCont = rpcMallocCont(pRpc->contLen); - rsp.contLen = pRpc->contLen; + if (rsp.pCont == NULL) { + rsp.code = TSDB_CODE_OUT_OF_MEMORY; + } else { + rsp.contLen = pRpc->contLen; + } rpcSendResponse(&rsp); } diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 307049dd38..c8ec31c48b 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -67,7 +67,7 @@ static void shellWorkAsClient() { printf("request is sent, size:%d\n", rpcMsg.contLen); rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp); - if (rpcRsp.code == 0 &&rpcRsp.contLen == rpcMsg.contLen) { + if (rpcRsp.code == 0 && rpcRsp.contLen == rpcMsg.contLen) { printf("response is received, size:%d\n", rpcMsg.contLen); if (rpcRsp.code == 0) totalSucc++; } else { @@ -97,8 +97,12 @@ static void shellProcessMsg(void *p, SRpcMsg *pRpc, SEpSet *pEpSet) { printf("request is received, size:%d\n", pRpc->contLen); fflush(stdout); SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0}; - rsp.pCont = rpcMallocCont(shell.args.pktLen); - rsp.contLen = shell.args.pktLen; + rsp.pCont = rpcMallocCont(pRpc->contLen); + if (rsp.pCont == NULL) { + rsp.code = TSDB_CODE_OUT_OF_MEMORY; + } else { + rsp.contLen = pRpc->contLen; + } rpcSendResponse(&rsp); } From 54dd9e3ca49da4bc2b706f99e215361d6e5649ce Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 25 Apr 2022 09:32:46 +0800 Subject: [PATCH 4/5] feature/qnode --- source/dnode/qnode/src/qnode.c | 5 ++++ source/libs/scalar/src/scalar.c | 44 +++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index e3d7621fb0..907fddaec2 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -17,6 +17,7 @@ #include "qndInt.h" #include "query.h" #include "qworker.h" +//#include "tudf.h" SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode)); @@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } + //udfcOpen(); + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; @@ -37,6 +40,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { void qndClose(SQnode *pQnode) { qWorkerDestroy((void **)&pQnode->pQuery); + //udfcClose(); + taosMemoryFree(pQnode); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 6b2e7c17d3..820a4894b5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -329,28 +329,40 @@ _return: } int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { - SScalarFuncExecFuncs ffpSet = {0}; - int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); - if (code) { - sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); - SCL_ERR_RET(code); - } - SScalarParam *params = NULL; int32_t rowNum = 0; int32_t paramNum = 0; + int32_t code = 0; SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); - output->columnData = createColumnInfoData(&node->node.resType, rowNum); - if (output->columnData == NULL) { - sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); - SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - code = (*ffpSet.process)(params, paramNum, output); - if (code) { - sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + if (fmIsUserDefinedFunc(node->funcId)) { +#if 0 + UdfcFuncHandle udfHandle = NULL; + + SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); + code = callUdfScalarFunc(udfHandle, params, paramNum, output); + teardownUdf(udfHandle); SCL_ERR_JRET(code); +#endif + } else { + SScalarFuncExecFuncs ffpSet = {0}; + code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); + if (code) { + sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } + + output->columnData = createColumnInfoData(&node->node.resType, rowNum); + if (output->columnData == NULL) { + sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + code = (*ffpSet.process)(params, paramNum, output); + if (code) { + sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } } _return: From dc8dfee3c86127cadfafd07fb2ae3d4765876f2d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Apr 2022 09:48:37 +0800 Subject: [PATCH 5/5] ci: remove unstable cases --- tests/system-test/fulltest.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 455d70aac7..2954358feb 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -4,4 +4,4 @@ set -e #python3 ./test.py -f 2-query/between.py #python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py -python3 ./test.py -f 2-query/cast.py +#python3 ./test.py -f 2-query/cast.py