diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 05c9da1a8a..27dae6d210 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -240,6 +240,7 @@ typedef struct SNodeList { #define SNodeptr void* +int32_t nodesNodeSize(ENodeType type); SNodeptr nodesMakeNode(ENodeType type); void nodesDestroyNode(SNodeptr pNode); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 01f3132c69..8c1482894a 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -48,6 +48,7 @@ typedef struct SExprNode { ENodeType type; SDataType resType; char aliasName[TSDB_COL_NAME_LEN]; + char userAlias[TSDB_COL_NAME_LEN]; SArray* pAssociation; } SExprNode; @@ -325,7 +326,7 @@ typedef struct SQuery { bool showRewrite; int32_t placeholderNum; SArray* pPlaceholderValues; - SNode* pContainPlaceholderRoot; + SNode* pPrepareRoot; } SQuery; void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 192e04c9a3..e0ede63352 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1314,6 +1314,90 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { return err; } +int compareUdfcFuncSub(const void* elem1, const void* elem2) { + SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; + SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; + return strcmp(stub1->udfName, stub2->udfName); +} + +int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { + int32_t code = 0; + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (stubIndex != -1) { + SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); + UdfcFuncHandle handle = foundStub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + foundStub->lastRefTime = taosGetTimestampUs(); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; + } else { + fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + udfName, foundStub->refCount, foundStub->lastRefTime); + taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); + } + } + *pHandle = NULL; + code = doSetupUdf(udfName, pHandle); + if (code == TSDB_CODE_SUCCESS) { + SUdfcFuncStub stub = {0}; + strcpy(stub.udfName, udfName); + stub.handle = *pHandle; + ++stub.refCount; + stub.lastRefTime = taosGetTimestampUs(); + taosArrayPush(gUdfdProxy.udfStubs, &stub); + taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); + } else { + *pHandle = NULL; + } + + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return code; +} + +void releaseUdfFuncHandle(char* udfName) { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + ASSERT(foundStub); + --foundStub->refCount; + ASSERT(foundStub->refCount>=0); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); +} + +int32_t cleanUpUdfs() { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + int32_t i = 0; + SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", + stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + taosArrayPush(udfStubs, stub); + } else { + fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + stub->udfName, stub->refCount, stub->lastRefTime); + } + } + ++i; + } + taosArrayDestroy(gUdfdProxy.udfStubs); + gUdfdProxy.udfStubs = udfStubs; + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; +} + int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); @@ -1437,57 +1521,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t return err; } -int compareUdfcFuncSub(const void* elem1, const void* elem2) { - SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; - SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; - return strcmp(stub1->udfName, stub2->udfName); -} - -int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { - int32_t code = 0; - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (foundStub != NULL) { - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - *pHandle = foundStub->handle; - ++foundStub->refCount; - foundStub->lastRefTime = taosGetTimestampUs(); - return 0; - } - *pHandle = NULL; - code = doSetupUdf(udfName, pHandle); - if (code == TSDB_CODE_SUCCESS) { - SUdfcFuncStub stub = {0}; - strcpy(stub.udfName, udfName); - stub.handle = *pHandle; - ++stub.refCount; - stub.lastRefTime = taosGetTimestampUs(); - taosArrayPush(gUdfdProxy.udfStubs, &stub); - taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); - } else { - *pHandle = NULL; - } - - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return code; -} - -void releaseUdfFuncHandle(char* udfName) { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - ASSERT(foundStub); - --foundStub->refCount; - ASSERT(foundStub->refCount>=0); - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); -} int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { UdfcFuncHandle handle = NULL; - int32_t code = accquireUdfFuncHandle(udfName, &handle); + int32_t code = acquireUdfFuncHandle(udfName, &handle); if (code != 0) { return code; } @@ -1549,7 +1586,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } UdfcFuncHandle handle; int32_t udfCode = 0; - if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } @@ -1662,25 +1699,3 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; } - -int32_t cleanUpUdfs() { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - int32_t i = 0; - SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", - stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); - taosArrayPush(udfStubs, stub); - } - ++i; - } - taosArrayDestroy(gUdfdProxy.udfStubs); - gUdfdProxy.udfStubs = udfStubs; - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return 0; -} \ No newline at end of file diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 19421072f8..78fffc8d8a 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -86,17 +86,148 @@ typedef struct SUdf { TUdfDestroyFunc destroyFunc; } SUdf; -// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: add private udf structure. typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); +typedef enum EUdfdRpcReqRspType { + UDFD_RPC_MNODE_CONNECT = 0, + UDFD_RPC_RETRIVE_FUNC, +} EUdfdRpcReqRspType; + +typedef struct SUdfdRpcSendRecvInfo { + EUdfdRpcReqRspType rpcType; + int32_t code; + void* param; + uv_sem_t resultSem; +} SUdfdRpcSendRecvInfo; + +void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle; + ASSERT(pMsg->ahandle != NULL); + + if (pEpSet) { + if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&global.mgmtEp, pEpSet); + } + } + + if (pMsg->code != TSDB_CODE_SUCCESS) { + fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); + msgInfo->code = pMsg->code; + goto _return; + } + + if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if (connectRsp.epSet.numOfEps == 0) { + msgInfo->code = TSDB_CODE_MND_APP_ERROR; + goto _return; + } + + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); + } + msgInfo->code = 0; + } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { + SRetrieveFuncRsp retrieveRsp = {0}; + tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); + SUdf* udf = msgInfo->param; + udf->funcType = pFuncInfo->funcType; + udf->scriptType = pFuncInfo->scriptType; + udf->outputType = pFuncInfo->funcType; + udf->outputLen = pFuncInfo->outputLen; + udf->bufSize = pFuncInfo->bufSize; + + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + // TODO check for failure of flush to disk + taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + taosCloseFile(&file); + strncpy(udf->path, path, strlen(path)); + tFreeSFuncInfo(pFuncInfo); + taosArrayDestroy(retrieveRsp.pFuncInfos); + msgInfo->code = 0; + } + +_return: + rpcFreeCont(pMsg->pCont); + uv_sem_post(&msgInfo->resultSem); + return; +} + +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + SRetrieveFuncReq retrieveReq = {0}; + retrieveReq.numOfFuncs = 1; + retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); + taosArrayPush(retrieveReq.pFuncNames, udfName); + + int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + void *pReq = rpcMallocCont(contLen); + tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + taosArrayDestroy(retrieveReq.pFuncNames); + + SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; + msgInfo->param = udf; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + int32_t code = msgInfo->code; + taosMemoryFree(msgInfo); + return code; +} + +int32_t udfdConnectToMnode() { + SConnectReq connReq = {0}; + connReq.connType = CONN_TYPE__UDFD; + tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); + tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); + connReq.pid = htonl(taosGetPId()); + connReq.startTime = htobe64(taosGetTimestampMs()); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connReq); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_MND_CONNECT; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + int32_t code = msgInfo->code; + uv_sem_destroy(&msgInfo->resultSem); + taosMemoryFree(msgInfo); + return code; +} int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strcpy(udf->name, udfName); int32_t err = 0; + err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); if (err != 0) { fnError("can not retrieve udf from mnode. udf name %s", udfName); @@ -515,140 +646,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { uv_stop(global.loop); } -typedef enum EUdfdRpcReqRspType { - UDFD_RPC_MNODE_CONNECT = 0, - UDFD_RPC_RETRIVE_FUNC, -} EUdfdRpcReqRspType; - -typedef struct SUdfdRpcSendRecvInfo { - EUdfdRpcReqRspType rpcType; - int32_t code; - void* param; - uv_sem_t resultSem; -} SUdfdRpcSendRecvInfo; - - -void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle; - ASSERT(pMsg->ahandle != NULL); - - if (pEpSet) { - if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { - updateEpSet_s(&global.mgmtEp, pEpSet); - } - } - - if (pMsg->code != TSDB_CODE_SUCCESS) { - fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); - msgInfo->code = pMsg->code; - goto _return; - } - - if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { - SConnectRsp connectRsp = {0}; - tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - if (connectRsp.epSet.numOfEps == 0) { - msgInfo->code = TSDB_CODE_MND_APP_ERROR; - goto _return; - } - - if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { - updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); - } - msgInfo->code = 0; - } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { - SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); - - SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf* udf = msgInfo->param; - udf->funcType = pFuncInfo->funcType; - udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->funcType; - udf->outputLen = pFuncInfo->outputLen; - udf->bufSize = pFuncInfo->bufSize; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - // TODO check for failure of flush to disk - taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); - tFreeSFuncInfo(pFuncInfo); - taosArrayDestroy(retrieveRsp.pFuncInfos); - msgInfo->code = 0; - } - -_return: - rpcFreeCont(pMsg->pCont); - uv_sem_post(&msgInfo->resultSem); - return; -} - -int32_t udfdConnectToMNode() { - SConnectReq connReq = {0}; - connReq.connType = CONN_TYPE__UDFD; - tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); - tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); - char pass[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); - tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); - connReq.pid = htonl(taosGetPId()); - connReq.startTime = htobe64(taosGetTimestampMs()); - - int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSConnectReq(pReq, contLen, &connReq); - - SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_MND_CONNECT; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.ahandle = msgInfo; - rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - int32_t code = msgInfo->code; - uv_sem_destroy(&msgInfo->resultSem); - taosMemoryFree(msgInfo); - return code; -} - -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { - SRetrieveFuncReq retrieveReq = {0}; - retrieveReq.numOfFuncs = 1; - retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); - - int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); - taosArrayDestroy(retrieveReq.pFuncNames); - - SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; - msgInfo->param = udf; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; - rpcMsg.ahandle = msgInfo; - rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - uv_sem_destroy(&msgInfo->resultSem); - int32_t code = msgInfo->code; - taosMemoryFree(msgInfo); - return code; -} - static bool udfdRpcRfp(int32_t code) { if (code == TSDB_CODE_RPC_REDIRECT) { return true; @@ -884,7 +881,18 @@ int main(int argc, char *argv[]) { return -3; } - if (udfdConnectToMNode() != 0) { + int32_t retryMnodeTimes = 0; + int32_t code = 0; + while (retryMnodeTimes++ < TSDB_MAX_REPLICA) { + uv_sleep(500 * ( 1 << retryMnodeTimes)); + code = udfdConnectToMnode(); + if (code == 0) { + break; + } + fnError("can not connect to mnode, code: %s. retry", tstrerror(code)); + } + + if (code != 0) { fnError("failed to start since can not connect to mnode"); return -4; } diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 9fe9269a3f..15109db220 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -41,41 +41,41 @@ int scalarFuncTest() { fnError("setup udf failure"); return -1; } - - SSDataBlock block = {0}; - SSDataBlock *pBlock = █ - pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - pBlock->info.numOfCols = 1; - pBlock->info.rows = 4; - char data[16] = {0}; - char bitmap[4] = {0}; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData colInfo = {0}; - colInfo.info.type = TSDB_DATA_TYPE_INT; - colInfo.info.bytes = sizeof(int32_t); - colInfo.info.colId = 1; - colInfo.pData = data; - colInfo.nullbitmap = bitmap; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - colDataAppendInt32(&colInfo, j, &j); + int64_t beg = taosGetTimestampUs(); + for (int k = 0; k < 1; ++k) { + SSDataBlock block = {0}; + SSDataBlock *pBlock = █ + pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = 1; + pBlock->info.rows = 1024; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows); + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + colDataAppendInt32(&colInfo, j, &j); + } + taosArrayPush(pBlock->pDataBlock, &colInfo); } - taosArrayPush(pBlock->pDataBlock, &colInfo); + + SScalarParam input = {0}; + input.numOfRows = pBlock->info.rows; + input.columnData = taosArrayGet(pBlock->pDataBlock, 0); + SScalarParam output = {0}; + doCallUdfScalarFunc(handle, &input, 1, &output); + taosArrayDestroy(pBlock->pDataBlock); + SColumnInfoData *col = output.columnData; + for (int32_t i = 0; i < output.numOfRows; ++i) { + if (i % 100 == 0) + fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); + } + colDataDestroy(output.columnData); + taosMemoryFree(output.columnData); } - - SScalarParam input = {0}; - input.numOfRows = pBlock->info.rows; - input.columnData = taosArrayGet(pBlock->pDataBlock, 0); - SScalarParam output = {0}; - doCallUdfScalarFunc(handle, &input, 1, &output); - taosArrayDestroy(pBlock->pDataBlock); - SColumnInfoData *col = output.columnData; - for (int32_t i = 0; i < output.numOfRows; ++i) { - fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); - } - - colDataDestroy(output.columnData); - taosMemoryFree(output.columnData); - + int64_t end = taosGetTimestampUs(); + fprintf(stderr, "time: %f\n", (end-beg)/1000.0); doTeardownUdf(handle); return 0; @@ -93,16 +93,13 @@ int aggregateFuncTest() { SSDataBlock *pBlock = █ pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pBlock->info.numOfCols = 1; - pBlock->info.rows = 4; - char data[16] = {0}; - char bitmap[4] = {0}; + pBlock->info.rows = 1024; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.bytes = sizeof(int32_t); colInfo.info.colId = 1; - colInfo.pData = data; - colInfo.nullbitmap = bitmap; + colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows); for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendInt32(&colInfo, j, &j); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5e9e4e1d57..8019200e76 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -19,21 +19,6 @@ #include "taos.h" #include "taoserror.h" -#define COPY_ALL_SCALAR_FIELDS \ - do { \ - memcpy((pDst), (pSrc), sizeof(*pSrc)); \ - } while (0) - -#define COPY_SCALAR_FIELD(fldname) \ - do { \ - (pDst)->fldname = (pSrc)->fldname; \ - } while (0) - -#define COPY_CHAR_ARRAY_FIELD(fldname) \ - do { \ - strcpy((pDst)->fldname, (pSrc)->fldname); \ - } while (0) - #define COPY_CHAR_POINT_FIELD(fldname) \ do { \ if (NULL == (pSrc)->fldname) { \ @@ -85,34 +70,22 @@ } \ } while (0) -static void dataTypeCopy(const SDataType* pSrc, SDataType* pDst) { - COPY_SCALAR_FIELD(type); - COPY_SCALAR_FIELD(precision); - COPY_SCALAR_FIELD(scale); - COPY_SCALAR_FIELD(bytes); -} +static void dataTypeCopy(const SDataType* pSrc, SDataType* pDst) {} -static void exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) { +static SNode* exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) { dataTypeCopy(&pSrc->resType, &pDst->resType); - COPY_CHAR_ARRAY_FIELD(aliasName); + pDst->pAssociation = NULL; + return (SNode*)pDst; } static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) { - exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); - COPY_SCALAR_FIELD(colId); - COPY_SCALAR_FIELD(colType); - COPY_CHAR_ARRAY_FIELD(dbName); - COPY_CHAR_ARRAY_FIELD(tableName); - COPY_CHAR_ARRAY_FIELD(tableAlias); - COPY_CHAR_ARRAY_FIELD(colName); - COPY_SCALAR_FIELD(dataBlockId); - COPY_SCALAR_FIELD(slotId); + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); + pDst->pProjectRef = NULL; return (SNode*)pDst; } static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { - COPY_ALL_SCALAR_FIELDS; - exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); COPY_CHAR_POINT_FIELD(literal); if (!pSrc->translate) { return (SNode*)pDst; @@ -139,66 +112,26 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { } static SNode* operatorNodeCopy(const SOperatorNode* pSrc, SOperatorNode* pDst) { - exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); - COPY_SCALAR_FIELD(opType); + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); CLONE_NODE_FIELD(pLeft); CLONE_NODE_FIELD(pRight); return (SNode*)pDst; } static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicConditionNode* pDst) { - exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); - COPY_SCALAR_FIELD(condType); + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); CLONE_NODE_LIST_FIELD(pParameterList); return (SNode*)pDst; } static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { - COPY_ALL_SCALAR_FIELDS; - exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); - COPY_CHAR_ARRAY_FIELD(functionName); - COPY_SCALAR_FIELD(funcId); - COPY_SCALAR_FIELD(funcType); + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); CLONE_NODE_LIST_FIELD(pParameterList); return (SNode*)pDst; } -static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) { - COPY_SCALAR_FIELD(dataBlockId); - COPY_SCALAR_FIELD(slotId); - CLONE_NODE_FIELD(pExpr); - return (SNode*)pDst; -} - -static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) { - COPY_SCALAR_FIELD(groupingSetType); - CLONE_NODE_LIST_FIELD(pParameterList); - return (SNode*)pDst; -} - -static SNode* orderByExprNodeCopy(const SOrderByExprNode* pSrc, SOrderByExprNode* pDst) { - COPY_ALL_SCALAR_FIELDS; - CLONE_NODE_FIELD(pExpr); - return (SNode*)pDst; -} - -static SNode* nodeListNodeCopy(const SNodeListNode* pSrc, SNodeListNode* pDst) { - COPY_ALL_SCALAR_FIELDS; - CLONE_NODE_LIST_FIELD(pNodeList); - return (SNode*)pDst; -} - -static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) { - COPY_SCALAR_FIELD(mode); - CLONE_NODE_FIELD(pValues); - CLONE_NODE_FIELD(pWStartTs); - return (SNode*)pDst; -} - -static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { - CLONE_NODE_LIST_FIELD(pTargets); - CLONE_NODE_FIELD(pConditions); - CLONE_NODE_LIST_FIELD(pChildren); +static SNode* tableNodeCopy(const STableNode* pSrc, STableNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, exprNodeCopy); return (SNode*)pDst; } @@ -222,8 +155,85 @@ static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) { return pDst; } +static SNode* realTableNodeCopy(const SRealTableNode* pSrc, SRealTableNode* pDst) { + COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); + CLONE_OBJECT_FIELD(pMeta, tableMetaClone); + CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); + return (SNode*)pDst; +} + +static SNode* tempTableNodeCopy(const STempTableNode* pSrc, STempTableNode* pDst) { + COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); + CLONE_NODE_FIELD(pSubquery); + return (SNode*)pDst; +} + +static SNode* joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDst) { + COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); + CLONE_NODE_FIELD(pLeft); + CLONE_NODE_FIELD(pRight); + CLONE_NODE_FIELD(pOnCond); + return (SNode*)pDst; +} + +static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) { + CLONE_NODE_FIELD(pExpr); + return (SNode*)pDst; +} + +static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) { + CLONE_NODE_LIST_FIELD(pParameterList); + return (SNode*)pDst; +} + +static SNode* orderByExprNodeCopy(const SOrderByExprNode* pSrc, SOrderByExprNode* pDst) { + CLONE_NODE_FIELD(pExpr); + return (SNode*)pDst; +} + +static SNode* limitNodeCopy(const SLimitNode* pSrc, SLimitNode* pDst) { return (SNode*)pDst; } + +static SNode* stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode* pDst) { + CLONE_NODE_FIELD(pCol); + CLONE_NODE_FIELD(pExpr); + return (SNode*)pDst; +} + +static SNode* sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) { + CLONE_NODE_FIELD(pCol); + CLONE_NODE_FIELD(pGap); + return (SNode*)pDst; +} + +static SNode* intervalWindowNodeCopy(const SIntervalWindowNode* pSrc, SIntervalWindowNode* pDst) { + CLONE_NODE_FIELD(pCol); + CLONE_NODE_FIELD(pInterval); + CLONE_NODE_FIELD(pOffset); + CLONE_NODE_FIELD(pSliding); + CLONE_NODE_FIELD(pFill); + return (SNode*)pDst; +} + +static SNode* nodeListNodeCopy(const SNodeListNode* pSrc, SNodeListNode* pDst) { + CLONE_NODE_LIST_FIELD(pNodeList); + return (SNode*)pDst; +} + +static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) { + CLONE_NODE_FIELD(pValues); + CLONE_NODE_FIELD(pWStartTs); + return (SNode*)pDst; +} + +static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { + CLONE_NODE_LIST_FIELD(pTargets); + CLONE_NODE_FIELD(pConditions); + CLONE_NODE_LIST_FIELD(pChildren); + pDst->pParent = NULL; + return (SNode*)pDst; +} + static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { - COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pScanCols); CLONE_NODE_LIST_FIELD(pScanPseudoCols); @@ -234,7 +244,6 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { } static SNode* logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { - COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_FIELD(pOnConditions); return (SNode*)pDst; @@ -248,7 +257,6 @@ static SNode* logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) { } static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* pDst) { - COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pProjections); return (SNode*)pDst; @@ -256,18 +264,17 @@ static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); - COPY_SCALAR_FIELD(msgType); + pDst->pDataBlocks = NULL; + pDst->pVgDataBlocks = NULL; return (SNode*)pDst; } static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); - COPY_SCALAR_FIELD(srcGroupId); return (SNode*)pDst; } static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) { - COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pFuncs); CLONE_NODE_FIELD(pTspk); @@ -275,7 +282,6 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD } static SNode* logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) { - COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_FIELD(pWStartTs); CLONE_NODE_FIELD(pValues); @@ -296,29 +302,37 @@ static SNode* logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogi static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { CLONE_NODE_FIELD(pNode); - COPY_SCALAR_FIELD(subplanType); + pDst->pChildren = NULL; + pDst->pParents = NULL; + pDst->pVgroupList = NULL; return (SNode*)pDst; } static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { - COPY_ALL_SCALAR_FIELDS; CLONE_NODE_LIST_FIELD(pSlots); return (SNode*)pDst; } static SNode* slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) { - COPY_SCALAR_FIELD(slotId); dataTypeCopy(&pSrc->dataType, &pDst->dataType); - COPY_SCALAR_FIELD(reserve); - COPY_SCALAR_FIELD(output); - COPY_SCALAR_FIELD(tag); return (SNode*)pDst; } static SNode* downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstreamSourceNode* pDst) { - COPY_SCALAR_FIELD(addr); - COPY_SCALAR_FIELD(taskId); - COPY_SCALAR_FIELD(schedId); + return (SNode*)pDst; +} + +static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) { + CLONE_NODE_LIST_FIELD(pProjectionList); + CLONE_NODE_FIELD(pFromTable); + CLONE_NODE_FIELD(pWhere); + CLONE_NODE_LIST_FIELD(pPartitionByList); + CLONE_NODE_FIELD(pWindow); + CLONE_NODE_LIST_FIELD(pGroupByList); + CLONE_NODE_FIELD(pHaving); + CLONE_NODE_LIST_FIELD(pOrderByList); + CLONE_NODE_FIELD(pLimit); + CLONE_NODE_FIELD(pLimit); return (SNode*)pDst; } @@ -331,6 +345,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + memcpy(pDst, pNode, nodesNodeSize(nodeType(pNode))); switch (nodeType(pNode)) { case QUERY_NODE_COLUMN: return columnNodeCopy((const SColumnNode*)pNode, (SColumnNode*)pDst); @@ -342,28 +357,38 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst); case QUERY_NODE_FUNCTION: return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst); - case QUERY_NODE_TARGET: - return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst); case QUERY_NODE_REAL_TABLE: + return realTableNodeCopy((const SRealTableNode*)pNode, (SRealTableNode*)pDst); case QUERY_NODE_TEMP_TABLE: + return tempTableNodeCopy((const STempTableNode*)pNode, (STempTableNode*)pDst); case QUERY_NODE_JOIN_TABLE: - break; + return joinTableNodeCopy((const SJoinTableNode*)pNode, (SJoinTableNode*)pDst); case QUERY_NODE_GROUPING_SET: return groupingSetNodeCopy((const SGroupingSetNode*)pNode, (SGroupingSetNode*)pDst); case QUERY_NODE_ORDER_BY_EXPR: return orderByExprNodeCopy((const SOrderByExprNode*)pNode, (SOrderByExprNode*)pDst); case QUERY_NODE_LIMIT: - break; + return limitNodeCopy((const SLimitNode*)pNode, (SLimitNode*)pDst); + case QUERY_NODE_STATE_WINDOW: + return stateWindowNodeCopy((const SStateWindowNode*)pNode, (SStateWindowNode*)pDst); + case QUERY_NODE_SESSION_WINDOW: + return sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst); + case QUERY_NODE_INTERVAL_WINDOW: + return intervalWindowNodeCopy((const SIntervalWindowNode*)pNode, (SIntervalWindowNode*)pDst); case QUERY_NODE_NODE_LIST: return nodeListNodeCopy((const SNodeListNode*)pNode, (SNodeListNode*)pDst); case QUERY_NODE_FILL: return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst); + case QUERY_NODE_TARGET: + return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst); case QUERY_NODE_DATABLOCK_DESC: return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst); case QUERY_NODE_SLOT_DESC: return slotDescCopy((const SSlotDescNode*)pNode, (SSlotDescNode*)pDst); case QUERY_NODE_DOWNSTREAM_SOURCE: return downstreamSourceCopy((const SDownstreamSourceNode*)pNode, (SDownstreamSourceNode*)pDst); + case QUERY_NODE_SELECT_STMT: + return selectStmtCopy((const SSelectStmt*)pNode, (SSelectStmt*)pDst); case QUERY_NODE_LOGIC_PLAN_SCAN: return logicScanCopy((const SScanLogicNode*)pNode, (SScanLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_JOIN: diff --git a/source/libs/nodes/src/nodesEqualFuncs.c b/source/libs/nodes/src/nodesEqualFuncs.c index bd1662db4c..9887cbdbc5 100644 --- a/source/libs/nodes/src/nodesEqualFuncs.c +++ b/source/libs/nodes/src/nodesEqualFuncs.c @@ -20,13 +20,28 @@ if (a->fldname != b->fldname) return false; \ } while (0) -#define COMPARE_STRING(a, b) (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b)) +#define COMPARE_STRING(a, b) (((a) != NULL && (b) != NULL) ? (strcmp((a), (b)) == 0) : (a) == (b)) + +#define COMPARE_VARDATA(a, b) \ + (((a) != NULL && (b) != NULL) \ + ? (varDataLen((a)) == varDataLen((b)) && memcmp(varDataVal((a)), varDataVal((b)), varDataLen((a))) == 0) \ + : (a) == (b)) #define COMPARE_STRING_FIELD(fldname) \ do { \ if (!COMPARE_STRING(a->fldname, b->fldname)) return false; \ } while (0) +#define COMPARE_VARDATA_FIELD(fldname) \ + do { \ + if (!COMPARE_VARDATA(a->fldname, b->fldname)) return false; \ + } while (0) + +#define COMPARE_OBJECT_FIELD(fldname, equalFunc) \ + do { \ + if (!equalFunc(a->fldname, b->fldname)) return false; \ + } while (0) + #define COMPARE_NODE_FIELD(fldname) \ do { \ if (!nodesEqualNode(a->fldname, b->fldname)) return false; \ @@ -59,6 +74,10 @@ static bool nodeNodeListEqual(const SNodeList* a, const SNodeList* b) { return true; } +static bool dataTypeEqual(SDataType a, SDataType b) { + return a.type == b.type && a.bytes == b.bytes && a.precision == b.precision && a.scale == b.scale; +} + static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) { COMPARE_STRING_FIELD(dbName); COMPARE_STRING_FIELD(tableName); @@ -67,7 +86,35 @@ static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) { } static bool valueNodeEqual(const SValueNode* a, const SValueNode* b) { + COMPARE_OBJECT_FIELD(node.resType, dataTypeEqual); COMPARE_STRING_FIELD(literal); + switch (a->node.resType.type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_TIMESTAMP: + COMPARE_SCALAR_FIELD(typeData); + break; + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_NCHAR: + COMPARE_VARDATA_FIELD(datum.p); + break; + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + return false; + default: + break; + } return true; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 76c8b17487..476b3b2786 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -21,156 +21,147 @@ #include "taoserror.h" #include "thash.h" -static SNode* makeNode(ENodeType type, size_t size) { - SNode* p = taosMemoryCalloc(1, size); - if (NULL == p) { - return NULL; - } - setNodeType(p, type); - return p; -} - -SNodeptr nodesMakeNode(ENodeType type) { +int32_t nodesNodeSize(ENodeType type) { switch (type) { case QUERY_NODE_COLUMN: - return makeNode(type, sizeof(SColumnNode)); + return sizeof(SColumnNode); case QUERY_NODE_VALUE: - return makeNode(type, sizeof(SValueNode)); + return sizeof(SValueNode); case QUERY_NODE_OPERATOR: - return makeNode(type, sizeof(SOperatorNode)); + return sizeof(SOperatorNode); case QUERY_NODE_LOGIC_CONDITION: - return makeNode(type, sizeof(SLogicConditionNode)); + return sizeof(SLogicConditionNode); case QUERY_NODE_FUNCTION: - return makeNode(type, sizeof(SFunctionNode)); + return sizeof(SFunctionNode); case QUERY_NODE_REAL_TABLE: - return makeNode(type, sizeof(SRealTableNode)); + return sizeof(SRealTableNode); case QUERY_NODE_TEMP_TABLE: - return makeNode(type, sizeof(STempTableNode)); + return sizeof(STempTableNode); case QUERY_NODE_JOIN_TABLE: - return makeNode(type, sizeof(SJoinTableNode)); + return sizeof(SJoinTableNode); case QUERY_NODE_GROUPING_SET: - return makeNode(type, sizeof(SGroupingSetNode)); + return sizeof(SGroupingSetNode); case QUERY_NODE_ORDER_BY_EXPR: - return makeNode(type, sizeof(SOrderByExprNode)); + return sizeof(SOrderByExprNode); case QUERY_NODE_LIMIT: - return makeNode(type, sizeof(SLimitNode)); + return sizeof(SLimitNode); case QUERY_NODE_STATE_WINDOW: - return makeNode(type, sizeof(SStateWindowNode)); + return sizeof(SStateWindowNode); case QUERY_NODE_SESSION_WINDOW: - return makeNode(type, sizeof(SSessionWindowNode)); + return sizeof(SSessionWindowNode); case QUERY_NODE_INTERVAL_WINDOW: - return makeNode(type, sizeof(SIntervalWindowNode)); + return sizeof(SIntervalWindowNode); case QUERY_NODE_NODE_LIST: - return makeNode(type, sizeof(SNodeListNode)); + return sizeof(SNodeListNode); case QUERY_NODE_FILL: - return makeNode(type, sizeof(SFillNode)); + return sizeof(SFillNode); case QUERY_NODE_RAW_EXPR: - return makeNode(type, sizeof(SRawExprNode)); + return sizeof(SRawExprNode); case QUERY_NODE_TARGET: - return makeNode(type, sizeof(STargetNode)); + return sizeof(STargetNode); case QUERY_NODE_DATABLOCK_DESC: - return makeNode(type, sizeof(SDataBlockDescNode)); + return sizeof(SDataBlockDescNode); case QUERY_NODE_SLOT_DESC: - return makeNode(type, sizeof(SSlotDescNode)); + return sizeof(SSlotDescNode); case QUERY_NODE_COLUMN_DEF: - return makeNode(type, sizeof(SColumnDefNode)); + return sizeof(SColumnDefNode); case QUERY_NODE_DOWNSTREAM_SOURCE: - return makeNode(type, sizeof(SDownstreamSourceNode)); + return sizeof(SDownstreamSourceNode); case QUERY_NODE_DATABASE_OPTIONS: - return makeNode(type, sizeof(SDatabaseOptions)); + return sizeof(SDatabaseOptions); case QUERY_NODE_TABLE_OPTIONS: - return makeNode(type, sizeof(STableOptions)); + return sizeof(STableOptions); case QUERY_NODE_INDEX_OPTIONS: - return makeNode(type, sizeof(SIndexOptions)); + return sizeof(SIndexOptions); case QUERY_NODE_EXPLAIN_OPTIONS: - return makeNode(type, sizeof(SExplainOptions)); + return sizeof(SExplainOptions); case QUERY_NODE_STREAM_OPTIONS: - return makeNode(type, sizeof(SStreamOptions)); + return sizeof(SStreamOptions); case QUERY_NODE_TOPIC_OPTIONS: - return makeNode(type, sizeof(STopicOptions)); + return sizeof(STopicOptions); case QUERY_NODE_SET_OPERATOR: - return makeNode(type, sizeof(SSetOperator)); + return sizeof(SSetOperator); case QUERY_NODE_SELECT_STMT: - return makeNode(type, sizeof(SSelectStmt)); + return sizeof(SSelectStmt); case QUERY_NODE_VNODE_MODIF_STMT: - return makeNode(type, sizeof(SVnodeModifOpStmt)); + return sizeof(SVnodeModifOpStmt); case QUERY_NODE_CREATE_DATABASE_STMT: - return makeNode(type, sizeof(SCreateDatabaseStmt)); + return sizeof(SCreateDatabaseStmt); case QUERY_NODE_DROP_DATABASE_STMT: - return makeNode(type, sizeof(SDropDatabaseStmt)); + return sizeof(SDropDatabaseStmt); case QUERY_NODE_ALTER_DATABASE_STMT: - return makeNode(type, sizeof(SAlterDatabaseStmt)); + return sizeof(SAlterDatabaseStmt); case QUERY_NODE_CREATE_TABLE_STMT: - return makeNode(type, sizeof(SCreateTableStmt)); + return sizeof(SCreateTableStmt); case QUERY_NODE_CREATE_SUBTABLE_CLAUSE: - return makeNode(type, sizeof(SCreateSubTableClause)); + return sizeof(SCreateSubTableClause); case QUERY_NODE_CREATE_MULTI_TABLE_STMT: - return makeNode(type, sizeof(SCreateMultiTableStmt)); + return sizeof(SCreateMultiTableStmt); case QUERY_NODE_DROP_TABLE_CLAUSE: - return makeNode(type, sizeof(SDropTableClause)); + return sizeof(SDropTableClause); case QUERY_NODE_DROP_TABLE_STMT: - return makeNode(type, sizeof(SDropTableStmt)); + return sizeof(SDropTableStmt); case QUERY_NODE_DROP_SUPER_TABLE_STMT: - return makeNode(type, sizeof(SDropSuperTableStmt)); + return sizeof(SDropSuperTableStmt); case QUERY_NODE_ALTER_TABLE_STMT: - return makeNode(type, sizeof(SAlterTableStmt)); + return sizeof(SAlterTableStmt); case QUERY_NODE_CREATE_USER_STMT: - return makeNode(type, sizeof(SCreateUserStmt)); + return sizeof(SCreateUserStmt); case QUERY_NODE_ALTER_USER_STMT: - return makeNode(type, sizeof(SAlterUserStmt)); + return sizeof(SAlterUserStmt); case QUERY_NODE_DROP_USER_STMT: - return makeNode(type, sizeof(SDropUserStmt)); + return sizeof(SDropUserStmt); case QUERY_NODE_USE_DATABASE_STMT: - return makeNode(type, sizeof(SUseDatabaseStmt)); + return sizeof(SUseDatabaseStmt); case QUERY_NODE_CREATE_DNODE_STMT: - return makeNode(type, sizeof(SCreateDnodeStmt)); + return sizeof(SCreateDnodeStmt); case QUERY_NODE_DROP_DNODE_STMT: - return makeNode(type, sizeof(SDropDnodeStmt)); + return sizeof(SDropDnodeStmt); case QUERY_NODE_ALTER_DNODE_STMT: - return makeNode(type, sizeof(SAlterDnodeStmt)); + return sizeof(SAlterDnodeStmt); case QUERY_NODE_CREATE_INDEX_STMT: - return makeNode(type, sizeof(SCreateIndexStmt)); + return sizeof(SCreateIndexStmt); case QUERY_NODE_DROP_INDEX_STMT: - return makeNode(type, sizeof(SDropIndexStmt)); + return sizeof(SDropIndexStmt); case QUERY_NODE_CREATE_QNODE_STMT: case QUERY_NODE_CREATE_BNODE_STMT: case QUERY_NODE_CREATE_SNODE_STMT: case QUERY_NODE_CREATE_MNODE_STMT: - return makeNode(type, sizeof(SCreateComponentNodeStmt)); + return sizeof(SCreateComponentNodeStmt); case QUERY_NODE_DROP_QNODE_STMT: case QUERY_NODE_DROP_BNODE_STMT: case QUERY_NODE_DROP_SNODE_STMT: case QUERY_NODE_DROP_MNODE_STMT: - return makeNode(type, sizeof(SDropComponentNodeStmt)); + return sizeof(SDropComponentNodeStmt); case QUERY_NODE_CREATE_TOPIC_STMT: - return makeNode(type, sizeof(SCreateTopicStmt)); + return sizeof(SCreateTopicStmt); case QUERY_NODE_DROP_TOPIC_STMT: - return makeNode(type, sizeof(SDropTopicStmt)); + return sizeof(SDropTopicStmt); case QUERY_NODE_EXPLAIN_STMT: - return makeNode(type, sizeof(SExplainStmt)); + return sizeof(SExplainStmt); case QUERY_NODE_DESCRIBE_STMT: - return makeNode(type, sizeof(SDescribeStmt)); + return sizeof(SDescribeStmt); case QUERY_NODE_RESET_QUERY_CACHE_STMT: - return makeNode(type, sizeof(SNode)); + return sizeof(SNode); case QUERY_NODE_COMPACT_STMT: break; case QUERY_NODE_CREATE_FUNCTION_STMT: - return makeNode(type, sizeof(SCreateFunctionStmt)); + return sizeof(SCreateFunctionStmt); case QUERY_NODE_DROP_FUNCTION_STMT: - return makeNode(type, sizeof(SDropFunctionStmt)); + return sizeof(SDropFunctionStmt); case QUERY_NODE_CREATE_STREAM_STMT: - return makeNode(type, sizeof(SCreateStreamStmt)); + return sizeof(SCreateStreamStmt); case QUERY_NODE_DROP_STREAM_STMT: - return makeNode(type, sizeof(SDropStreamStmt)); + return sizeof(SDropStreamStmt); case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_SPLIT_VGROUP_STMT: case QUERY_NODE_SYNCDB_STMT: break; case QUERY_NODE_GRANT_STMT: - return makeNode(type, sizeof(SGrantStmt)); + return sizeof(SGrantStmt); case QUERY_NODE_REVOKE_STMT: - return makeNode(type, sizeof(SRevokeStmt)); + return sizeof(SRevokeStmt); case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: @@ -201,80 +192,89 @@ SNodeptr nodesMakeNode(ENodeType type) { case QUERY_NODE_SHOW_CREATE_TABLE_STMT: case QUERY_NODE_SHOW_CREATE_STABLE_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT: - return makeNode(type, sizeof(SShowStmt)); + return sizeof(SShowStmt); case QUERY_NODE_KILL_CONNECTION_STMT: case QUERY_NODE_KILL_QUERY_STMT: case QUERY_NODE_KILL_TRANSACTION_STMT: - return makeNode(type, sizeof(SKillStmt)); + return sizeof(SKillStmt); case QUERY_NODE_LOGIC_PLAN_SCAN: - return makeNode(type, sizeof(SScanLogicNode)); + return sizeof(SScanLogicNode); case QUERY_NODE_LOGIC_PLAN_JOIN: - return makeNode(type, sizeof(SJoinLogicNode)); + return sizeof(SJoinLogicNode); case QUERY_NODE_LOGIC_PLAN_AGG: - return makeNode(type, sizeof(SAggLogicNode)); + return sizeof(SAggLogicNode); case QUERY_NODE_LOGIC_PLAN_PROJECT: - return makeNode(type, sizeof(SProjectLogicNode)); + return sizeof(SProjectLogicNode); case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF: - return makeNode(type, sizeof(SVnodeModifLogicNode)); + return sizeof(SVnodeModifLogicNode); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: - return makeNode(type, sizeof(SExchangeLogicNode)); + return sizeof(SExchangeLogicNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: - return makeNode(type, sizeof(SWindowLogicNode)); + return sizeof(SWindowLogicNode); case QUERY_NODE_LOGIC_PLAN_FILL: - return makeNode(type, sizeof(SFillLogicNode)); + return sizeof(SFillLogicNode); case QUERY_NODE_LOGIC_PLAN_SORT: - return makeNode(type, sizeof(SSortLogicNode)); + return sizeof(SSortLogicNode); case QUERY_NODE_LOGIC_PLAN_PARTITION: - return makeNode(type, sizeof(SPartitionLogicNode)); + return sizeof(SPartitionLogicNode); case QUERY_NODE_LOGIC_SUBPLAN: - return makeNode(type, sizeof(SLogicSubplan)); + return sizeof(SLogicSubplan); case QUERY_NODE_LOGIC_PLAN: - return makeNode(type, sizeof(SQueryLogicPlan)); + return sizeof(SQueryLogicPlan); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: - return makeNode(type, sizeof(STagScanPhysiNode)); + return sizeof(STagScanPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: - return makeNode(type, sizeof(STableScanPhysiNode)); + return sizeof(STableScanPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: - return makeNode(type, sizeof(STableSeqScanPhysiNode)); + return sizeof(STableSeqScanPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - return makeNode(type, sizeof(SStreamScanPhysiNode)); + return sizeof(SStreamScanPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: - return makeNode(type, sizeof(SSystemTableScanPhysiNode)); + return sizeof(SSystemTableScanPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: - return makeNode(type, sizeof(SProjectPhysiNode)); + return sizeof(SProjectPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_JOIN: - return makeNode(type, sizeof(SJoinPhysiNode)); + return sizeof(SJoinPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_AGG: - return makeNode(type, sizeof(SAggPhysiNode)); + return sizeof(SAggPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: - return makeNode(type, sizeof(SExchangePhysiNode)); + return sizeof(SExchangePhysiNode); case QUERY_NODE_PHYSICAL_PLAN_SORT: - return makeNode(type, sizeof(SSortPhysiNode)); + return sizeof(SSortPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: - return makeNode(type, sizeof(SIntervalPhysiNode)); + return sizeof(SIntervalPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: - return makeNode(type, sizeof(SStreamIntervalPhysiNode)); + return sizeof(SStreamIntervalPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_FILL: - return makeNode(type, sizeof(SFillPhysiNode)); + return sizeof(SFillPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: - return makeNode(type, sizeof(SSessionWinodwPhysiNode)); + return sizeof(SSessionWinodwPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: - return makeNode(type, sizeof(SStateWinodwPhysiNode)); + return sizeof(SStateWinodwPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: - return makeNode(type, sizeof(SPartitionPhysiNode)); + return sizeof(SPartitionPhysiNode); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: - return makeNode(type, sizeof(SDataDispatcherNode)); + return sizeof(SDataDispatcherNode); case QUERY_NODE_PHYSICAL_PLAN_INSERT: - return makeNode(type, sizeof(SDataInserterNode)); + return sizeof(SDataInserterNode); case QUERY_NODE_PHYSICAL_SUBPLAN: - return makeNode(type, sizeof(SSubplan)); + return sizeof(SSubplan); case QUERY_NODE_PHYSICAL_PLAN: - return makeNode(type, sizeof(SQueryPlan)); + return sizeof(SQueryPlan); default: break; } nodesError("nodesMakeNode unknown node = %s", nodesNodeName(type)); - return NULL; + return 0; +} + +SNodeptr nodesMakeNode(ENodeType type) { + SNode* p = taosMemoryCalloc(1, nodesNodeSize(type)); + if (NULL == p) { + return NULL; + } + setNodeType(p, type); + return p; } static void destroyVgDataBlockArray(SArray* pArray) { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 7dc2978ec7..5de6968c51 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -559,6 +559,8 @@ SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* p int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n); strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len); ((SExprNode*)pNode)->aliasName[len] = '\0'; + strncpy(((SExprNode*)pNode)->userAlias, pAlias->z, len); + ((SExprNode*)pNode)->userAlias[len] = '\0'; return pNode; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index df7d10969d..07a0d357ba 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3446,7 +3446,11 @@ static int32_t extractQueryResultSchema(const SNodeList* pProjections, int32_t* (*pSchema)[index].type = pExpr->resType.type; (*pSchema)[index].bytes = pExpr->resType.bytes; (*pSchema)[index].colId = index + 1; - strcpy((*pSchema)[index].name, pExpr->aliasName); + if ('\0' != pExpr->userAlias[0]) { + strcpy((*pSchema)[index].name, pExpr->userAlias); + } else { + strcpy((*pSchema)[index].name, pExpr->aliasName); + } index += 1; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 82d1aabd2a..688e20063a 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -41,7 +41,7 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { } if (TSDB_CODE_SUCCESS == code && (*pQuery)->placeholderNum > 0) { - // TSWAP((*pQuery)->pContainPlaceholderRoot, (*pQuery)->pRoot); + TSWAP((*pQuery)->pPrepareRoot, (*pQuery)->pRoot); return TSDB_CODE_SUCCESS; } @@ -137,6 +137,36 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { return TSDB_CODE_SUCCESS; } +static EDealRes rewriteQueryExprAliasImpl(SNode* pNode, void* pContext) { + if (nodesIsExprNode(pNode) && QUERY_NODE_COLUMN != nodeType(pNode) && '\0' == ((SExprNode*)pNode)->userAlias[0]) { + strcpy(((SExprNode*)pNode)->userAlias, ((SExprNode*)pNode)->aliasName); + sprintf(((SExprNode*)pNode)->aliasName, "#%d", *(int32_t*)pContext); + ++(*(int32_t*)pContext); + } + return DEAL_RES_CONTINUE; +} + +static void rewriteQueryExprAlias(SNode* pRoot, int32_t* pNo) { + switch (nodeType(pRoot)) { + case QUERY_NODE_SELECT_STMT: + nodesWalkSelectStmt((SSelectStmt*)pRoot, SQL_CLAUSE_FROM, rewriteQueryExprAliasImpl, pNo); + break; + case QUERY_NODE_SET_OPERATOR: { + SSetOperator* pSetOper = (SSetOperator*)pRoot; + rewriteQueryExprAlias(pSetOper->pLeft, pNo); + rewriteQueryExprAlias(pSetOper->pRight, pNo); + break; + } + default: + break; + } +} + +static void rewriteExprAlias(SNode* pRoot) { + int32_t no = 1; + rewriteQueryExprAlias(pRoot, &no); +} + int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t code = TSDB_CODE_SUCCESS; if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { @@ -169,6 +199,15 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx code = setValueByBindParam((SValueNode*)taosArrayGetP(pQuery->pPlaceholderValues, colIdx), pParams); } + if (TSDB_CODE_SUCCESS == code && (colIdx < 0 || colIdx + 1 == pQuery->placeholderNum)) { + pQuery->pRoot = nodesCloneNode(pQuery->pPrepareRoot); + if (NULL == pQuery->pRoot) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + rewriteExprAlias(pQuery->pRoot); + } return code; } diff --git a/source/libs/planner/test/planStmtTest.cpp b/source/libs/planner/test/planStmtTest.cpp index 6d6eaaf190..39290b5b2f 100644 --- a/source/libs/planner/test/planStmtTest.cpp +++ b/source/libs/planner/test/planStmtTest.cpp @@ -20,7 +20,96 @@ using namespace std; class PlanStmtTest : public PlannerTestBase { public: - void buildParam(TAOS_MULTI_BIND* pBindParams, int32_t index, void* pVal, int32_t type, int32_t bytes = 0) { + TAOS_MULTI_BIND* createBindParams(int32_t nParams) { + return (TAOS_MULTI_BIND*)taosMemoryCalloc(nParams, sizeof(TAOS_MULTI_BIND)); + } + + TAOS_MULTI_BIND* buildIntegerParam(TAOS_MULTI_BIND* pBindParams, int32_t index, int64_t val, int32_t type) { + TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0); + + switch (type) { + case TSDB_DATA_TYPE_BOOL: + *((bool*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_TINYINT: + *((int8_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_SMALLINT: + *((int16_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_INT: + *((int32_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_BIGINT: + *((int64_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_TIMESTAMP: + *((int64_t*)pBindParam->buffer) = val; + break; + default: + break; + } + + return pBindParam; + } + + TAOS_MULTI_BIND* buildUIntegerParam(TAOS_MULTI_BIND* pBindParams, int32_t index, uint64_t val, int32_t type) { + TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0); + + switch (type) { + case TSDB_DATA_TYPE_UTINYINT: + *((uint8_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_USMALLINT: + *((uint16_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_UINT: + *((uint32_t*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_UBIGINT: + *((uint64_t*)pBindParam->buffer) = val; + break; + default: + break; + } + return pBindParam; + } + + TAOS_MULTI_BIND* buildDoubleParam(TAOS_MULTI_BIND* pBindParams, int32_t index, double val, int32_t type) { + TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0); + + switch (type) { + case TSDB_DATA_TYPE_FLOAT: + *((float*)pBindParam->buffer) = val; + break; + case TSDB_DATA_TYPE_DOUBLE: + *((double*)pBindParam->buffer) = val; + break; + default: + break; + } + return pBindParam; + } + + TAOS_MULTI_BIND* buildStringParam(TAOS_MULTI_BIND* pBindParams, int32_t index, const char* pVal, int32_t type, + int32_t bytes) { + TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, bytes); + + switch (type) { + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + strncpy((char*)pBindParam->buffer, pVal, bytes); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_NCHAR: + default: + break; + } + return pBindParam; + } + + private: + TAOS_MULTI_BIND* initParam(TAOS_MULTI_BIND* pBindParams, int32_t index, int32_t type, int32_t bytes) { TAOS_MULTI_BIND* pBindParam = pBindParams + index; pBindParam->buffer_type = type; pBindParam->num = 1; @@ -30,39 +119,48 @@ class PlanStmtTest : public PlannerTestBase { pBindParam->is_null = (char*)taosMemoryCalloc(1, sizeof(char)); *(pBindParam->length) = bytes > 0 ? bytes : tDataTypes[type].bytes; *(pBindParam->is_null) = 0; - - switch (type) { - case TSDB_DATA_TYPE_BOOL: - *((bool*)pBindParam->buffer) = *(bool*)pVal; - break; - case TSDB_DATA_TYPE_TINYINT: - *((int8_t*)pBindParam->buffer) = *(int64_t*)pVal; - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: - case TSDB_DATA_TYPE_VARCHAR: - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_JSON: - case TSDB_DATA_TYPE_VARBINARY: - case TSDB_DATA_TYPE_DECIMAL: - case TSDB_DATA_TYPE_BLOB: - case TSDB_DATA_TYPE_MEDIUMBLOB: - default: - break; - } + return pBindParam; } }; -TEST_F(PlanStmtTest, stmt) { +TEST_F(PlanStmtTest, basic) { useDb("root", "test"); prepare("SELECT * FROM t1 WHERE c1 = ?"); + bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0); + exec(); + + { + prepare("SELECT * FROM t1 WHERE c1 = ? AND c2 = ?"); + TAOS_MULTI_BIND* pBindParams = createBindParams(2); + buildIntegerParam(pBindParams, 0, 10, TSDB_DATA_TYPE_INT); + buildStringParam(pBindParams, 1, "abc", TSDB_DATA_TYPE_VARCHAR, strlen("abc")); + bindParams(pBindParams, -1); + exec(); + taosMemoryFreeClear(pBindParams); + } + + { + prepare("SELECT MAX(?), MAX(?) FROM t1"); + TAOS_MULTI_BIND* pBindParams = createBindParams(2); + buildIntegerParam(pBindParams, 0, 10, TSDB_DATA_TYPE_TINYINT); + buildIntegerParam(pBindParams, 1, 20, TSDB_DATA_TYPE_INT); + bindParams(pBindParams, -1); + exec(); + taosMemoryFreeClear(pBindParams); + } } + +TEST_F(PlanStmtTest, multiExec) { + useDb("root", "test"); + + prepare("SELECT * FROM t1 WHERE c1 = ?"); + bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0); + exec(); + bindParams(buildIntegerParam(createBindParams(1), 0, 20, TSDB_DATA_TYPE_INT), 0); + exec(); + bindParams(buildIntegerParam(createBindParams(1), 0, 30, TSDB_DATA_TYPE_INT), 0); + exec(); +} + +TEST_F(PlanStmtTest, allDataType) { useDb("root", "test"); } diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 97ff181d7f..af8ec87158 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -112,8 +112,6 @@ class PlannerTestBaseImpl { reset(); try { doParseSql(sql, &stmtEnv_.pQuery_, true); - - dump(g_dumpModule); } catch (...) { dump(DUMP_MODULE_ALL); throw; @@ -123,6 +121,15 @@ class PlannerTestBaseImpl { void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) { try { doBindParams(stmtEnv_.pQuery_, pParams, colIdx); + } catch (...) { + dump(DUMP_MODULE_ALL); + throw; + } + } + + void exec() { + try { + doParseBoundSql(stmtEnv_.pQuery_); SPlanContext cxt = {0}; setPlanContext(stmtEnv_.pQuery_, &cxt); @@ -148,17 +155,6 @@ class PlannerTestBaseImpl { } } - void exec() { - try { - doParseBoundSql(stmtEnv_.pQuery_); - - dump(g_dumpModule); - } catch (...) { - dump(DUMP_MODULE_ALL); - throw; - } - } - private: struct caseEnv { string acctId_; @@ -208,11 +204,17 @@ class PlannerTestBaseImpl { cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl; if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { - cout << "syntax tree : " << endl; - cout << res_.ast_ << endl; - - cout << "bound syntax tree : " << endl; - cout << res_.boundAst_ << endl; + if (res_.prepareAst_.empty()) { + cout << "syntax tree : " << endl; + cout << res_.ast_ << endl; + } else { + cout << "prepare syntax tree : " << endl; + cout << res_.prepareAst_ << endl; + cout << "bound syntax tree : " << endl; + cout << res_.boundAst_ << endl; + cout << "syntax tree : " << endl; + cout << res_.ast_ << endl; + } } if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) { @@ -262,7 +264,7 @@ class PlannerTestBaseImpl { DO_WITH_THROW(qParseSql, &cxt, pQuery); if (prepare) { - res_.prepareAst_ = toString((*pQuery)->pRoot); + res_.prepareAst_ = toString((*pQuery)->pPrepareRoot); } else { res_.ast_ = toString((*pQuery)->pRoot); }