other: merge 3.0
This commit is contained in:
commit
e63d3df1b7
|
@ -51,7 +51,7 @@ typedef struct SMetaData {
|
||||||
SArray *pTableMeta; // STableMeta array
|
SArray *pTableMeta; // STableMeta array
|
||||||
SArray *pVgroupInfo; // SVgroupInfo list
|
SArray *pVgroupInfo; // SVgroupInfo list
|
||||||
SArray *pUdfList; // udf info list
|
SArray *pUdfList; // udf info list
|
||||||
SArray *pEpSetList; // qnode epset list, SArray<SEpSet>
|
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr>
|
||||||
} SMetaData;
|
} SMetaData;
|
||||||
|
|
||||||
typedef struct SCatalogCfg {
|
typedef struct SCatalogCfg {
|
||||||
|
|
|
@ -2084,10 +2084,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||||
|
if (NULL == pRsp->addrsList) {
|
||||||
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
|
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
|
||||||
if (NULL == pRsp->addrsList) return -1;
|
if (NULL == pRsp->addrsList) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
if (tDecodeSQueryNodeAddr(&decoder, TARRAY_GET_ELEM(pRsp->addrsList, i)) < 0) return -1;
|
SQueryNodeAddr addr = {0};
|
||||||
|
if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1;
|
||||||
|
taosArrayPush(pRsp->addrsList, &addr);
|
||||||
}
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -173,9 +173,13 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
|
||||||
|
|
||||||
void dmProcessNettestReq(SDnode *pDnode, SRpcMsg *pRpc) {
|
void dmProcessNettestReq(SDnode *pDnode, SRpcMsg *pRpc) {
|
||||||
dDebug("net test req is received");
|
dDebug("net test req is received");
|
||||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = 0};
|
SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0};
|
||||||
rsp.pCont = rpcMallocCont(pRpc->contLen);
|
rsp.pCont = rpcMallocCont(pRpc->contLen);
|
||||||
|
if (rsp.pCont == NULL) {
|
||||||
|
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
rsp.contLen = pRpc->contLen;
|
rsp.contLen = pRpc->contLen;
|
||||||
|
}
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -177,6 +177,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -451,8 +451,9 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj);
|
pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
SQueryNodeAddr nodeAddr = {0};
|
SQueryNodeAddr nodeAddr = {0};
|
||||||
|
@ -472,7 +473,7 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
||||||
void *pRsp = taosMemoryMalloc(rspLen);
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "qndInt.h"
|
#include "qndInt.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
|
//#include "tudf.h"
|
||||||
|
|
||||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
||||||
|
@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//udfcOpen();
|
||||||
|
|
||||||
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
||||||
taosMemoryFreeClear(pQnode);
|
taosMemoryFreeClear(pQnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -37,13 +40,15 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
void qndClose(SQnode *pQnode) {
|
void qndClose(SQnode *pQnode) {
|
||||||
qWorkerDestroy((void **)&pQnode->pQuery);
|
qWorkerDestroy((void **)&pQnode->pQuery);
|
||||||
|
|
||||||
|
//udfcClose();
|
||||||
|
|
||||||
taosMemoryFree(pQnode);
|
taosMemoryFree(pQnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
|
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
|
||||||
|
|
||||||
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
qTrace("message in query queue is processing");
|
qTrace("message in qnode query queue is processing");
|
||||||
SReadHandle handle = {0};
|
SReadHandle handle = {0};
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
|
|
@ -135,7 +135,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config};
|
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config};
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
|
|
@ -494,7 +494,7 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) {
|
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray *out) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
@ -526,7 +526,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out));
|
ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2778,7 +2778,8 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq->qNodeRequired) {
|
if (pReq->qNodeRequired) {
|
||||||
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList));
|
pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr));
|
||||||
|
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, pRsp->pQnodeList));
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
|
@ -2807,7 +2808,7 @@ int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList));
|
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, pQnodeList));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
|
|
@ -342,28 +342,20 @@ PROCESS_META_OVER:
|
||||||
|
|
||||||
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
SQnodeListRsp out = {0};
|
SQnodeListRsp out = {0};
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
goto PROCESS_QLIST_OVER;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
out.addrsList = (SArray *)output;
|
||||||
if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
|
if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
|
||||||
qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
|
qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto PROCESS_QLIST_OVER;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
PROCESS_QLIST_OVER:
|
|
||||||
|
|
||||||
if (code != 0) {
|
|
||||||
tFreeSQnodeListRsp(&out);
|
|
||||||
out.addrsList = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
*(SArray **)output = out.addrsList;
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,9 @@ typedef struct SScalarCtx {
|
||||||
#define SCL_DATA_TYPE_DUMMY_HASH 9000
|
#define SCL_DATA_TYPE_DUMMY_HASH 9000
|
||||||
#define SCL_DEFAULT_OP_NUM 10
|
#define SCL_DEFAULT_OP_NUM 10
|
||||||
|
|
||||||
|
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
|
||||||
|
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
|
||||||
|
|
||||||
#define sclFatal(...) qFatal(__VA_ARGS__)
|
#define sclFatal(...) qFatal(__VA_ARGS__)
|
||||||
#define sclError(...) qError(__VA_ARGS__)
|
#define sclError(...) qError(__VA_ARGS__)
|
||||||
#define sclWarn(...) qWarn(__VA_ARGS__)
|
#define sclWarn(...) qWarn(__VA_ARGS__)
|
||||||
|
|
|
@ -244,23 +244,53 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
|
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SScalarParam *paramList = taosMemoryCalloc(pParamList->length, sizeof(SScalarParam));
|
if (NULL == pParamList) {
|
||||||
|
if (ctx->pBlockList) {
|
||||||
|
SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0);
|
||||||
|
*rowNum = pBlock->info.rows;
|
||||||
|
} else {
|
||||||
|
*rowNum = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*paramNum = 1;
|
||||||
|
} else {
|
||||||
|
*paramNum = pParamList->length;
|
||||||
|
}
|
||||||
|
|
||||||
|
SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
|
||||||
if (NULL == paramList) {
|
if (NULL == paramList) {
|
||||||
sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam)));
|
sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SListCell *cell = pParamList->pHead;
|
if (pParamList) {
|
||||||
for (int32_t i = 0; i < pParamList->length; ++i) {
|
SNode *tnode = NULL;
|
||||||
if (NULL == cell || NULL == cell->pNode) {
|
int32_t i = 0;
|
||||||
sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode);
|
if (SCL_IS_CONST_CALC(ctx)) {
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
WHERE_EACH (tnode, pParamList) {
|
||||||
|
if (!SCL_IS_CONST_NODE(tnode)) {
|
||||||
|
WHERE_NEXT;
|
||||||
|
} else {
|
||||||
|
SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum));
|
||||||
|
ERASE_NODE(pParamList);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCL_ERR_JRET(sclInitParam(cell->pNode, ¶mList[i], ctx, rowNum));
|
++i;
|
||||||
cell = cell->pNext;
|
}
|
||||||
|
} else {
|
||||||
|
FOREACH(tnode, pParamList) {
|
||||||
|
SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum));
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
paramList[0].numOfRows = *rowNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 == *rowNum) {
|
||||||
|
taosMemoryFreeClear(paramList);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pParams = paramList;
|
*pParams = paramList;
|
||||||
|
@ -299,21 +329,28 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
||||||
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
|
|
||||||
sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
|
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
SScalarFuncExecFuncs ffpSet = {0};
|
|
||||||
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
|
||||||
if (code) {
|
|
||||||
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
|
||||||
SCL_ERR_RET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
SScalarParam *params = NULL;
|
SScalarParam *params = NULL;
|
||||||
int32_t rowNum = 0;
|
int32_t rowNum = 0;
|
||||||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum));
|
int32_t paramNum = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||||
|
|
||||||
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
|
#if 0
|
||||||
|
UdfcFuncHandle udfHandle = NULL;
|
||||||
|
|
||||||
|
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
||||||
|
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||||
|
teardownUdf(udfHandle);
|
||||||
|
SCL_ERR_JRET(code);
|
||||||
|
#endif
|
||||||
|
} else {
|
||||||
|
SScalarFuncExecFuncs ffpSet = {0};
|
||||||
|
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||||
|
if (code) {
|
||||||
|
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||||
|
SCL_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
output->columnData = createColumnInfoData(&node->node.resType, rowNum);
|
output->columnData = createColumnInfoData(&node->node.resType, rowNum);
|
||||||
if (output->columnData == NULL) {
|
if (output->columnData == NULL) {
|
||||||
|
@ -321,15 +358,16 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = (*ffpSet.process)(params, node->pParameterList->length, output);
|
code = (*ffpSet.process)(params, paramNum, output);
|
||||||
if (code) {
|
if (code) {
|
||||||
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||||
SCL_ERR_JRET(code);
|
SCL_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
|
for (int32_t i = 0; i < paramNum; ++i) {
|
||||||
// sclFreeParamNoData(params + i);
|
// sclFreeParamNoData(params + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,8 +393,13 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
|
|
||||||
SScalarParam *params = NULL;
|
SScalarParam *params = NULL;
|
||||||
int32_t rowNum = 0;
|
int32_t rowNum = 0;
|
||||||
|
int32_t paramNum = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum));
|
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||||
|
if (NULL == params) {
|
||||||
|
output->numOfRows = 0;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t type = node->node.resType.type;
|
int32_t type = node->node.resType.type;
|
||||||
output->numOfRows = rowNum;
|
output->numOfRows = rowNum;
|
||||||
|
@ -369,25 +412,41 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
}
|
}
|
||||||
|
|
||||||
bool value = false;
|
bool value = false;
|
||||||
|
bool complete = true;
|
||||||
for (int32_t i = 0; i < rowNum; ++i) {
|
for (int32_t i = 0; i < rowNum; ++i) {
|
||||||
for (int32_t m = 0; m < node->pParameterList->length; ++m) {
|
complete = true;
|
||||||
|
for (int32_t m = 0; m < paramNum; ++m) {
|
||||||
|
if (NULL == params[m].columnData) {
|
||||||
|
complete = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
char* p = colDataGetData(params[m].columnData, i);
|
char* p = colDataGetData(params[m].columnData, i);
|
||||||
GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);
|
GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);
|
||||||
|
|
||||||
if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
|
if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
|
||||||
|
complete = true;
|
||||||
break;
|
break;
|
||||||
} else if (LOGIC_COND_TYPE_OR == node->condType && value) {
|
} else if (LOGIC_COND_TYPE_OR == node->condType && value) {
|
||||||
|
complete = true;
|
||||||
break;
|
break;
|
||||||
} else if (LOGIC_COND_TYPE_NOT == node->condType) {
|
} else if (LOGIC_COND_TYPE_NOT == node->condType) {
|
||||||
value = !value;
|
value = !value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (complete) {
|
||||||
colDataAppend(output->columnData, i, (char*) &value, false);
|
colDataAppend(output->columnData, i, (char*) &value, false);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
|
||||||
|
sclFreeParam(output);
|
||||||
|
output->numOfRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
|
|
||||||
|
for (int32_t i = 0; i < paramNum; ++i) {
|
||||||
// sclFreeParamNoData(params + i);
|
// sclFreeParamNoData(params + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -426,6 +485,17 @@ _return:
|
||||||
|
|
||||||
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
SFunctionNode *node = (SFunctionNode *)*pNode;
|
SFunctionNode *node = (SFunctionNode *)*pNode;
|
||||||
|
SNode* tnode = NULL;
|
||||||
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
FOREACH(tnode, node->pParameterList) {
|
||||||
|
if (!SCL_IS_CONST_NODE(tnode)) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
|
||||||
ctx->code = sclExecFunction(node, ctx, &output);
|
ctx->code = sclExecFunction(node, ctx, &output);
|
||||||
|
@ -470,6 +540,10 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (0 == output.numOfRows) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
if (NULL == res) {
|
if (NULL == res) {
|
||||||
sclError("make value node failed");
|
sclError("make value node failed");
|
||||||
|
@ -498,6 +572,14 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
|
||||||
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
|
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
|
||||||
SOperatorNode *node = (SOperatorNode *)*pNode;
|
SOperatorNode *node = (SOperatorNode *)*pNode;
|
||||||
|
|
||||||
|
if (!SCL_IS_CONST_NODE(node->pLeft)) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!SCL_IS_CONST_NODE(node->pRight)) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
|
SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
|
||||||
ctx->code = sclExecOperator(node, ctx, &output);
|
ctx->code = sclExecOperator(node, ctx, &output);
|
||||||
if (ctx->code) {
|
if (ctx->code) {
|
||||||
|
@ -530,7 +612,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
|
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
|
||||||
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
|
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,11 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
||||||
rnode->node.resType.bytes = dataBytes;
|
rnode->node.resType.bytes = dataBytes;
|
||||||
rnode->dataBlockId = 0;
|
rnode->dataBlockId = 0;
|
||||||
|
|
||||||
|
if (NULL == block) {
|
||||||
|
*pNode = (SNode *)rnode;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == *block) {
|
if (NULL == *block) {
|
||||||
SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
res->info.numOfCols = 3;
|
res->info.numOfCols = 3;
|
||||||
|
@ -889,6 +894,8 @@ TEST(constantTest, int_greater_int_is_true2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(constantTest, greater_and_lower) {
|
TEST(constantTest, greater_and_lower) {
|
||||||
|
scltInitLogFile();
|
||||||
|
|
||||||
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
||||||
bool eRes[5] = {false, false, true, true, true};
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
||||||
|
@ -913,6 +920,115 @@ TEST(constantTest, greater_and_lower) {
|
||||||
nodesDestroyNode(res);
|
nodesDestroyNode(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(constantTest, column_and_value1) {
|
||||||
|
scltInitLogFile();
|
||||||
|
|
||||||
|
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
||||||
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
|
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
||||||
|
SNode *list[2] = {0};
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1);
|
||||||
|
scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2);
|
||||||
|
scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3);
|
||||||
|
scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL);
|
||||||
|
scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
|
||||||
|
|
||||||
|
int32_t code = scalarCalculateConstants(logicNode, &res);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_TRUE(res);
|
||||||
|
ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION);
|
||||||
|
SLogicConditionNode *v = (SLogicConditionNode *)res;
|
||||||
|
ASSERT_EQ(v->condType, LOGIC_COND_TYPE_AND);
|
||||||
|
ASSERT_EQ(v->pParameterList->length, 1);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(constantTest, column_and_value2) {
|
||||||
|
scltInitLogFile();
|
||||||
|
|
||||||
|
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
||||||
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
|
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
||||||
|
SNode *list[2] = {0};
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1);
|
||||||
|
scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2);
|
||||||
|
scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3);
|
||||||
|
scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL);
|
||||||
|
scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
|
||||||
|
|
||||||
|
int32_t code = scalarCalculateConstants(logicNode, &res);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_TRUE(res);
|
||||||
|
ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE);
|
||||||
|
SValueNode *v = (SValueNode *)res;
|
||||||
|
ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL);
|
||||||
|
ASSERT_EQ(v->datum.b, false);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(constantTest, column_and_value3) {
|
||||||
|
scltInitLogFile();
|
||||||
|
|
||||||
|
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
||||||
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
|
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
||||||
|
SNode *list[2] = {0};
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1);
|
||||||
|
scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2);
|
||||||
|
scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3);
|
||||||
|
scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL);
|
||||||
|
scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2);
|
||||||
|
|
||||||
|
int32_t code = scalarCalculateConstants(logicNode, &res);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_TRUE(res);
|
||||||
|
ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE);
|
||||||
|
SValueNode *v = (SValueNode *)res;
|
||||||
|
ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL);
|
||||||
|
ASSERT_EQ(v->datum.b, true);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(constantTest, column_and_value4) {
|
||||||
|
scltInitLogFile();
|
||||||
|
|
||||||
|
SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL;
|
||||||
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
|
int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20;
|
||||||
|
SNode *list[2] = {0};
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1);
|
||||||
|
scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2);
|
||||||
|
scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3);
|
||||||
|
scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL);
|
||||||
|
scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2);
|
||||||
|
|
||||||
|
int32_t code = scalarCalculateConstants(logicNode, &res);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_TRUE(res);
|
||||||
|
ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION);
|
||||||
|
SLogicConditionNode *v = (SLogicConditionNode *)res;
|
||||||
|
ASSERT_EQ(v->condType, LOGIC_COND_TYPE_OR);
|
||||||
|
ASSERT_EQ(v->pParameterList->length, 1);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
|
void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
|
||||||
char keyVar[32] = {0};
|
char keyVar[32] = {0};
|
||||||
memcpy(varDataVal(keyVar), key, strlen(key));
|
memcpy(varDataVal(keyVar), key, strlen(key));
|
||||||
|
|
Loading…
Reference in New Issue