feature/qnode

This commit is contained in:
dapan1121 2022-04-24 16:27:09 +08:00
parent d16a2e276a
commit ef0365ebeb
3 changed files with 86 additions and 17 deletions

View File

@ -339,7 +339,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_HEARTBEAT, .msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg, .pCont = msg,
.contLen = sizeof(SSchedulerHbReq), .contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
}; };

View File

@ -32,6 +32,9 @@ typedef struct SScalarCtx {
#define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DATA_TYPE_DUMMY_HASH 9000
#define SCL_DEFAULT_OP_NUM 10 #define SCL_DEFAULT_OP_NUM 10
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
#define sclFatal(...) qFatal(__VA_ARGS__) #define sclFatal(...) qFatal(__VA_ARGS__)
#define sclError(...) qError(__VA_ARGS__) #define sclError(...) qError(__VA_ARGS__)
#define sclWarn(...) qWarn(__VA_ARGS__) #define sclWarn(...) qWarn(__VA_ARGS__)

View File

@ -244,23 +244,55 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
int32_t code = 0; int32_t code = 0;
SScalarParam *paramList = taosMemoryCalloc(pParamList->length, sizeof(SScalarParam)); int32_t paramNum = 0;
if (NULL == pParamList) {
if (ctx->pBlockList) {
SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0);
*rowNum = pBlock->info.rows;
} else {
*rowNum = 1;
}
paramNum = 1;
} else {
paramNum = pParamList->length;
}
SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
if (NULL == paramList) { if (NULL == paramList) {
sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam))); sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SListCell *cell = pParamList->pHead; if (pParamList) {
for (int32_t i = 0; i < pParamList->length; ++i) { SNode *tnode = NULL;
if (NULL == cell || NULL == cell->pNode) { int32_t i = 0;
sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); if (SCL_IS_CONST_CALC(ctx)) {
SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); WHERE_EACH (tnode, pParamList) {
if (!SCL_IS_CONST_NODE(tnode)) {
continue;
} else {
SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
ERASE_NODE(pParamList);
}
++i;
WHERE_NEXT;
}
} else {
FOREACH(tnode, pParamList) {
SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
++i;
}
} }
} else {
paramList[0].numOfRows = *rowNum;
}
SCL_ERR_JRET(sclInitParam(cell->pNode, &paramList[i], ctx, rowNum)); if (0 == *rowNum) {
cell = cell->pNext; taosMemoryFreeClear(paramList);
} }
*pParams = paramList; *pParams = paramList;
@ -299,11 +331,6 @@ _return:
} }
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SScalarFuncExecFuncs ffpSet = {0}; SScalarFuncExecFuncs ffpSet = {0};
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
if (code) { if (code) {
@ -357,6 +384,10 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
int32_t rowNum = 0; int32_t rowNum = 0;
int32_t code = 0; int32_t code = 0;
SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum)); SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum));
if (NULL == params) {
output->numOfRows = 0;
return TSDB_CODE_SUCCESS;
}
int32_t type = node->node.resType.type; int32_t type = node->node.resType.type;
output->numOfRows = rowNum; output->numOfRows = rowNum;
@ -369,14 +400,20 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
} }
bool value = false; bool value = false;
bool complete = false;
for (int32_t i = 0; i < rowNum; ++i) { for (int32_t i = 0; i < rowNum; ++i) {
for (int32_t m = 0; m < node->pParameterList->length; ++m) { for (int32_t m = 0; m < node->pParameterList->length; ++m) {
if (NULL == params[m].columnData) {
continue;
}
char* p = colDataGetData(params[m].columnData, i); char* p = colDataGetData(params[m].columnData, i);
GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p); GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);
if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) { if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
complete = true;
break; break;
} else if (LOGIC_COND_TYPE_OR == node->condType && value) { } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
complete = true;
break; break;
} else if (LOGIC_COND_TYPE_NOT == node->condType) { } else if (LOGIC_COND_TYPE_NOT == node->condType) {
value = !value; value = !value;
@ -386,7 +423,13 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
colDataAppend(output->columnData, i, (char*) &value, false); colDataAppend(output->columnData, i, (char*) &value, false);
} }
if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
sclFreeParam(output);
output->numOfRows = 0;
}
_return: _return:
for (int32_t i = 0; i < node->pParameterList->length; ++i) { for (int32_t i = 0; i < node->pParameterList->length; ++i) {
// sclFreeParamNoData(params + i); // sclFreeParamNoData(params + i);
} }
@ -426,6 +469,17 @@ _return:
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
SFunctionNode *node = (SFunctionNode *)*pNode; SFunctionNode *node = (SFunctionNode *)*pNode;
SNode* tnode = NULL;
if (fmIsUserDefinedFunc(node->funcId)) {
return DEAL_RES_CONTINUE;
}
FOREACH(tnode, node->pParameterList) {
if (!SCL_IS_CONST_NODE(tnode)) {
return DEAL_RES_CONTINUE;
}
}
SScalarParam output = {0}; SScalarParam output = {0};
ctx->code = sclExecFunction(node, ctx, &output); ctx->code = sclExecFunction(node, ctx, &output);
@ -470,6 +524,10 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (0 == output.numOfRows) {
return DEAL_RES_CONTINUE;
}
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) { if (NULL == res) {
sclError("make value node failed"); sclError("make value node failed");
@ -498,6 +556,14 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode; SOperatorNode *node = (SOperatorNode *)*pNode;
if (!SCL_IS_CONST_NODE(node->pLeft)) {
return DEAL_RES_CONTINUE;
}
if (!SCL_IS_CONST_NODE(node->pRight)) {
return DEAL_RES_CONTINUE;
}
SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
ctx->code = sclExecOperator(node, ctx, &output); ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
@ -530,7 +596,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
} }
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) { if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }