diff --git a/2.0/src/query/inc/qExecutor.h b/2.0/src/query/inc/qExecutor.h index 970b826303..967101fb41 100644 --- a/2.0/src/query/inc/qExecutor.h +++ b/2.0/src/query/inc/qExecutor.h @@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo { } SMultiwayMergeInfo; // todo support the disk-based sort -typedef struct SOrderOperatorInfo { +typedef struct SSortOperatorInfo { int32_t colIndex; int32_t order; SSDataBlock *pDataBlock; -} SOrderOperatorInfo; +} SSortOperatorInfo; void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); @@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); +SOperatorInfo* createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/2.0/src/query/src/qExecutor.c b/2.0/src/query/src/qExecutor.c index 490584c75a..1e879c3912 100644 --- a/2.0/src/query/src/qExecutor.c +++ b/2.0/src/query/src/qExecutor.c @@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_Order: { - pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); + pRuntimeEnv->proot = createSortOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); break; } @@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return NULL; } - SOrderOperatorInfo* pInfo = pOperator->info; + SSortOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = NULL; while(1) { @@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { - SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); +SOperatorInfo *createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { + SSortOperatorInfo* pInfo = calloc(1, sizeof(SSortOperatorInfo)); { SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); @@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); } diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h deleted file mode 100644 index 8d1c9835e6..0000000000 --- a/include/client/consumer/consumer.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_CONSUMER_H_ -#define _TD_CONSUMER_H_ - -#include "tlist.h" -#include "tarray.h" -#include "hash.h" - -#ifdef __cplusplus -extern "C" { -#endif - - //consumer handle - struct tmq_consumer_t; - typedef struct tmq_consumer_t tmq_consumer_t; - - //consumer config - struct tmq_consumer_config_t; - typedef struct tmq_consumer_config_t tmq_consumer_config_t; - - //response err - struct tmq_resp_err_t; - typedef struct tmq_resp_err_t tmq_resp_err_t; - - struct tmq_message_t; - typedef struct tmq_message_t tmq_message_t; - - struct tmq_col_batch_t; - typedef struct tmq_col_batch_t tmq_col_batch_t; - - //get content of message - tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); - tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); - - //consumer config - int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); - - //consumer initialization - //resouces are supposed to be free by users by calling tmq_consumer_destroy - tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); - - //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); - tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); - - //consume - //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); - - //destroy message and free memory - void tmq_message_destroy(tmq_message_t*); - - //close consumer - int32_t tmq_consumer_close(tmq_consumer_t*); - - //destroy consumer - void tmq_consumer_destroy(tmq_message_t*); - - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_CONSUMER_H_*/ diff --git a/include/client/stream/stream.h b/include/client/stream/stream.h deleted file mode 100644 index 79b247c61c..0000000000 --- a/include/client/stream/stream.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_STREAM_H_ -#define _TD_STREAM_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_STREAM_H_*/ \ No newline at end of file diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 2d5fba7368..4a47acfa50 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -29,8 +29,9 @@ typedef struct SCorEpSet { } SCorEpSet; typedef struct SBlockOrderInfo { + bool nullFirst; int32_t order; - int32_t colIndex; + int32_t slotId; SColumnInfoData* pColData; } SBlockOrderInfo; @@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); -int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 192b4cebbc..50ff735f86 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; + typedef enum { QUERY_QUEUE, FETCH_QUEUE, @@ -38,10 +39,11 @@ typedef enum { typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); -typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); +typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); -typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); +typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); +typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); +typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef struct { SMgmtWrapper* pWrapper; @@ -51,14 +53,16 @@ typedef struct { SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; + ReleaseHandleFp releaseHandleFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); -int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); +int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); -void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); +void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); +void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); #ifdef __cplusplus } diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 4ad7e2dfc2..1d78702bc2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); */ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); +/** + * Set multiple input data blocks for the stream scan. + * @param tinfo + * @param pBlocks + * @param numOfInputBlock + * @param type + * @return + */ +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type); + /** * Update the table id list, add or remove. * @@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, */ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); -/** - * Retrieve the produced results information, if current query is not paused or completed, - * this function will be blocked to wait for the query execution completed or paused, - * in which case enough results have been produced already. - * - * @param tinfo - * @return - */ -int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext); - /** * kill the ongoing query and free the query handle and corresponding resources automatically * @param tinfo qhandle @@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t */ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); -//================================================================================================ -// query handle management -/** - * Query handle mgmt object - * @param vgId - * @return - */ -void* qOpenTaskMgmt(int32_t vgId); - -/** - * broadcast the close information and wait for all query stop. - * @param pExecutor - */ -void qTaskMgmtNotifyClosing(void* pExecutor); - -/** - * Re-open the query handle management module when opening the vnode again. - * @param pExecutor - */ -void qQueryMgmtReOpen(void* pExecutor); - -/** - * Close query mgmt and clean up resources. - * @param pExecutor - */ -void qCleanupTaskMgmt(void* pExecutor); - -/** - * Add the query into the query mgmt object - * @param pMgmt - * @param qId - * @param qInfo - * @return - */ -void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo); - -/** - * acquire the query handle according to the key from query mgmt object. - * @param pMgmt - * @param key - * @return - */ -void** qAcquireTask(void* pMgmt, uint64_t key); - /** * release the query handle and decrease the reference count in cache * @param pMgmt @@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key); */ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); -/** - * De-register the query handle from the management module and free it immediately. - * @param pMgmt - * @param pQInfo - * @return - */ -void** qDeregisterQInfo(void* pMgmt, void* pQInfo); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02fe591a09..eadc901389 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -30,6 +30,11 @@ enum { STREAM_TASK_STATUS__STOP, }; +enum { + STREAM_CREATED_BY__USER = 1, + STREAM_CREATED_BY__SMA, +}; + #if 0 // pipe -> fetch/pipe queue // merge -> merge queue diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 54813a77a3..157e0cb721 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -107,20 +107,20 @@ void rpcClose(void *); void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +// Because taosd supports multi-process mode +// These functions should not be used on the server side +// Please use tmsg functions, which are defined in tmsgcb.h +void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); +void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock + +// These functions will not be called in the child process void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); -void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -// just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); // -void rpcRefHandle(void *handle, int8_t type); -void rpcUnrefHandle(void *handle, int8_t type); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 052a6619c8..be6d58615e 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc); -int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); -int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); #ifdef __cplusplus diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d2a5f1ab74..7a958136e7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { typedef struct SSDataBlockSortHelper { SArray* orderInfo; // SArray SSDataBlock* pDataBlock; - bool nullFirst; } SSDataBlockSortHelper; int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { @@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { } if (rightNull) { - return pHelper->nullFirst ? 1 : -1; + return pOrder->nullFirst ? 1 : -1; } if (leftNull) { - return pHelper->nullFirst ? -1 : 1; + return pOrder->nullFirst ? -1 : 1; } } @@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) { } } -int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { ASSERT(pDataBlock != NULL && pOrderInfo != NULL); if (pDataBlock->info.rows <= 1) { return TSDB_CODE_SUCCESS; @@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); if (pColInfoData->hasNull) { sortColumnHasNull = true; } @@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs int64_t p0 = taosGetTimestampUs(); - SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; + SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); - pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); + pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); } taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); @@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* for (int32_t i = 0; i < numOfCols; ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId); pInfo->pColData = pColInfo; sortValLengthPerRow += pColInfo->info.bytes; } @@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // Allocate the additional buffer. int64_t p0 = taosGetTimestampUs(); - SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; + SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; uint32_t rows = pDataBlock->info.rows; SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 328a0c2b0c..2b68aab95a 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); } -int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { +int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } @@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); } -void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } +void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); +} + +void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { + (*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); } \ No newline at end of file diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index ccd800d3f4..d60261dfca 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) { printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); - SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; + SBlockOrderInfo order = {.nullFirst = true, .order = TSDB_ORDER_ASC, .slotId = 0}; taosArrayPush(pOrderInfo, &order); - blockDataSort(b, pOrderInfo, true); + blockDataSort(b, pOrderInfo); blockDataDestroy(b); taosArrayDestroy(pOrderInfo); diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 6968ae4609..3cdde09532 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc void dndSendMonitorReport(SDnode *pDnode); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); +void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index d10835b67f..0f0cc78a1d 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 797e2535d6..baf25b73d6 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { return 0; } -static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { +static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) { if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; @@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { return 0; } -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { if (pWrapper->procType == PROC_CHILD) { } else { SDnode *pDnode = pWrapper->pDnode; @@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { } } -void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) { SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); if (pDnodeWrapper != NULL) { @@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { } } -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType == PROC_CHILD) { int32_t code = -1; do { diff --git a/source/dnode/mgmt/dnode/inc/dm.h b/source/dnode/mgmt/dnode/inc/dm.h index 6c18d7969c..3984e6dbd4 100644 --- a/source/dnode/mgmt/dnode/inc/dm.h +++ b/source/dnode/mgmt/dnode/inc/dm.h @@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 53049f7e78..b729888a72 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd taosRUnLockLatch(&pMgmt->latch); } -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) { SDnode *pDnode = pMgmt->pDnode; SEpSet epSet = {0}; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index caf5172596..6976d83abd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -731,10 +731,10 @@ typedef struct { int32_t vgNum; SRWLatch lock; int8_t status; - int8_t sourceType; - int8_t sinkType; // int32_t sqlLen; - int32_t sinkVgId; // 0 for automatic + int8_t createdBy; // STREAM_CREATED_BY__USER or SMA + int32_t fixedSinkVgId; // 0 for shuffle + int64_t smaId; // 0 for unused char* sql; char* logicalPlan; char* physicalPlan; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 416061bf34..42951beca2 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e7cdd34a7e..b5d22cb7a5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 1b3564924a..6374b4cad2 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; + /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/ if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; // TODO encode tasks if (pObj->tasks) { @@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; + /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/ if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; pObj->tasks = NULL; int32_t sz; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 566bd1d282..4562d9e5d3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { +int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); @@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, pTask->execType = TASK_EXEC__NONE; // sink - if (smaId != -1) { + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { pTask->sinkType = TASK_SINK__SMA; - pTask->smaSink.smaId = smaId; + pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; } @@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, return 0; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { +int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { + ASSERT(pStream->fixedSinkVgId != 0); + SArray* tasks = taosArrayGetP(pStream->tasks, 0); + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + taosArrayPush(tasks, &pTask); + + pTask->nodeId = pStream->fixedSinkVgId; + SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); + if (pVgroup == NULL) { + return -1; + } + pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + // source + pTask->sourceType = TASK_SOURCE__MERGE; + + // exec + pTask->execType = TASK_EXEC__NONE; + + // sink + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = pStream->smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + } + // + // dispatch + pTask->dispatchType = TASK_DISPATCH__NONE; + + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + + return 0; +} + +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink hasExtraSink = true; - mndAddSinkToStream(pMnode, pTrans, pStream, smaId); + if (pStream->fixedSinkVgId == 0) { + mndAddShuffledSinkToStream(pMnode, pTrans, pStream); + } else { + mndAddFixedSinkToStream(pMnode, pTrans, pStream); + } } for (int32_t level = 0; level < totLevel; level++) { @@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; if (!hasExtraSink) { - if (smaId != -1) { +#if 1 + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { pTask->sinkType = TASK_SINK__SMA; - pTask->smaSink.smaId = smaId; + pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; } +#endif } } else { pTask->sinkType = TASK_SINK__NONE; @@ -286,35 +330,47 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i if (hasExtraSink) { // add dispatcher - pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + if (pStream->fixedSinkVgId == 0) { + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; - SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); - ASSERT(pDb); - if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); + ASSERT(pDb); + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + sdbRelease(pSdb, pDb); + qDestroyQueryPlan(pPlan); + return -1; + } sdbRelease(pSdb, pDb); - qDestroyQueryPlan(pPlan); - return -1; - } - sdbRelease(pSdb, pDb); - // put taskId to useDbRsp - // TODO: optimize - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; - /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/ - break; + // put taskId to useDbRsp + // TODO: optimize + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/ + break; + } } } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } } #endif diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 5c62cfa0f2..94114a96bf 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; + streamObj.createdBy = STREAM_CREATED_BY__SMA; + // TODO + streamObj.fixedSinkVgId = 0; + streamObj.smaId = smaObj.uid; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; @@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bbb2f64282..376a41b0cd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { + if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } @@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; + streamObj.createdBy = STREAM_CREATED_BY__USER; + // TODO + streamObj.fixedSinkVgId = 0; + streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; @@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8c161e4c8b..9169f78866 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -157,6 +157,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { } } +#if 0 int j = 0; for (int32_t i = 0; i < colNumNeed; i++) { col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i); @@ -183,6 +184,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { blockDataEnsureColumnCapacity(&colInfo, numOfRows); taosArrayPush(pArray, &colInfo); } +#endif STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pTschema); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fe48cc21e4..2bbd16fbd0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -430,9 +430,10 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { + SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock int32_t blockType; // current block type - bool blockValid; // Is current data has returned? + int32_t validBlockIndex; // Is current data has returned? SColumnInfo* pCols; // the output column info uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times @@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo { typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; + SGroupResInfo groupResInfo; STimeWindow curWindow; // current time window TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows int32_t start; // start row index bool reptScan; // next round scan + int64_t gap; // session window gap } SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; bool hasVarCol; - SArray *orderInfo; // SArray - bool nullFirst; + SArray* pSortInfo; int32_t numOfSources; SSortHandle *pSortHandle; @@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo { SAggSupporter aggSup; } SSortedMergeOperatorInfo; -typedef struct SOrderOperatorInfo { +typedef struct SSortOperatorInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SSDataBlock *pDataBlock; - bool hasVarCol; // has variable length column, such as binary/varchar/nchar - SArray *orderInfo; - bool nullFirst; + SArray* pSortInfo; SSortHandle *pSortHandle; int32_t bufPageSize; int32_t numOfRowsInRes; @@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo { uint64_t totalSize; // total load bytes from remote uint64_t totalRows; // total number of rows uint64_t totalElapsed; // total elapsed time -} SOrderOperatorInfo; +} SSortOperatorInfo; typedef struct SDistinctDataInfo { int32_t index; @@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index ef7af2b4e3..8971ee33d3 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr); +SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr); /** * diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f0cffafca2..2c6468a13f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -19,8 +19,6 @@ #include "executil.h" #include "executorimpl.h" -//#include "queryLog.h" -#include "tbuffer.h" #include "tcompression.h" #include "tlosertree.h" diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26422fa618..3be496bc2b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,11 +14,12 @@ */ #include "executor.h" +#include "tdatablock.h" #include "executorimpl.h" #include "planner.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info; @@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + if (tqReadHandleSetMsg(pInfo->readerHandle, input[0], 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } } else { - ASSERT(!pInfo->blockValid); + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = input[i]; - SSDataBlock* pDataBlock = input; - pInfo->pRes->info = pDataBlock->info; - taosArrayClear(pInfo->pRes->pDataBlock); - taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock); + SSDataBlock* p = createOneDataBlock(pDataBlock); + p->info = pDataBlock->info; - // set current block valid. - pInfo->blockValid = true; + taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); + taosArrayPush(pInfo->pBlockLists, &p); + } } return TSDB_CODE_SUCCESS; @@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t } int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { + qSetMultiStreamInput(tinfo, (void**) &input, 1, type); +} + +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } - if (input == NULL) { + if (pBlocks == NULL || numOfBlocks == 0) { return TSDB_CODE_SUCCESS; } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 081c0aa671..b66c50cad1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -69,6 +69,7 @@ typedef enum SResultTsInterpType { typedef struct SColMatchInfo { int32_t colId; int32_t targetSlotId; + bool output; } SColMatchInfo; #if 0 @@ -322,26 +323,6 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEn } //setup the output buffer for each operator -SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { - const static int32_t minSize = 8; - - SSDataBlock *res = taosMemoryCalloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = numOfOutput; - res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData idata = {{0}}; - idata.info.type = pExpr[i].base.resSchema.type; - idata.info.bytes = pExpr[i].base.resSchema.bytes; - idata.info.colId = pExpr[i].base.resSchema.colId; - - int32_t size = TMAX(idata.info.bytes * numOfRows, minSize); - idata.pData = taosMemoryCalloc(1, size); // at least to hold a pointer on x64 platform - taosArrayPush(res->pDataBlock, &idata); - } - - return res; -} - SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); @@ -350,11 +331,15 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pBlock->info.blockId = pNode->dataBlockId; - pBlock->info.rowSize = pNode->totalRowSize; + pBlock->info.rowSize = pNode->totalRowSize; // todo ?? for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); + if (!pDescNode->output) { + continue; + } + idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; idata.info.scale = pDescNode->dataType.scale; @@ -1813,25 +1798,27 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { } } -static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->current; +// todo handle multiple tables cases. +static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // primary timestamp column - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - SOptrBasicInfo* pBInfo = &pInfo->binfo; + bool masterScan = true; + STimeWindow window = {0}; + int32_t numOfOutput = pOperator->numOfOutput; + int64_t gid = pBlock->info.groupId; - int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; + int64_t gap = pInfo->gap; pInfo->numOfRows = 0; - if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + if (/*IS_REPEAT_SCAN(pRuntimeEnv) && */!pInfo->reptScan) { pInfo->reptScan = true; pInfo->prevTs = INT64_MIN; } TSKEY* tsList = (TSKEY*)pColInfoData->pData; - for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { + for (int32_t j = 0; j < pBlock->info.rows; ++j) { if (pInfo->prevTs == INT64_MIN) { pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1848,17 +1835,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } } else { // start a new session window SResultRow* pResult = NULL; - pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, - &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, - pBInfo->rowCellInfoOffset); + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } -// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, -// pSDataBlock->info.rows, pOperator->numOfOutput); + // pInfo->numOfRows data belong to the current session window + doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1871,15 +1856,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, - &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, - pBInfo->rowCellInfoOffset); + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } -// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, -// pSDataBlock->info.rows, pOperator->numOfOutput); + doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -2883,16 +2866,19 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, return terrno; } - int32_t numOfCols = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - ASSERT(pColMatchInfo->colId == p->info.colId); - - taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); + int32_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; } - return TSDB_CODE_SUCCESS; + ASSERT(pColMatchInfo->colId == p->info.colId); + taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); + } + + return TSDB_CODE_SUCCESS; } int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { @@ -4738,6 +4724,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { #endif } +static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + + pInfo->validBlockIndex = 0; + for(int32_t i = 0; i < total; ++i) { + SSDataBlock* p = taosArrayGet(pInfo->pBlockLists, i); + blockDataDestroy(p); + } + taosArrayClear(pInfo->pBlockLists); +} + static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4750,43 +4747,45 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) } if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - if (pInfo->blockValid) { - pInfo->blockValid = false; // this block can only be used once. - return pInfo->pRes; - } else { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + if (pInfo->validBlockIndex >= total) { + doClearBufferedBlocks(pInfo); return NULL; } + + int32_t current = pInfo->validBlockIndex++; + return taosArrayGet(pInfo->pBlockLists, current); + } else { + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + blockDataCleanup(pInfo->pRes); + + while (tqNextDataBlock(pInfo->readerHandle)) { + pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + terrno = pTaskInfo->code; + return NULL; + } + + if (pBlockInfo->rows == 0) { + return NULL; + } + + pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); + if (pInfo->pRes->pDataBlock == NULL) { + // TODO add log + pTaskInfo->code = terrno; + return NULL; + } + + break; + } + + // record the scan action. + pInfo->numOfExec++; + pInfo->numOfRows += pBlockInfo->rows; + + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } - - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - blockDataCleanup(pInfo->pRes); - - while (tqNextDataBlock(pInfo->readerHandle)) { - pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - terrno = pTaskInfo->code; - return NULL; - } - - if (pBlockInfo->rows == 0) { - return NULL; - } - - pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); - if (pInfo->pRes->pDataBlock == NULL) { - // TODO add log - pTaskInfo->code = terrno; - return NULL; - } - - break; - } - - // record the scan action. - pInfo->numOfExec++; - pInfo->numOfRows += pBlockInfo->rows; - - return (pBlockInfo->rows == 0)? NULL:pInfo->pRes; } int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { @@ -5423,6 +5422,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* return NULL; } + pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); + if (pInfo->pBlockLists == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock; @@ -5840,14 +5846,14 @@ static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; - taosArrayDestroy(pInfo->orderInfo); + taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->groupInfo); if (pInfo->pSortHandle != NULL) { tsortDestroySortHandle(pInfo->pSortHandle); } - blockDataDestroy(pInfo->binfo.pRes); + blockDataDestroy(pInfo->binfo.pRes); cleanupAggSup(&pInfo->aggSup); } @@ -6105,12 +6111,10 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); } - SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, - numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - taosMemoryFreeClear(p); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { @@ -6128,29 +6132,6 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { return doMerge(pOperator); } -static SArray* createBlockOrder(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal) { - SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); - - size_t numOfOrder = taosArrayGetSize(pOrderVal); - for (int32_t j = 0; j < numOfOrder; ++j) { - SBlockOrderInfo orderInfo = {0}; - SOrder* pOrder = taosArrayGet(pOrderVal, j); - orderInfo.order = pOrder->order; - - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = &pExprInfo[i]; - if (pExpr->base.resSchema.colId == pOrder->col.colId) { - orderInfo.colIndex = i; - break; - } - } - - taosArrayPush(pOrderInfo, &orderInfo); - } - - return pOrderInfo; -} - static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) { if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { return 0; @@ -6199,7 +6180,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr return TSDB_CODE_SUCCESS; } -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6228,7 +6209,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t // pRuntimeEnv->pQueryAttr->topBotQuery, false)); pInfo->sortBufSize = 1024 * 16; // 1MB pInfo->bufPageSize = 1024; - pInfo->orderInfo = createBlockOrder(pExprInfo, num, pOrderVal); + pInfo->pSortInfo = pSortInfo; pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); @@ -6268,35 +6249,34 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) { } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOrderOperatorInfo* pInfo = pOperator->info; + SSortOperatorInfo* pInfo = pOperator->info; + bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); } - SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, - numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, + numOfBufPage, pInfo->pDataBlock, "GET_TASKID(pTaskInfo)"); - taosMemoryFreeClear(p); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); - ps->param = pOperator; + ps->param = pOperator->pDownstream[0]; tsortAddSource(pInfo->pSortHandle, ps); - // TODO set error code; int32_t code = tsortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, terrno); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); } -SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { - SOrderOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SOrderOperatorInfo)); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { taosMemoryFreeClear(pInfo); @@ -6305,37 +6285,21 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } - pInfo->sortBufSize = 1024 * 16; // 1MB - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; + pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResBlock; + pInfo->pSortInfo = pSortInfo; - pInfo->orderInfo = createBlockOrder(pExprInfo, numOfCols, pOrderVal); - - for(int32_t i = 0; i < numOfCols; ++i) { - if (IS_VAR_DATA_TYPE(pExprInfo[i].base.resSchema.type)) { - pInfo->hasVarCol = true; - break; - } - } - - if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) { - taosMemoryFreeClear(pOperator); - destroyOrderOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); - - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pOperator->name = "Order"; + pOperator->name = "Sort"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->getNextFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6496,7 +6460,6 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l offset += valueLen; initResultRow(resultRow); - pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; } @@ -6741,7 +6704,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { STableIntervalOperatorInfo* pInfo = pOperator->info; - // int32_t order = pQueryAttr->order.order; + int32_t order = TSDB_ORDER_ASC; // STimeWindow win = pQueryAttr->window; bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -6758,7 +6721,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } @@ -6770,7 +6733,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { +static SSDataBlock* doBuildIntervalResult(SOperatorInfo *pOperator, bool* newgroup) { STableIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -7102,13 +7065,14 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) return NULL; } - SSessionAggOperatorInfo* pWindowInfo = pOperator->info; - SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + SSessionAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { - pOperator->status = OP_EXEC_DONE; + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + return NULL; } return pBInfo->pRes; @@ -7127,19 +7091,20 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); - doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock); + doSessionWindowAggImpl(pOperator, pInfo, pBlock); } // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); -// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo); -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { - pOperator->status = OP_EXEC_DONE; + initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); + + blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); } return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; @@ -7505,10 +7470,10 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - taosArrayDestroy(pInfo->orderInfo); + taosArrayDestroy(pInfo->pSortInfo); } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -7716,7 +7681,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doIntervalAgg; + pOperator->getNextFn = doBuildIntervalResult; pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -7782,25 +7747,27 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper return pOperator; } -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str); + int32_t numOfRows = 4096; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; pInfo->prevTs = INT64_MIN; pInfo->reptScan = false; pOperator->name = "SessionWindowAggOperator"; -// pOperator->operatorType = OP_SessionWindow; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; @@ -8035,7 +8002,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI offset += pExpr[index->colIndex].base.resSchema.bytes; } - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity); +// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity); pOperator->name = "SLimitOperator"; @@ -8328,7 +8295,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pInfo->outputCapacity = 4096; pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); +// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -8535,16 +8502,17 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); -static SArray* extractColMatchInfo(SNodeList* pNodeList); +static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); +static SArray* createSortInfo(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; - size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); + int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols); + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { @@ -8583,15 +8551,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t size = LIST_LENGTH(pPhyNode->pChildren); assert(size == 1); - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); - } + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { size_t size = LIST_LENGTH(pPhyNode->pChildren); assert(size == 1); @@ -8623,16 +8589,46 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + //todo: set the correct primary timestamp key column int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset}; + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } - } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; + + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SArray* info = createSortInfo(pSortPhyNode->pSortKeys); + return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + } else { + ASSERT(0); + }/*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); assert(size == 1); @@ -8725,7 +8721,39 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } -SArray* extractColMatchInfo(SNodeList* pNodeList) { +SArray* createSortInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return pList; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SOrderByExprNode* pSortKey = (SOrderByExprNode*) pNode->pExpr; + SBlockOrderInfo bi = {0}; + bi.order = (pSortKey->order == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST); + + SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; + bi.slotId = pColNode->slotId; +// pColNode->order; +// SColumn c = {0}; +// c.slotId = pColNode->slotId; +// c.colId = pColNode->colId; +// c.type = pColNode->node.resType.type; +// c.bytes = pColNode->node.resType.bytes; +// c.precision = pColNode->node.resType.precision; +// c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &bi); + } + + return pList; +} + +SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); if (pList == NULL) { @@ -8738,12 +8766,25 @@ SArray* extractColMatchInfo(SNodeList* pNodeList) { SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; SColMatchInfo c = {0}; - c.colId = pColNode->colId; + c.colId = pColNode->colId; c.targetSlotId = pNode->slotId; - + c.output = true; taosArrayPush(pList, &c); } + *numOfOutputCols = 0; + + int32_t num = LIST_LENGTH(pOutputNodeList->pSlots); + for(int32_t i = 0; i < num; ++i) { + SSlotDescNode* pNode = (SSlotDescNode*) nodesListGetNode(pOutputNodeList->pSlots, i); + SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); +// if (pNode->output) { + (*numOfOutputCols) += 1; +// } else { +// info->output = false; +// } + } + return pList; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 85ba462c9a..7a57d62969 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -23,20 +23,19 @@ #include "tsort.h" #include "tutil.h" -typedef struct STupleHandle { +struct STupleHandle { SSDataBlock* pBlock; int32_t rowIndex; -} STupleHandle; +}; -typedef struct SSortHandle { +struct SSortHandle { int32_t type; int32_t pageSize; int32_t numOfPages; SDiskbasedBuf *pBuf; - SArray *pOrderInfo; - bool nullFirst; + SArray *pSortInfo; SArray *pOrderedSource; _sort_fetch_block_fn_t fetchfp; @@ -60,7 +59,7 @@ typedef struct SSortHandle { bool inMemSort; bool needAdjust; STupleHandle tupleHandle; -} SSortHandle; +}; static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); @@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) { +SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type; pSortHandle->pageSize = pageSize; pSortHandle->numOfPages = numOfPages; - pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); - pSortHandle->pOrderInfo = pOrderInfo; - pSortHandle->nullFirst = nullFirst; - pSortHandle->cmpParam.orderInfo = pOrderInfo; + pSortHandle->pSortInfo = pSortInfo; + pSortHandle->pDataBlock = createOneDataBlock(pBlock); + + pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); + pSortHandle->cmpParam.orderInfo = pSortInfo; - pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols); tsortSetComparFp(pSortHandle, msortComparFn); if (idstr != NULL) { @@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } - SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); @@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { static int32_t doInternalMergeSort(SSortHandle* pHandle) { size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); + if (numOfSources == 0) { + return 0; + } // Calculate the I/O counts to complete the data sort. double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); @@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { if (size > sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. int64_t p = taosGetTimestampUs(); - blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { size_t size = blockDataGetSize(pHandle->pDataBlock); // Perform the in-memory sort and then flush data in the buffer into disk. - blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); // All sorted data can fit in memory, external memory sort is not needed. Return to directly if (size <= sortBufSize) { @@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) { ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); } + if (numOfSources == 0) { + return 0; + } + code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 7fb9b2ad7e..c9b0b62013 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); + SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); @@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); // taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); + SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; SSDataBlock* pRes = NULL; diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 586aed7a67..ecea24135f 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) { for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } - SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index f13fdf7214..b507b5ab02 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -396,6 +396,8 @@ void checkFstCheckIteratorRange1() { fw->Put("c", 3); fw->Put("d", 4); fw->Put("e", 5); + fw->Put("f", 5); + fw->Put("G", 5); delete fw; FstReadMemory* m = new FstReadMemory(1024 * 64); @@ -413,6 +415,33 @@ void checkFstCheckIteratorRange1() { assert(result.size() == 3); taosMemoryFree(ctx); } + { + // prefix search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GT, "e", LT, result); + assert(result.size() == 2); + taosMemoryFree(ctx); + } + { + // prefix search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GT, "e", LE, result); + assert(result.size() == 3); + taosMemoryFree(ctx); + } + { + // prefix search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GE, "e", LE, result); + assert(result.size() == 4); + taosMemoryFree(ctx); + } } void checkFstCheckIteratorRange2() { FstWriter* fw = new FstWriter; @@ -424,7 +453,7 @@ void checkFstCheckIteratorRange2() { std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; fw->Put("ab", 1); - fw->Put("bd", 2); + fw->Put("b", 2); fw->Put("cdd", 3); fw->Put("cde", 3); fw->Put("ddd", 4); @@ -438,16 +467,41 @@ void checkFstCheckIteratorRange2() { return; } { - // prefix search + // range search std::vector result; - - AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); - + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); // [b, e) m->SearchRange(ctx, "b", GE, "ed", LT, result); assert(result.size() == 4); taosMemoryFree(ctx); } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GE, "ed", LE, result); + assert(result.size() == 5); + taosMemoryFree(ctx); + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GT, "ed", LE, result); + assert(result.size() == 4); + taosMemoryFree(ctx); + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GT, "ed", LT, result); + assert(result.size() == 3); + taosMemoryFree(ctx); + } } void fst_get(Fst* fst) { diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 70651840f7..c5189637ac 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -74,7 +74,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg->contLen = tlen; pMsg->code = 0; pMsg->msgType = pTask->dispatchMsgType; - /*pMsg->noResp = 1;*/ + pMsg->noResp = 1; return 0; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c137657a99..dfb5eb35d6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -214,9 +214,11 @@ static void uvHandleReq(SSrvConn* pConn) { // no ref here } - if (pHead->noResp == 0) { - transMsg.handle = pConn; - } + // if pHead->noResp = 1, + // 1. server application should not send resp on handle + // 2. once send out data, cli conn released to conn pool immediately + // 3. not mixed with persist + transMsg.handle = pConn; STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 4dce5f560f..2675c6cdf2 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } } -static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, - ProcFuncType funcType) { +static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, + int32_t rawBodyLen, ProcFuncType funcType) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { } } -int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, - ProcFuncType funcType) { +int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType funcType) { return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); } -int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, - ProcFuncType funcType) { +int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType funcType) { return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); } diff --git a/tests/script/tsim/parser/fourArithmetic-basic.sim b/tests/script/tsim/parser/fourArithmetic-basic.sim new file mode 100644 index 0000000000..2ade01522e --- /dev/null +++ b/tests/script/tsim/parser/fourArithmetic-basic.sim @@ -0,0 +1,110 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$dbNamme = d0 +print =============== create database +sql create database $dbNamme vgroups 1 +sql show databases +print $data00 $data01 $data02 +if $rows != 2 then + return -1 +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct0 using stb tags(1000) +#sql create table ct1 using stb tags(2000) +#sql create table ct3 using stb tags(3000) + +sql show tables +if $rows != 1 then + return -1 +endi + +print =============== insert data + +$tbPrefix = ct +$tbNum = 1 +$rowNum = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $c2 = $x + 10 + $c3 = $x * 10 + $c4 = $x - 10 + + sql insert into $tb values ($tstart , $x , $c2 , $c3 , $c4 ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw + +sql select ts, c2-c1, c3/c1, c4+c1, c1*9, c1%3 from ct0 +print ===> rows: $rows +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 10 then + return -1 +endi + +if $data01 != 10.000000000 then + return -1 +endi +if $data02 != -nan then + return -1 +endi +if $data03 != -10.000000000 then + return -1 +endi + +if $data91 != 10.000000000 then + return -1 +endi +if $data92 != 10.000000000 then + return -1 +endi +if $data93 != 8.000000000 then + return -1 +endi +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/groupby-basic.sim b/tests/script/tsim/parser/groupby-basic.sim new file mode 100644 index 0000000000..cbd05031df --- /dev/null +++ b/tests/script/tsim/parser/groupby-basic.sim @@ -0,0 +1,812 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$dbPrefix = group_db +$tbPrefix = group_tb +$mtPrefix = group_mt +$tbNum = 8 +$rowNum = 100 +$totalNum = $tbNum * $rowNum + +print =============== groupby.sim +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +print ==== create db, stable, ctables, insert data +sql drop database if exists $db -x step1 +step1: +sql create database if not exists $db keep 3650 +sql use $db + +sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + $tg2 = ' . abc + $tg2 = $tg2 . ' + + sql create table $tb using $mt tags( $i , $tg2 ) + + $x = 0 + while $x < $rowNum + $c = $x / 10 + $c = $c * 10 + $c = $x - $c + + $binary = ' . binary + $binary = $binary . $c + $binary = $binary . ' + + $nchar = ' . nchar + $nchar = $nchar . $c + $nchar = $nchar . ' + + sql insert into $tb values ($tstart , $c , $c , $x , $x , $c , $c , $c , $binary , $nchar ) + #print ==== insert into $tb values ($tstart , $c , $c , $x , $x , $c , $c , $c , $binary , $nchar ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw + +sleep 100 + +$i1 = 1 +$i2 = 0 + +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +$dbPrefix = group_db +$tbPrefix = group_tb +$mtPrefix = group_mt + +$tb1 = $tbPrefix . $i1 +$tb2 = $tbPrefix . $i2 +$ts1 = $tb1 . .ts +$ts2 = $tb2 . .ts + +print ===============================groupby_operation +print +print ==== select count(*), c1 from group_tb0 group by c1 +sql select count(*), c1 from group_tb0 group by c1 +print rows: $rows +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +if $row != 20 then + return -1 +endi + +if $data00 != 100 then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +if $data10 != 100 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +sql select first(ts),c1 from group_tb0 where c1<20 group by c1; +if $row != 20 then + return -1 +endi + +if $data00 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +if $data90 != @70-01-01 08:01:40.009@ then + return -1 +endi + +if $data91 != 9 then + return -1 +endi + +sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1; +print $row +if $row != 20 then + return -1 +endi + +if $data00 != $data01 then + return -1 +endi + +if $data10 != $data11 then + return -1 +endi + +if $data20 != $data21 then + return -1 +endi + +if $data90 != $data91 then + return -1 +endi + +if $data02 != 0 then + return -1 +endi + +if $data12 != 1 then + return -1 +endi + +if $data92 != 9 then + return -1 +endi + +sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1; +if $row != 20 then + return -1 +endi + +if $data00 != 0 then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +print $data02 +if $data02 != 0.000000000 then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +print $data04 +if $data04 != 0.00000 then + return -1 +endi + +if $data10 != 100 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +print $data12 +if $data12 != 1.000000000 then + return -1 +endi + +if $data13 != 1 then + return -1 +endi + +if $data14 != 1.00000 then + print expect 1.00000, actual:$data14 + return -1 +endi + +sql_error select sum(c1), ts, c1 from group_tb0 where c1<20 group by c1; +sql_error select first(ts), ts, c2 from group_tb0 where c1 < 20 group by c1; +sql_error select sum(c3), ts, c2 from group_tb0 where c1 < 20 group by c1; +sql_error select sum(c3), first(ts), c2 from group_tb0 where c1 < 20 group by c1; +sql_error select first(c3), ts, c1, c2 from group_tb0 where c1 < 20 group by c1; +sql_error select first(c3), last(c3), ts, c1 from group_tb0 where c1 < 20 group by c1; +sql_error select ts from group_tb0 group by c1; + +#===========================interval=====not support====================== +sql_error select count(*), c1 from group_tb0 where c1<20 interval(1y) group by c1; +#=====tbname must be the first in the group by clause===================== +sql_error select count(*) from group_tb0 where c1 < 20 group by c1, tbname; + +#super table group by normal columns +sql select count(*), c1 from group_mt0 where c1< 20 group by c1; +if $row != 20 then + return -1 +endi + +if $data00 != 800 then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +if $data10 != 800 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +if $data90 != 800 then + return -1 +endi + +if $data91 != 9 then + return -1 +endi + +sql select first(c1), c1, ts from group_mt0 where c1<20 group by c1; +if $row != 20 then + return -1 +endi + +if $data00 != $data01 then + return -1 +endi + +if $data02 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data10 != $data11 then + return -1 +endi + +if $data12 != @70-01-01 08:01:40.001@ then + return -1 +endi + +if $data20 != $data21 then + return -1 +endi + +if $data22 != @70-01-01 08:01:40.002@ then + return -1 +endi + +if $data90 != $data91 then + return -1 +endi + +if $data92 != @70-01-01 08:01:40.009@ then + return -1 +endi + +sql select first(c1), last(ts), first(ts), last(c1),c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by c1; +if $row != 20 then + return -1 +endi + +if $data00 != $data03 then + return -1 +endi + +if $data01 != @70-01-01 08:01:49.900@ then + return -1 +endi + +if $data02 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data07 != 800 then + return -1 +endi + +if $data10 != $data13 then + return -1 +endi + +if $data11 != @70-01-01 08:01:49.901@ then + return -1 +endi + +if $data12 != @70-01-01 08:01:40.001@ then + return -1 +endi + +if $data17 != 800 then + return -1 +endi + +if $data90 != $data93 then + return -1 +endi + +if $data91 != @70-01-01 08:01:49.909@ then + return -1 +endi + +if $data92 != @70-01-01 08:01:40.009@ then + return -1 +endi + +if $data97 != 800 then + return -1 +endi + +if $data95 != 7200 then + return -1 +endi + +if $data94 != 9 then + return -1 +endi + +sql select c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<5 group by c1; +if $row != 5 then + return -1 +endi + +if $data00 != 0 then + return -1 +endi + +if $data11 != 800 then + return -1 +endi + +sql select first(c1), last(ts), first(ts), last(c1),sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by tbname,c1; +if $row != 160 then + return -1 +endi + +print $data00 +if $data00 != 0 then + return -1 +endi + +if $data01 != @70-01-01 08:01:49.900@ then + return -1 +endi + +print $data01 +if $data02 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +if $data04 != 0 then + return -1 +endi + +if $data06 != 100 then + return -1 +endi + +if $data07 != @group_tb0@ then + return -1 +endi + +if $data90 != 9 then + return -1 +endi + +if $data91 != @70-01-01 08:01:49.909@ then + return -1 +endi + +if $data92 != @70-01-01 08:01:40.009@ then + return -1 +endi + +if $data93 != 9 then + return -1 +endi + +if $data94 != 900 then + return -1 +endi + +if $data96 != 100 then + return -1 +endi + +if $data97 != @group_tb0@ then + return -1 +endi + +sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4; +if $rows != 10000 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data02 != @70-01-01 08:01:40.000@ then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4 limit 1; +if $rows != 1 then + return -1 +endi + +sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4 limit 20 offset 9990; +if $rows != 10 then + return -1 +endi + +sql select count(*),first(ts),last(ts),min(c3),max(c3),sum(c3),avg(c3),sum(c4)/count(c4) from group_tb1 group by c4; +if $rows != 10000 then + return -1 +endi + +print ---------------------------------> group by binary|nchar data add cases +sql select count(*) from group_tb1 group by c8; +if $rows != 100 then + return -1 +endi + +sql select count(*),sum(c4), count(c4), sum(c4)/count(c4) from group_tb1 group by c8 +if $rows != 100 then + return -1 +endi + +if $data00 != 100 then + return -1 +endi + +if $data01 != 495000 then + return -1 +endi + +if $data02 != 100 then + return -1 +endi + +if $data03 != 4950.000000000 then + print expect 4950.000000000 , acutal $data03 + return -1 +endi + +if $data10 != 100 then + return -1 +endi + +if $data11 != 495100 then + return -1 +endi + +if $data13 != 4951.000000000 then + return -1 +endi + +print ====================> group by normal column + slimit + soffset +sql select count(*), c8 from group_mt0 group by c8 limit 1 offset 0; +if $rows != 100 then + return -1 +endi + +sql select sum(c2),c8,avg(c2), sum(c2)/count(*) from group_mt0 group by c8 slimit 2 soffset 99 +if $rows != 1 then + return -1 +endi + +if $data00 != 79200.000000000 then + return -1 +endi + +if $data01 != @binary99@ then + return -1 +endi + +if $data02 != 99.000000000 then + return -1 +endi + +if $data03 != 99.000000000 then + return -1 +endi + +print ============>td-1765 +sql select percentile(c4, 49),min(c4),max(c4),avg(c4),stddev(c4) from group_tb0 group by c8; +if $rows != 100 then + return -1 +endi + +if $data00 != 4851.000000000 then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +if $data02 != 9900 then + return -1 +endi + +if $data03 != 4950.000000000 then + return -1 +endi + +if $data04 != 2886.607004772 then + return -1 +endi + +if $data10 != 4852.000000000 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +if $data12 != 9901 then + return -1 +endi + +if $data13 != 4951.000000000 then + return -1 +endi + +if $data14 != 2886.607004772 then + return -1 +endi + +print ================>td-2090 +sql select leastsquares(c2, 1, 1) from group_tb1 group by c8; +if $rows != 100 then + return -1 +endi + +if $data00 != @{slop:0.000000, intercept:0.000000}@ then + return -1 +endi + +if $data10 != @{slop:0.000000, intercept:1.000000}@ then + return -1 +endi + +if $data90 != @{slop:0.000000, intercept:9.000000}@ then + return -1 +endi + +#=========================== group by multi tags ====================== +sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int); +sql create table t1 using st tags(1, 1, 1, 1); +sql create table t2 using st tags(1, 2, 2, 2); +sql insert into t1 values ('2020-03-27 04:11:16.000', 1)('2020-03-27 04:11:17.000', 2) ('2020-03-27 04:11:18.000', 3) ('2020-03-27 04:11:19.000', 4) ; +sql insert into t1 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.000', 2) ('2020-03-27 04:51:18.000', 3) ('2020-03-27 05:10:19.000', 4) ; +sql insert into t2 values ('2020-03-27 04:11:16.000', 1)('2020-03-27 04:11:17.000', 2) ('2020-03-27 04:11:18.000', 3) ('2020-03-27 04:11:19.000', 4) ; +sql insert into t2 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.000', 2) ('2020-03-27 04:51:18.000', 3) ('2020-03-27 05:10:19.000', 4) ; + +print =================>TD-2665 +sql_error create table txx as select avg(c) as t from st; +sql_error create table txx1 as select avg(c) as t from t1; + +sql select stddev(c),stddev(c) from st group by c; +if $rows != 4 then + return -1 +endi + +print =================>TD-2236 +sql select first(ts),last(ts) from t1 group by c; +if $rows != 4 then + return -1 +endi + +if $data00 != @20-03-27 04:11:16.000@ then + return -1 +endi + +if $data01 != @20-03-27 04:21:16.000@ then + return -1 +endi + +if $data10 != @20-03-27 04:11:17.000@ then + return -1 +endi + +if $data11 != @20-03-27 04:31:17.000@ then + return -1 +endi + +if $data20 != @20-03-27 04:11:18.000@ then + return -1 +endi + +if $data21 != @20-03-27 04:51:18.000@ then + return -1 +endi + +if $data30 != @20-03-27 04:11:19.000@ then + return -1 +endi + +if $data31 != @20-03-27 05:10:19.000@ then + return -1 +endi + +print ===============> +sql select stddev(c),c from st where t2=1 or t2=2 group by c; +if $rows != 4 then + return -1 +endi + +if $data00 != 0.000000000 then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data10 != 0.000000000 then + return -1 +endi + +if $data11 != 2 then + return -1 +endi + +if $data20 != 0.000000000 then + return -1 +endi + +if $data21 != 3 then + return -1 +endi + +if $data30 != 0.000000000 then + return -1 +endi + +if $data31 != 4 then + return -1 +endi + +sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c; +sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2; +if $rows != 40 then + return -1 +endi + +if $data01 != 1.000000000 then + return -1 +endi +if $data02 != t1 then + return -1 +endi +if $data03 != 1 then + return -1 +endi +if $data04 != 1 then + return -1 +endi + +if $data11 != 1.000000000 then + return -1 +endi +if $data12 != t1 then + return -1 +endi +if $data13 != 1 then + return -1 +endi +if $data14 != 1 then + return -1 +endi + +sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1; +if $rows != 2 then + return -1 +endi + +if $data11 != 1.000000000 then + return -1 +endi +if $data12 != t2 then + return -1 +endi +if $data13 != 1 then + return -1 +endi +if $data14 != 2 then + return -1 +endi + +sql create table m1 (ts timestamp, k int, f1 int) tags(a int); +sql create table tm0 using m1 tags(0); +sql create table tm1 using m1 tags(1); + +sql insert into tm0 values('2020-1-1 1:1:1', 1, 10); +sql insert into tm0 values('2020-1-1 1:1:2', 1, 20); +sql insert into tm1 values('2020-2-1 1:1:1', 2, 10); +sql insert into tm1 values('2020-2-1 1:1:2', 2, 20); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +sleep 100 +system sh/exec.sh -n dnode1 -s start +sleep 100 + +sql connect +sleep 100 +sql use group_db0; + +print =========================>TD-4894 +sql select count(*),k from m1 group by k; +if $rows != 2 then + return -1 +endi + +if $data00 != 2 then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data10 != 2 then + return -1 +endi + +if $data11 != 2 then + return -1 +endi + +sql_error select count(*) from m1 group by tbname,k,f1; +sql_error select count(*) from m1 group by tbname,k,a; +sql_error select count(*) from m1 group by k, tbname; +sql_error select count(*) from m1 group by k,f1; +sql_error select count(*) from tm0 group by tbname; +sql_error select count(*) from tm0 group by a; +sql_error select count(*) from tm0 group by k,f1; + +sql_error select count(*),f1 from m1 group by tbname,k; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index f188dff1ba..4c4ebc6670 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -41,6 +41,38 @@ sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 ) sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 ) sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 ) +print =============== insert data into child table ct2 (d) +sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 ) +sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 ) +sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 ) +sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 ) +sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 ) +sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 ) +sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 ) + +print =============== insert data into child table ct3 (n) +sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL ) +sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 ) +sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 ) +sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 ) +sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4 ) +sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5 ) +sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6 ) +sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7 ) +sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8 ) + +print =============== insert data into child table ct4 (y) +sql insert into ct4 values ( '2020-10-21 01:01:01.000', 1 ) +sql insert into ct4 values ( '2020-12-31 01:01:01.000', 2 ) +sql insert into ct4 values ( '2021-01-01 01:01:06.000', 3 ) +sql insert into ct4 values ( '2021-05-07 01:01:10.000', 4 ) +sql insert into ct4 values ( '2021-09-30 01:01:16.000', 5 ) +sql insert into ct4 values ( '2022-02-01 01:01:20.000', 6 ) +sql insert into ct4 values ( '2022-10-28 01:01:26.000', 7 ) +sql insert into ct4 values ( '2022-12-01 01:01:30.000', 8 ) +sql insert into ct4 values ( '2022-12-31 01:01:36.000', 9 ) + +print ================ start query ====================== sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) print ===> rows: $rows @@ -102,15 +134,6 @@ if $data80 != 1 then return -1 endi -print =============== insert data into child table ct2 (d) -sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 ) -sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 ) -sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 ) -sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 ) -sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 ) -sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 ) -sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 ) - sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) print ===> rows: $rows @@ -140,45 +163,19 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45 print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55 print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65 print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75 -#if $rows != 8 then -# return -1 -#endi -#if $data00 != 1 then -# return -1 -#endi -#if $data10 != 2 then -# return -1 -#endi -#if $data20 != 2 then -# return -1 -#endi -#if $data30 != 2 then -# return -1 -#endi -#if $data40 != 2 then -# return -1 -#endi -#if $data50 != 2 then -# return -1 -#endi -#if $data60 != 2 then -# return -1 -#endi -#if $data70 != 1 then -# return -1 -#endi - -print =============== insert data into child table ct3 (n) -sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL ); -sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 ); -sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 ); -sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 ); -sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4 ); -sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5 ); -sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6 ); -sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7 ); -sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8 ); - +if $rows != 8 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data10 != 2 then + return -1 +endi +if $data70 != 1 then + return -1 +endi + sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) print ===> rows: $rows @@ -236,18 +233,6 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74 # return -1 #endi - -print =============== insert data into child table ct4 (y) -sql insert into ct4 values ( '2020-10-21 01:01:01.000', 1 ) -sql insert into ct4 values ( '2020-12-31 01:01:01.000', 2 ) -sql insert into ct4 values ( '2021-01-01 01:01:06.000', 3 ) -sql insert into ct4 values ( '2021-05-07 01:01:10.000', 4 ) -sql insert into ct4 values ( '2021-09-30 01:01:16.000', 5 ) -sql insert into ct4 values ( '2022-02-01 01:01:20.000', 6 ) -sql insert into ct4 values ( '2022-10-28 01:01:26.000', 7 ) -sql insert into ct4 values ( '2022-12-01 01:01:30.000', 8 ) -sql insert into ct4 values ( '2022-12-31 01:01:36.000', 9 ) - sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) print ===> rows: $rows @@ -305,6 +290,27 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74 # return -1 #endi +#================================================= +print =============== stop and restart taosd +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi diff --git a/tests/script/tsim/testCaseSuite.sim b/tests/script/tsim/testCaseSuite.sim index 4b700d2ca4..2ed5e5fcf3 100644 --- a/tests/script/tsim/testCaseSuite.sim +++ b/tests/script/tsim/testCaseSuite.sim @@ -2,6 +2,8 @@ run tsim/user/basic1.sim run tsim/db/basic1.sim +run tsim/db/basic2.sim +run tsim/db/basic3.sim run tsim/db/basic6.sim run tsim/db/basic7.sim run tsim/db/error1.sim @@ -13,12 +15,15 @@ run tsim/insert/basic1.sim run tsim/insert/backquote.sim run tsim/insert/null.sim +#run tsim/parser/groupby-basic.sim +#run tsim/parser/fourArithmetic-basic.sim + run tsim/query/interval.sim -#run tsim/query/interval-offset.sim # TD-14266 +run tsim/query/interval-offset.sim run tsim/show/basic.sim run tsim/table/basic1.sim run tsim/tmq/basic.sim - +#run tsim/tmq/basic1.sim diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim new file mode 100644 index 0000000000..a91196d71f --- /dev/null +++ b/tests/script/tsim/tmq/basic1.sim @@ -0,0 +1,182 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$dbNamme = d0 +print =============== create database , vgroup 1 +sql create database $dbNamme vgroups 1 +sql show databases +print $data00 $data01 $data02 +if $rows != 2 then + return -1 +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct0 using stb tags(1000) +sql create table ct1 using stb tags(2000) +#sql create table ct3 using stb tags(3000) + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create multi topics. notes: now only support: +print =============== 1. columns from stb; 2. * from ctb; 3. columns from ctb +print =============== will support: * from stb; function from stb/ctb + +sql create topic topic_stb_column as select ts, c1, c3 from stb +#sql create topic topic_stb_all as select * from stb +#sql create topic topic_stb_function as select ts, abs(c1), sina(c2) from stb + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +#sql create topic topic_ctb_function as select ts, abs(c1), sina(c2) from ct0 + +sql create topic topic_ntb_column as select ts, c1, c3 from ntb +sql create topic topic_ntb_all as select * from ntb +#sql create topic topic_ntb_function as select ts, abs(c1), sina(c2) from ntb + +sql show tables +if $rows != 3 then + return -1 +endi + +print =============== insert data + +$tbPrefix = ct +$tbNum = 2 +$rowNum = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $c = $x / 10 + $c = $c * 10 + $c = $x - $c + + $binary = ' . binary + $binary = $binary . $c + $binary = $binary . ' + + sql insert into $tb values ($tstart , $c , $x , $binary ) + sql insert into ntb values ($tstart , $c , $x , $binary ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$totalMsgCnt = $rowNum * $tbNum +print inserted totalMsgCnt: $totalMsgCnt + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20}@ then + return -1 +endi + +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" +#print cmd result----> $system_content +#if $system_content != @{consume success: 20}@ then +# return -1 +#endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20}@ then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20}@ then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20}@ then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20}@ then + return -1 +endi + +print =============== create database , vgroup 4 +$dbNamme = d1 +sql create database $dbNamme vgroups 4 + + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 81a988e9ae..75542bfa71 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -151,8 +151,6 @@ void parseInputString() { token = strtok(NULL, delim); } - printf("\n\n"); - token = strtok(g_stConfInfo.keyString, delim); while(token != NULL) { //printf("%s\n", token ); @@ -226,7 +224,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { if ((err = tmq_subscribe(tmq, topics))) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); - return; + exit(-1); } int32_t totalMsgs = 0;