From 7e89f1c427d861d322e2ff2800ddbe030c1d0079 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 18 May 2022 18:27:42 +0800 Subject: [PATCH 1/4] refactor: remove auth func --- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 5 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 61 ------------------- source/dnode/mnode/impl/src/mndProfile.c | 3 +- source/dnode/mnode/impl/src/mnode.c | 21 ++++--- 4 files changed, 14 insertions(+), 76 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 5dc5b6cef2..30d7750f79 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -275,7 +275,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) } void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) { - dDebug("start to process net test req"); + dDebug("msg:%p, net test req will be processed", pMsg); SRpcMsg rsp = {.code = 0, .info = pMsg->info}; rsp.pCont = rpcMallocCont(pMsg->contLen); if (rsp.pCont == NULL) { @@ -287,8 +287,7 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) { } void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) { - dDebug("start to process server startup status req"); - + dDebug("msg:%p, server startup status req will be processed", pMsg); SServerStatusRsp statusRsp = {0}; dmGetServerStartupStatus(pDnode, &statusRsp); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f66bc84def..b4c8a0807c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -344,66 +344,6 @@ void dmCleanupClient(SDnode *pDnode) { } } -static inline int32_t dmGetHideUserAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - int32_t code = 0; - char pass[TSDB_PASSWORD_LEN + 1] = {0}; - - if (strcmp(user, INTERNAL_USER) == 0) { - taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); - } else if (strcmp(user, TSDB_NETTEST_USER) == 0) { - taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass); - } else { - code = -1; - } - - if (code == 0) { - memcpy(secret, pass, TSDB_PASSWORD_LEN); - *spi = 1; - *encrypt = 0; - *ckey = 0; - } - - return code; -} - -static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, - char *ckey) { - if (dmGetHideUserAuth(user, spi, encrypt, secret, ckey) == 0) { - dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); - return 0; - } - - SAuthReq authReq = {0}; - tstrncpy(authReq.user, user, TSDB_USER_LEN); - int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSAuthReq(pReq, contLen, &authReq); - - SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .info.ahandle = (void *)9528}; - SRpcMsg rpcRsp = {0}; - SEpSet epSet = {0}; - dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); - dmGetMnodeEpSet(&pDnode->data, &epSet); - dmSendRecv(&epSet, &rpcMsg, &rpcRsp); - - if (rpcRsp.code != 0) { - terrno = rpcRsp.code; - dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr()); - } else { - SAuthRsp authRsp = {0}; - tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp); - memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN); - memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN); - *spi = authRsp.spi; - *encrypt = authRsp.encrypt; - dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi, - authRsp.encrypt); - } - - rpcFreeCont(rpcRsp.pCont); - return rpcRsp.code; -} - int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -416,7 +356,6 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.sessions = tsMaxShellConns; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo; rpcInit.parent = pDnode; pTrans->serverRpc = rpcOpen(&rpcInit); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 5cf2a36731..7e99c5583d 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -197,8 +197,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { goto CONN_OVER; } if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) { - mError("user:%s, failed to auth while acquire user, input:%s saved:%s", pReq->conn.user, connReq.passwd, - pUser->pass); + mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd); code = TSDB_CODE_RPC_AUTH_FAILURE; goto CONN_OVER; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 3dfba4eca7..5326376bda 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -343,19 +343,20 @@ void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; - mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); - if (IsReq(pMsg) && !mndIsMaster(pMnode)) { - terrno = TSDB_CODE_APP_NOT_READY; - mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - return -1; - } + if (IsReq(pMsg)) { + if (!mndIsMaster(pMnode)) { + terrno = TSDB_CODE_APP_NOT_READY; + mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); + return -1; + } - if (IsReq(pMsg) && (pMsg->contLen == 0 || pMsg->pCont == NULL)) { - terrno = TSDB_CODE_INVALID_MSG_LEN; - mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - return -1; + if (pMsg->contLen == 0 || pMsg->pCont == NULL) { + terrno = TSDB_CODE_INVALID_MSG_LEN; + mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); + return -1; + } } MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; From 0164158256daf9fad228cf24918e57d0463aeb52 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 18 May 2022 18:37:39 +0800 Subject: [PATCH 2/4] refactor: adjust msgcb --- include/common/tmsgcb.h | 6 ++-- source/common/src/tmsgcb.c | 28 +++++++++---------- source/dnode/mgmt/node_mgmt/src/dmNodes.c | 2 +- source/dnode/mnode/impl/test/trans/trans2.cpp | 2 +- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 7ba6e5044c..9fa657a2a6 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -60,9 +60,9 @@ typedef struct { ReportStartup reportStartupFp; } SMsgCb; -void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg); -int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); +void tmsgSetDefault(const SMsgCb* msgcb); +int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg); +int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg); void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index f69fb65f04..126a4c023a 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -17,46 +17,46 @@ #include "tmsgcb.h" #include "taoserror.h" -static SMsgCb tsDefaultMsgCb; +static SMsgCb defaultMsgCb; -void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } +void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) { - PutToQueueFp fp = pMsgCb->queueFps[qtype]; - return (*fp)(pMsgCb->mgmt, pMsg); +int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { + PutToQueueFp fp = msgcb->queueFps[qtype]; + return (*fp)(msgcb->mgmt, pMsg); } -int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { - GetQueueSizeFp fp = pMsgCb->qsizeFp; - return (*fp)(pMsgCb->mgmt, vgId, qtype); +int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) { + GetQueueSizeFp fp = msgcb->qsizeFp; + return (*fp)(msgcb->mgmt, vgId, qtype); } int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { - SendReqFp fp = tsDefaultMsgCb.sendReqFp; + SendReqFp fp = defaultMsgCb.sendReqFp; return (*fp)(epSet, pMsg); } void tmsgSendRsp(SRpcMsg* pMsg) { - SendRspFp fp = tsDefaultMsgCb.sendRspFp; + SendRspFp fp = defaultMsgCb.sendRspFp; return (*fp)(pMsg); } void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { - SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; + SendRedirectRspFp fp = defaultMsgCb.sendRedirectRspFp; (*fp)(pMsg, pNewEpSet); } void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { - RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp; + RegisterBrokenLinkArgFp fp = defaultMsgCb.registerBrokenLinkArgFp; (*fp)(pMsg); } void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { - ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp; + ReleaseHandleFp fp = defaultMsgCb.releaseHandleFp; (*fp)(pHandle, type); } void tmsgReportStartup(const char* name, const char* desc) { - ReportStartup fp = tsDefaultMsgCb.reportStartupFp; + ReportStartup fp = defaultMsgCb.reportStartupFp; (*fp)(name, desc); } \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 3a26388d14..ecfa37725a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -78,7 +78,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) { - tmsgSetDefaultMsgCb(&input.msgCb); + tmsgSetDefault(&input.msgCb); } if (OnlyInSingleProc(pWrapper)) { diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index 82acd7fddc..c4ed48fe60 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -56,7 +56,7 @@ class MndTestTrans2 : public ::testing::Test { msgCb.sendReqFp = sendReq; msgCb.sendRspFp = sendRsp; msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack - tmsgSetDefaultMsgCb(&msgCb); + tmsgSetDefault(&msgCb); SMnodeOpt opt = {0}; opt.deploy = 1; From 5ca0e81bba2663fd8289daeafbd03ed8e6604611 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 17 May 2022 17:38:21 +0800 Subject: [PATCH 3/4] stream final interval operator --- include/libs/nodes/nodes.h | 1 + source/libs/executor/inc/executorimpl.h | 27 ++- source/libs/executor/src/executorimpl.c | 35 ++-- source/libs/executor/src/scanoperator.c | 120 ++++++++++- source/libs/executor/src/timewindowoperator.c | 196 +++++++++++++++++- 5 files changed, 342 insertions(+), 37 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 27dae6d210..291e08fdbf 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -208,6 +208,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bf178612ba..e010363a1e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -378,6 +378,13 @@ typedef enum EStreamScanMode { STREAM_SCAN_FROM_DATAREADER, } EStreamScanMode; +typedef struct SCatchSupporter { + SHashObj* pWindowHashTable; // quick locate the window object for each window + SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file + int32_t keySize; + int64_t* pKeyBuf; +} SCatchSupporter; + typedef struct SStreamBlockScanInfo { SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock @@ -400,6 +407,8 @@ typedef struct SStreamBlockScanInfo { EStreamScanMode scanMode; SOperatorInfo* pOperatorDumy; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SCatchSupporter childAggSup; + SArray* childIds; } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -460,6 +469,16 @@ typedef struct SIntervalAggOperatorInfo { bool invertible; } SIntervalAggOperatorInfo; +typedef struct SStreamFinalIntervalOperatorInfo { + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + SAggSupporter aggSup; // aggregate supporter + int32_t order; // current SSDataBlock scan order + STimeWindowAggSupp twAggSup; +} SStreamFinalIntervalOperatorInfo; + typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -696,6 +715,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, @@ -771,9 +793,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); - -void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, - uint64_t groupId, int32_t numOfOutput); +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, + const char* pKey, const char* pDir); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d0a1840d72..e42ced4669 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -342,28 +342,6 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return pResultRow; } -void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, - uint64_t groupId, int32_t numOfOutput) { - SAggSupporter* pSup = &pInfo->aggSup; - SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); - SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(bytes)); - SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); - SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; - for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset); - struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; - if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { - continue; - } - pResInfo->initialized = false; - if (pCtx[i].functionId != -1) { - pCtx[i].fpSet.init(&pCtx[i], pResInfo); - } - } -} - /** * the struct of key in hash table * +----------+---------------+ @@ -5321,3 +5299,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } + +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, + const char* pKey, const char* pDir) { + pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); + pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); + int32_t pageSize = rowSize * 32; + int32_t bufSize = pageSize * 4096; + createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; + return TSDB_CODE_SUCCESS; +} + \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d042e463f0..2e3f8c8044 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -36,6 +36,11 @@ #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) +typedef struct SWindowPosition { + int32_t pageId; + int32_t rowId; +} SWindowPosition; + static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); @@ -675,6 +680,96 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti return NULL; } +void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) { + int64_t* pKey = (int64_t*)pSup->pKeyBuf; + pKey[0] = groupId; + pKey[1] = childId; + pKey[2] = ts; +} + +static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, + int32_t pageId, int32_t tsIndex, int64_t childId) { + SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex); + TSKEY* tsCols = (int64_t*)pColDataInfo->pData; + for (int32_t i = 0; i < pDataBlock->info.rows; i++) { + setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]); + SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, + pSup->pKeyBuf, pSup->keySize); + if (p1 == NULL) { + SWindowPosition pos = {.pageId = pageId, .rowId = i}; + int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, + sizeof(SWindowPosition)); + if (code != TSDB_CODE_SUCCESS ) { + return code; + } + } else { + p1->pageId = pageId; + p1->rowId = i; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, + int32_t tsIndex, int64_t childId) { + int32_t start = 0; + int32_t stop = 0; + int32_t pageSize = getBufPageSize(pSup->pDataBuf); + while(start < pDataBlock->info.rows) { + blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize); + SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1); + if (pDB == NULL) { + return terrno; + } + int32_t pageId = -1; + void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId); + if (pPage == NULL) { + blockDataDestroy(pDB); + return terrno; + } + int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t); + assert(size <= pageSize); + blockDataToBuf(pPage, pDB); + setBufPageDirty(pPage, true); + releaseBufPage(pSup->pDataBuf, pPage); + blockDataDestroy(pDB); + start = stop + 1; + int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId); + if (code != TSDB_CODE_SUCCESS ) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) { + SSDataBlock* pBlock = pInfo->pUpdateRes; + if (pInfo->updateResIndex < pBlock->info.rows) { + blockDataCleanup(pInfo->pRes); + SCatchSupporter* pCSup = &pInfo->childAggSup; + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); + TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; + int32_t size = taosArrayGetSize(pInfo->childIds); + for (int32_t i = 0; i < size; i++) { + int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i); + setSupKeyBuf(pCSup, pBlock->info.groupId, id, + tsCols[pInfo->updateResIndex]); + SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, + pCSup->pKeyBuf, pCSup->keySize); + void* buf = getBufPage(pCSup->pDataBuf, pos->pageId); + SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); + blockDataFromBuf(pDB, buf); + SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1); + blockDataMerge(pInfo->pRes, pSub, NULL); + blockDataDestroy(pDB); + blockDataDestroy(pSub); + } + pInfo->updateResIndex++; + return pInfo->pRes; + } + return NULL; +} + static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -688,6 +783,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { + if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { + SSDataBlock* pDB = getDataFromCatch(pInfo); + if (pDB != NULL) { + return pDB; + } else { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } + } + if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); pOperator->status = OP_EXEC_DONE; @@ -695,7 +799,17 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } int32_t current = pInfo->validBlockIndex++; - return taosArrayGetP(pInfo->pBlockLists, current); + SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); + if (pBlock->info.type == STREAM_REPROCESS) { + pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; + } else { + int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0); + if (code != TDB_CODE_SUCCESS) { + pTaskInfo->code = code; + longjmp(pTaskInfo->env, code); + } + } + return pBlock; } else { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); @@ -857,6 +971,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->pOperatorDumy = pOperatorDumy; pInfo->interval = pSTInfo->interval; + size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); + initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize, + "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blocking = false; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 479ce394b1..62732aa1f9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -8,6 +8,8 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; +static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator); + /* * There are two cases to handle: * @@ -473,8 +475,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun } static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, - TSKEY* primaryKeys, int32_t prevPosition, SIntervalAggOperatorInfo* pInfo) { - int32_t order = pInfo->order; + TSKEY* primaryKeys, int32_t prevPosition, int32_t order) { bool ascQuery = (order == TSDB_ORDER_ASC); int32_t precision = pInterval->precision; @@ -723,7 +724,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); if (startPos < 0) { break; } @@ -1031,18 +1032,41 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } } } -static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + +void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, + int16_t bytes, uint64_t groupId, int32_t numOfOutput) { + SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, + GET_RES_WINDOW_KEY_LEN(bytes)); + SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); + SqlFunctionCtx* pCtx = pBinfo->pCtx; + for (int32_t i = 0; i < numOfOutput; ++i) { + pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset); + struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { + continue; + } + pResInfo->initialized = false; + if (pCtx[i].functionId != -1) { + pCtx[i].fpSet.init(&pCtx[i], pResInfo); + } + } +} + +static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, + SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows; i += step) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval, - pInfo->interval.precision, NULL); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pIntrerval, + pIntrerval->precision, NULL); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); + doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); } } @@ -1084,7 +1108,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } if (pBlock->info.type == STREAM_REPROCESS) { - doClearWindows(pInfo, pOperator->numOfExprs, pBlock); + doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, + pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); continue; } pInfo->order = TSDB_ORDER_ASC; @@ -1097,8 +1122,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - // TODO: remove for stream - /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ pOperator->status = OP_RES_TO_RETURN; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; @@ -1116,6 +1139,12 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } +void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + cleanupAggSup(&pInfo->aggSup); +} + bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { for (int32_t i = 0; i < numOfCols; i++) { if (!fmIsInvertible(pFCtx[i].functionId)) { @@ -1185,6 +1214,63 @@ _error: return NULL; } +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo) { + SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->twAggSup = *pTwAggSupp; + pInfo->primaryTsIndex = primaryTsSlotId; + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(pOperator, 4096); + + int32_t code = + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, + keyBufSize, pTaskInfo->id.str); + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "StreamFinalIntervalOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; + + pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, + destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, + NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} + SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, @@ -1548,3 +1634,91 @@ _error: pTaskInfo->code = code; return NULL; } + +static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, + int32_t tableGroupId) { + SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + int32_t step = 1; + bool ascScan = true; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t forwardStep = 0; + + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + } + int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan); + STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, + &pInfo->interval, pInfo->interval.precision, NULL); + while (1) { + int32_t code = + setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pos->groupId = tableGroupId; + pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pos->key = pResult->win.skey; + taosArrayPush(pUpdated, &pos); + forwardStep = + getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + // window start(end) key interpolation + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, + pInfo->order, false); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + int32_t prevEndPos = (forwardStep - 1) * step + startPos; + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); + if (startPos < 0) { + break; + } + } + return pUpdated; +} + +static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { + SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SArray* pUpdated = NULL; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } else if (pOperator->status == OP_RES_TO_RETURN) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + } + + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { + break; + } + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); + if (pBlock->info.type == STREAM_REPROCESS) { + doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, + pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); + continue; + } + pUpdated = doHashInterval(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + } + + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + pOperator->status = OP_RES_TO_RETURN; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} From 8b6aa46d483526ab7594a339cc44ef27220fe35e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 18 May 2022 11:54:52 +0000 Subject: [PATCH 4/4] fix alter table coredump --- source/dnode/vnode/src/meta/metaTable.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3d1854927c..619458af24 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -442,7 +442,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl } entry.version = version; - int tlen; + int tlen; + SSchema *pNewSchema = NULL; switch (pAlterTbReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: if (pColumn) { @@ -451,8 +452,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl } pSchema->sver++; pSchema->nCols++; - pSchema->pSchema = - taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols); + pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols); + memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1)); + pSchema->pSchema = pNewSchema; pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; @@ -511,6 +513,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl metaULock(pMeta); + if (pNewSchema) taosMemoryFree(pNewSchema); tDecoderClear(&dc); tdbTbcClose(pTbDbc); tdbTbcClose(pUidIdxc);