From baa48489b7f901db5ba89c7e0b220eb32dfd4a57 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 6 Jul 2022 21:25:34 +0800 Subject: [PATCH] merge from 3.0 --- include/common/tmsgdef.h | 1 + include/libs/nodes/plannodes.h | 1 + source/client/src/clientEnv.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 4 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 5 +- source/dnode/mnode/impl/src/mndMain.c | 3 +- source/dnode/mnode/impl/src/mndQuery.c | 2 + source/dnode/qnode/src/qnode.c | 1 + source/dnode/vnode/inc/vnode.h | 25 +- source/dnode/vnode/src/inc/vnd.h | 2 + source/dnode/vnode/src/vnd/vnodeSvr.c | 16 +- source/dnode/vnode/src/vnd/vnodeSync.c | 52 +++-- source/libs/executor/inc/executorimpl.h | 7 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 158 +++++++++++-- source/libs/executor/src/timewindowoperator.c | 217 +++++++++++++----- source/libs/function/src/udfd.c | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 7 + source/libs/qworker/inc/qwInt.h | 1 + source/libs/qworker/inc/qwMsg.h | 3 +- source/libs/qworker/src/qwMsg.c | 6 +- source/libs/qworker/src/qworker.c | 8 +- source/libs/qworker/test/qworkerTests.cpp | 4 +- source/libs/scheduler/inc/schInt.h | 5 +- source/libs/scheduler/src/schRemote.c | 8 +- source/libs/scheduler/src/schTask.c | 6 +- source/libs/sync/src/syncMain.c | 7 +- tests/script/tsim/stream/state0.sim | 49 ++++ 32 files changed, 473 insertions(+), 135 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 806c0b5122..8b39530e84 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -209,6 +209,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_FETCH, "merge-fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d3e9840987..6a865b4e2a 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -353,6 +353,7 @@ typedef struct SDownstreamSourceNode { uint64_t taskId; uint64_t schedId; int32_t execId; + int32_t fetchMsgType; } SDownstreamSourceNode; typedef struct SExchangePhysiNode { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 797d58e6ef..89ecf16b40 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 59d68b2110..d60b69daba 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 1f22eefddf..14cb1bd533 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 1f7347203d..a3df32a08c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 3913e3fda8..1d795c74f2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { } int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7043991525..ad7fbf4344 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { pMsg->info.hasEpSet = 1; - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType}; int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); rsp.pCont = rpcMallocCont(contLen); @@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP: case TDMT_SCH_FETCH_RSP: + case TDMT_SCH_MERGE_FETCH_RSP: qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); return; case TDMT_MND_STATUS_RSP: @@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5e708616fd..f18f3c983e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || - pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { + pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || + pMsg->msgType == TDMT_SCH_DROP_TASK) { return 0; } if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index aec99fa3b7..5a527b994e 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0); break; case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0); break; case TDMT_SCH_DROP_TASK: @@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index cfc63b083d..723402e639 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_FETCH_RSP: diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 0498d889d6..6320f4719d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,15 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg); -int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); -int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); -int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); + int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode); @@ -68,14 +60,25 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever); int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); + int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); void *vnodeGetIdx(SVnode *pVnode); void *vnodeGetIvtIdx(SVnode *pVnode); -void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); -void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); + +int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); + +int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); +int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 32be479116..cb25e93cde 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); +void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); +bool vnodeIsLeader(SVnode* pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8ad92d0478..b3e9f53a5a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; SDecoder dc = {0}; @@ -133,7 +133,7 @@ _err: return code; } -int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { +int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; void *pReq; int32_t len; @@ -261,6 +261,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); + if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { + vnodeRedirectRpcMsg(pVnode, pMsg); + return 0; + } + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: @@ -276,11 +281,18 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG) + && !vnodeIsLeader(pVnode)) { + vnodeRedirectRpcMsg(pVnode, pMsg); + return 0; + } + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_FETCH_RSP: return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6f6102ea14..f75ccba4bb 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { return code; } -void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { +void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { + SEpSet newEpSet = {0}; + syncGetRetryEpSet(pVnode->sync, &newEpSet); + + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg, + newEpSet.numOfEps, newEpSet.inUse); + for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { + vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn, + newEpSet.eps[i].port); + } + pMsg->info.hasEpSet = 1; + + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1}; + tmsgSendRedirectRsp(&rsp, &newEpSet); +} + +void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; @@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); - code = vnodePreProcessReq(pVnode, pMsg); + code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); } else { @@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); if (code > 0) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; - if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { + if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { rsp.code = terrno; vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); } @@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); } else if (code < 0) { if (terrno == TSDB_CODE_SYN_NOT_LEADER) { - SEpSet newEpSet = {0}; - syncGetRetryEpSet(pVnode->sync, &newEpSet); - vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, - newEpSet.inUse); - for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { - vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); - } - pMsg->info.hasEpSet = 1; - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - tmsgSendRedirectRsp(&rsp, &newEpSet); + vnodeRedirectRpcMsg(pVnode, pMsg); } else { if (terrno != 0) code = terrno; vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); @@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { vnodeWaitBlockMsg(pVnode); } -void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { +void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; @@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.code == 0) { - if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { + if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { rsp.code = terrno; vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); } @@ -513,3 +521,17 @@ void vnodeSyncStart(SVnode *pVnode) { } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } + +bool vnodeIsLeader(SVnode *pVnode) { + if (!syncIsReady(pVnode->sync)) { + return false; + } + + // todo + // if (!pVnode->restored) { + // terrno = TSDB_CODE_APP_NOT_READY; + // return false; + // } + + return true; +} \ No newline at end of file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d93903019..9b32560041 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -319,8 +319,9 @@ typedef enum EStreamScanMode { STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_UPDATERES, - STREAM_SCAN_FROM_DATAREADER, + STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it STREAM_SCAN_FROM_DATAREADER_RETRIEVE, + STREAM_SCAN_FROM_DATAREADER_RANGE, } EStreamScanMode; typedef struct SCatchSupporter { @@ -612,6 +613,7 @@ typedef struct SStreamSessionAggOperatorInfo { SSDataBlock* pWinBlock; // window result SqlFunctionCtx* pDummyCtx; // for combine SSDataBlock* pDelRes; // delete result + bool returnDelete; SSDataBlock* pUpdateRes; // update window SHashObj* pStDeleted; void* pDelIterator; @@ -889,6 +891,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); +SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, + TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); +bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 63f8c9769b..fa84d79f56 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pMsgSendInfo->param = pWrapper; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = TDMT_SCH_FETCH; + pMsgSendInfo->msgType = pSource->fetchMsgType; pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 49e79639a8..aae85bbbc1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -792,13 +792,19 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { } static bool isSessionWindow(SStreamScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; } static bool isStateWindow(SStreamScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; } +static bool isIntervalWindow(SStreamScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || + pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; +} + static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); if (groupId) { @@ -834,6 +840,49 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou } } +void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { + pTableScanInfo->cond.twindows[0] = *pWin; + pTableScanInfo->curTWinIdx = 0; + // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + // if (!pTableScanInfo->dataReader) { + // return false; + // } + pTableScanInfo->scanTimes = 0; + pTableScanInfo->currentGroupId = -1; +} + +static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) { + if ((*pRowIndex) == pBlock->info.rows) { + return false; + } + + ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startData = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endData = (TSKEY*)pEndTsCol->pData; + STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]}; + setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex); + (*pRowIndex)++; + + for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) { + if (win.skey == startData[*pRowIndex]) { + win.ekey = TMAX(win.ekey, endData[*pRowIndex]); + continue; + } + if (win.skey == endData[*pRowIndex]) { + win.skey = TMIN(win.skey, startData[*pRowIndex]); + continue; + } + ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || + (isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); + break; + } + + resetTableScanInfo(pInfo->pTableScanOp->info, &win); + return true; +} + static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { STimeWindow win = { .skey = INT64_MIN, @@ -852,6 +901,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); win = pCurWin->win; + setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); @@ -875,15 +925,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t if (!needRead) { return false; } - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pTableScanInfo->cond.twindows[0] = win; - pTableScanInfo->curTWinIdx = 0; - // tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); - // if (!pTableScanInfo->dataReader) { - // return false; - // } - pTableScanInfo->scanTimes = 0; - pTableScanInfo->currentGroupId = -1; + resetTableScanInfo(pInfo->pTableScanOp->info, &win); return true; } @@ -900,6 +942,26 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow dest->info.rows++; } +static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { + while (1) { + SSDataBlock* pResult = NULL; + pResult = doTableScan(pInfo->pTableScanOp); + if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) { + // scan next window data + pResult = doTableScan(pInfo->pTableScanOp); + } + if (!pResult) { + blockDataCleanup(pSDB); + *pRowIndex = 0; + return NULL; + } + + if (pResult->info.groupId == pInfo->groupId) { + return pResult; + } + } +} + static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { while (1) { SSDataBlock* pResult = NULL; @@ -931,9 +993,8 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_ return pResult; */ } - -static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, - SSDataBlock* pUpdateRes) { +static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, + SSDataBlock* pUpdateRes) { if (pDelBlock->info.rows == 0) { return; } @@ -948,7 +1009,7 @@ static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, uint64_t* uidCol = (uint64_t*)pGpCol->pData; SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); + SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); for (int32_t i = pInfo->deleteDataIndex; i < pDelBlock->info.rows && i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1; @@ -969,6 +1030,43 @@ static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, } } +static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, + SSDataBlock* pUpdateRes) { + if (pBlock->info.rows == 0) { + return; + } + blockDataCleanup(pUpdateRes); + blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows); + ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startData = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endData = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* uidCol = (uint64_t*)pGpCol->pData; + + SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); + int32_t dummy = 0; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + uint64_t groupId = getGroupId(pOperator, uidCol[i]); + // gap must be 0. + SResultWindowInfo* pStartWin = + getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); + if (!pStartWin) { + // window has been closed. + continue; + } + SResultWindowInfo* pEndWin = + getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); + ASSERT(pEndWin); + colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); + colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false); + colDataAppend(pDestGpCol, i, (const char*)&groupId, false); + pUpdateRes->info.rows++; + } +} static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { blockDataCleanup(pUpdateBlock); int32_t size = taosArrayGetSize(pInfo->tsArray); @@ -1001,7 +1099,7 @@ static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlo } if (size == 0) { - copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock); + generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock); } } @@ -1060,11 +1158,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } break; case STREAM_DELETE_DATA: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - copyDataBlock(pInfo->pDeleteDataRes, pBlock); - copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes); pInfo->updateResIndex = 0; - prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); + if (isIntervalWindow(pInfo)) { + copyDataBlock(pInfo->pDeleteDataRes, pBlock); + generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes); + prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + } else { + generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + } pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; return pInfo->pUpdateRes; } break; @@ -1078,8 +1182,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; return pInfo->pRes; } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - if (!isStateWindow(pInfo)) { + if (isStateWindow(pInfo)) { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } else { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); } return pInfo->pUpdateRes; @@ -1104,11 +1210,19 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return pInfo->pUpdateRes; } pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE) { + SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + if (pSDB) { + pSDB->info.type = STREAM_NORMAL; + checkUpdateData(pInfo, true, pSDB, false); + return pSDB; + } + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } else if (isStateWindow(pInfo)) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) { - ASSERT(pInfo->pUpdateRes->info.rows == 0); + blockDataCleanup(pInfo->pUpdateRes); // return empty data blcok return pInfo->pUpdateRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index de11b06c17..37e98693cc 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1323,13 +1323,13 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, } } -static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex, +static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsCols = (TSKEY*)pTsCol->pData; uint64_t* pGpDatas = NULL; if (pBlock->info.type == STREAM_RETRIEVE) { - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, 2); + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); pGpDatas = (uint64_t*)pGpCol->pData; } int32_t step = 0; @@ -1492,7 +1492,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_CLEAR) { - doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, + doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; @@ -1710,7 +1710,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->delIndex = 0; - // pInfo->pDelRes = createDeleteBlock(); todo(liuyao) for delete + // pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete @@ -2571,13 +2571,13 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock)); for (; (*pIndex) < size; (*pIndex)++) { SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex)); - SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); - SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 1); + SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); - SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 2); + SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false); pBlock->info.rows++; } @@ -2589,9 +2589,9 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB } void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { - SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; - SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, 2); + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { @@ -2680,7 +2680,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); - doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, + doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); @@ -2688,7 +2688,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; - doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildInfo->primaryTsIndex, + doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildSup->numOfExprs, pBlock, NULL); rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, NULL); @@ -2719,7 +2719,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); - doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); + doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); removeResults(pUpWins, pUpdated); taosArrayDestroy(pUpWins); if (taosArrayGetSize(pUpdated) > 0) { @@ -2901,7 +2901,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataRes = createPullDataBlock(); pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; - // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + // pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete pInfo->delIndex = 0; @@ -3046,13 +3046,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + // pInfo->pDelRes = createPullDataBlock(); pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete pInfo->pChildren = NULL; pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; + pInfo->returnDelete = false; pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; @@ -3087,15 +3088,23 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) { SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index); return pWin->win.ekey; } -static bool isInWindow(SResultWindowInfo* pWin, TSKEY ts, int64_t gap) { - int64_t sGap = ts - pWin->win.skey; - int64_t eGap = pWin->win.ekey - ts; - if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) { + +bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { + int64_t sGap = ts - pWin->skey + gap; + int64_t eGap = pWin->ekey - ts + gap; + // if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) { + // return true; + // } + if (sGap >= 0 && eGap >= 0) { return true; } return false; } +bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) { + return isInTimeWindow(&pWinInfo->win, ts, gap); +} + static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts, int32_t index) { SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false}; return taosArrayInsert(pWinInfos, index, &win); @@ -3118,6 +3127,41 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) { return pWinInfos; } +// don't add new window +SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, + int64_t gap, int32_t* pIndex) { + SArray* pWinInfos = getWinInfos(pAggSup, groupId); + pAggSup->pCurWins = pWinInfos; + + int32_t size = taosArrayGetSize(pWinInfos); + if (size == 0) { + return NULL; + } + // find the first position which is smaller than the key + int32_t index = binarySearch(pWinInfos, size, startTs, TSDB_ORDER_DESC, getSessionWindowEndkey); + SResultWindowInfo* pWin = NULL; + if (index >= 0) { + pWin = taosArrayGet(pWinInfos, index); + if (isInWindow(pWin, startTs, gap)) { + *pIndex = index; + return pWin; + } + } + + if (index + 1 < size) { + pWin = taosArrayGet(pWinInfos, index + 1); + if (isInWindow(pWin, startTs, gap)) { + *pIndex = index + 1; + return pWin; + } else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) { + *pIndex = index; + return pWin; + } + } + + return NULL; +} + SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex) { SArray* pWinInfos = getWinInfos(pAggSup, groupId); @@ -3358,6 +3402,34 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } +void deleteWindow(SArray* pWinInfos, int32_t index) { + ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos)); + taosArrayRemove(pWinInfos, index); +} + +static void doDeleteSessionWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) { + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endDatas = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* gpDatas = (uint64_t*)pGroupCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + int32_t winIndex = 0; + while(1) { + SResultWindowInfo* pCurWin = + getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex); + if (!pCurWin) { + break; + } + deleteWindow(pAggSup->pCurWins, winIndex); + if (result) { + taosArrayPush(result, pCurWin); + } + } + } +} + static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap, SArray* result) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); @@ -3366,13 +3438,14 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; SResultWindowInfo* pCurWin = - getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); - step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); - ASSERT(isInWindow(pCurWin, tsCols[i], gap)); - if (pCurWin->pos.pageId == -1) { + getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); + if (!pCurWin || pCurWin->pos.pageId == -1) { // window has been closed. + step = 1; continue; } + step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); + ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); if (result) { taosArrayPush(result, pCurWin); @@ -3407,7 +3480,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false); for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); i++) { pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -3495,7 +3568,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra pSeWin->isOutput = true; } if (delete) { - taosArrayRemove(pWins, i); + deleteWindow(pWins, i); i--; size = taosArrayGetSize(pWins); } @@ -3535,6 +3608,14 @@ int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ return TSDB_CODE_SUCCESS; } +static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { + int32_t size = taosArrayGetSize(pResWins); + for (int32_t i = 0; i < size; i++) { + SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); + } +} + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -3570,17 +3651,32 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, - pInfo->gap, pWins); + 0, pWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs, - pChildInfo->gap, NULL); + 0, NULL); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); } taosArrayDestroy(pWins); continue; + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { + SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); + // gap must be 0 + doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, pWins); + if (IS_FINAL_OP(pInfo)) { + int32_t childIndex = getChildIndex(pBlock); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; + // gap must be 0 + doDeleteSessionWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL); + rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); + } + copyDeleteWindowInfo(pWins, pInfo->pStDeleted); + taosArrayDestroy(pWins); + continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession); continue; @@ -3664,26 +3760,29 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + if (pBInfo->pRes->info.rows > 0) { + printDataBlock(pBInfo->pRes, "Semi Session"); + return pBInfo->pRes; + } + + // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { + pInfo->returnDelete = true; printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0) { - pOperator->status = OP_EXEC_DONE; - if (pInfo->pUpdateRes->info.rows == 0) { - // semi interval operator clear disk buffer - clearStreamSessionOperator(pInfo); - return NULL; - } + + if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } - printDataBlock(pBInfo->pRes, "Semi Session"); - return pBInfo->pRes; + // semi interval operator clear disk buffer + clearStreamSessionOperator(pInfo); + pOperator->status = OP_EXEC_DONE; + return NULL; } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3699,11 +3798,17 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); - doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins); + doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, 0, pWins); removeSessionResults(pStUpdated, pWins); taosArrayDestroy(pWins); copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); break; + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { + // gap must be 0 + doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, NULL); + copyDataBlock(pInfo->pDelRes, pBlock); + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + break; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession); continue; @@ -3728,24 +3833,29 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { + + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + if (pBInfo->pRes->info.rows > 0) { + printDataBlock(pBInfo->pRes, "Semi Session"); + return pBInfo->pRes; + } + + // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { + pInfo->returnDelete = true; printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0) { - pOperator->status = OP_EXEC_DONE; - if (pInfo->pUpdateRes->info.rows == 0) { - return NULL; - } + + if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } - printDataBlock(pBInfo->pRes, "Semi Session"); - return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; + + pOperator->status = OP_EXEC_DONE; + return NULL; } SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -3971,11 +4081,6 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S return rows - start; } -void deleteWindow(SArray* pWinInfos, int32_t index) { - ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos)); - taosArrayRemove(pWinInfos, index); -} - static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); @@ -4179,7 +4284,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + // pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete pInfo->pChildren = NULL; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 708ea4bd38..a412b589a9 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index a1b0cc5947..6c0717e845 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -595,6 +595,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(execId); + COPY_SCALAR_FIELD(fetchMsgType); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0bff063ea1..2a94ee43e3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3538,6 +3538,7 @@ static const char* jkDownstreamSourceAddr = "Addr"; static const char* jkDownstreamSourceTaskId = "TaskId"; static const char* jkDownstreamSourceSchedId = "SchedId"; static const char* jkDownstreamSourceExecId = "ExecId"; +static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType"; static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; @@ -3552,6 +3553,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceFetchMsgType, pNode->fetchMsgType); + } return code; } @@ -3569,6 +3573,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkDownstreamSourceFetchMsgType, &pNode->fetchMsgType); + } return code; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index eb10a2fdd6..2b1e535e8c 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -123,6 +123,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int32_t queryType; + int32_t fetchType; int32_t execId; bool queryFetched; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 75b11c1b0b..7becaf06eb 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, - int32_t code); +int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 8df3ac90fa..ea5aa3c563 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i } SRpcMsg rpcRsp = { - .msgType = TDMT_SCH_FETCH_RSP, + .msgType = rspType, .pCont = pRsp, .contLen = sizeof(*pRsp) + dataLength, .code = code, @@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int64_t rId = 0; int32_t eId = msg->execId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType}; QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5ba525329f..3f8d62b1aa 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -606,7 +606,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, @@ -628,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { rsp = NULL; qwMsg->connInfo = ctx->dataConnInfo; - qwBuildAndSendFetchRsp(&qwMsg->connInfo, NULL, 0, code); + qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -661,6 +661,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + ctx->queryType = qwMsg->msgType; + SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); @@ -711,7 +713,7 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index bc37400249..be54db51de 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { rpcFreeCont(rsp); break; } - case TDMT_SCH_FETCH_RSP: { + case TDMT_SCH_FETCH_RSP: + case TDMT_SCH_MERGE_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont; if (0 == pRsp->code && 0 == rsp->completed) { @@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_SCH_CANCEL_TASK: diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 8e8652aab5..7289e4b6be 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -35,7 +35,7 @@ extern "C" { #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 -#define SCH_TASK_MAX_EXEC_TIMES 5 +#define SCH_TASK_MAX_EXEC_TIMES 8 #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA enum { @@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) +#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) @@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) -#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH) +#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) #define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen))) #define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index bf85d09e00..e1035c4fca 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -44,6 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy // SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_SCH_FETCH_RSP: + case TDMT_SCH_MERGE_FETCH_RSP: if (lastMsgType != reqMsgType && -1 != lastMsgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); @@ -304,7 +305,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } break; } - case TDMT_SCH_FETCH_RSP: { + case TDMT_SCH_FETCH_RSP: + case TDMT_SCH_MERGE_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SCH_ERR_JRET(rspCode); @@ -558,6 +560,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_VND_DELETE: case TDMT_SCH_EXPLAIN: case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: *fp = schHandleCallback; break; case TDMT_SCH_DROP_TASK: @@ -1016,7 +1019,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, persistHandle = true; break; } - case TDMT_SCH_FETCH: { + case TDMT_SCH_FETCH: + case TDMT_SCH_MERGE_FETCH: { msgSize = sizeof(SResFetchReq); msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e60006d75c..45cb0ab935 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .taskId = pTask->taskId, .schedId = schMgmt.sId, .execId = pTask->execId, - .addr = pTask->succeedAddr}; + .addr = pTask->succeedAddr, + .fetchMsgType = SCH_FETCH_TYPE(pTask), + }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->lock); @@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH)); + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); return TSDB_CODE_SUCCESS; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e207d89212..cc6057b031 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -648,8 +648,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { taosReleaseRef(tsNodeRefId, rid); @@ -657,8 +655,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return -1; } ASSERT(rid == pSyncNode->rid); - ret = syncNodePropose(pSyncNode, pMsg, isWeak); + int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } @@ -669,15 +667,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_ return -1; } - int32_t ret = 0; SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } ASSERT(rid == pSyncNode->rid); - ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); + int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 2f2038b914..f98e356540 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -449,4 +449,53 @@ if $data26 != 14 then return -1 endi +sql create database test1 vgroups 1 +sql show databases + +print $data00 $data01 $data02 + +sql use test1 + +sql create table t1(ts timestamp, a int, b int , c int, d double, id int); +sql create stream streams2 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a); + +sql insert into t1 values(1648791212000,2,2,3,1.0,1); +sql insert into t1 values(1648791213000,1,2,3,1.0,1); +sql insert into t1 values(1648791213000,1,2,4,1.0,2); +$loop_count = 0 +loop5: + +sleep 300 +sql select * from streamt1 order by c desc; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop5 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop5 +endi + +if $data05 != 4 then + print =====data05=$data05 + goto loop5 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop5 +endi + +if $data15 != 3 then + print =====data15=$data15 + goto loop5 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT