From 6c7072286d4f82dd3e7353af02992a9a9df838f6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 15 Jun 2022 11:06:06 +0800 Subject: [PATCH 1/4] refactor(sync): if eqmsg error, return --- source/libs/sync/src/syncMain.c | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9db9550681..920a0245fd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1743,16 +1743,26 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); + return; + } } else { - sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); // reset timer ms - pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); + if (gSyncEnv != NULL) { + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } else { + sError("sync env elect is already stop"); + } } else { sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "", pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); @@ -1774,19 +1784,19 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (code != 0) { sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); return; } - } else { - sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); + sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); if (gSyncEnv != NULL) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pHeartbeatTimer); + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); } else { - sError("sync env is already stop"); + sError("sync env heartbeat is already stop"); } } else { sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 From c9f6c2041091204289163212a170487d12ac41fa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 15 Jun 2022 11:42:04 +0800 Subject: [PATCH 2/4] refactor(sync): if eqmsg error, return --- source/libs/sync/src/syncMain.c | 57 +++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 920a0245fd..7b1fd010ab 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -945,9 +945,13 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + if (syncEnvIsStart()) { + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartPingTimer"); + } return ret; } @@ -961,10 +965,14 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - pSyncNode->electTimerMS = ms; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); - atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + if (syncEnvIsStart()) { + pSyncNode->electTimerMS = ms; + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartElectTimer"); + } return ret; } @@ -998,9 +1006,13 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pHeartbeatTimer); - atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + if (syncEnvIsStart()) { + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartHeartbeatTimer"); + } return ret; } @@ -1720,14 +1732,25 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); + return; + } } else { sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); + if (syncEnvIsStart()) { + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sError("sync env is stop, syncNodeEqPingTimer"); + } + } else { sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "", pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); @@ -1756,12 +1779,12 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeoutDestroy(pSyncMsg); // reset timer ms - if (gSyncEnv != NULL) { + if (syncEnvIsStart()) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); } else { - sError("sync env elect is already stop"); + sError("sync env is stop, syncNodeEqElectTimer"); } } else { sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "", @@ -1792,11 +1815,11 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (gSyncEnv != NULL) { + if (syncEnvIsStart()) { taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { - sError("sync env heartbeat is already stop"); + sError("sync env is stop, syncNodeEqHeartbeatTimer"); } } else { sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 From 4b18bd718eaa4767579aec660c902d2751632542 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 15 Jun 2022 16:14:17 +0800 Subject: [PATCH 3/4] refactor(sync): adjust errno --- include/libs/sync/sync.h | 10 +---- include/util/taoserror.h | 2 + source/dnode/mnode/impl/src/mndMain.c | 15 ++++--- source/dnode/mnode/impl/src/mndSync.c | 18 ++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 9 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 27 ++++-------- source/libs/sync/src/syncMain.c | 44 ++++++++++++------- .../test/syncConfigChangeSnapshotTest.cpp | 2 +- .../libs/sync/test/syncConfigChangeTest.cpp | 2 +- source/libs/sync/test/syncReplicateTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 2 +- source/util/src/terror.c | 2 + 12 files changed, 71 insertions(+), 64 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a369b81d26..2d21b3531a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" -#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 typedef uint64_t SyncNodeId; @@ -44,14 +44,6 @@ typedef enum { TAOS_SYNC_STATE_ERROR = 103, } ESyncState; -typedef enum { - TAOS_SYNC_PROPOSE_SUCCESS = 0, - TAOS_SYNC_PROPOSE_NOT_LEADER = 1, - TAOS_SYNC_ONLY_ONE_REPLICA = 2, - TAOS_SYNC_NOT_IN_NEW_CONFIG = 3, - TAOS_SYNC_OTHER_ERROR = 100, -} ESyncProposeCode; - typedef enum { TAOS_SYNC_FSM_CB_SUCCESS = 0, TAOS_SYNC_FSM_CB_OTHER_ERROR = 1, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 30cbbbeb11..6fc84e023d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,6 +411,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) +#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911) +#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 59599ee134..00118eac1e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = TAOS_SYNC_OTHER_ERROR; + int32_t code = 0; if (!syncEnvIsStart()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } char logBuf[512] = {0}; @@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } else { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { @@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } + if (code != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return code; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 9940037356..63708aef8a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - } else if (code == TAOS_SYNC_OTHER_ERROR) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } else { terrno = TSDB_CODE_APP_ERROR; @@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) { syncStart(pMgmt->sync); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); -/* - if (pMgmt->standby) { - syncStartStandBy(pMgmt->sync); - } else { - syncStart(pMgmt->sync); - } -*/ + /* + if (pMgmt->standby) { + syncStartStandBy(pMgmt->sync); + } else { + syncStart(pMgmt->sync); + } + */ } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cd64bc8a9c..34ff9ffa0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { } int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = TAOS_SYNC_OTHER_ERROR; + int32_t ret = 0; if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); @@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tmsgSendRsp(&rsp); } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } syncNodeRelease(pSyncNode); } else { vError("==vnodeProcessSyncReq== error syncEnv stop"); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } + if (ret != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return ret; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 17c2c186be..975dff9fb6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (code == 0) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { SEpSet newEpSet = {0}; syncGetEpSet(pVnode->sync, &newEpSet); SEp *pEp = &newEpSet.eps[newEpSet.inUse]; @@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { - return 0; -} +int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } -int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { - return 0; -} +int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } -int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { - return 0; -} +int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } -int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { - return 0; -} +int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; } -int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { - return 0; -} +int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; } -int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { - return 0; -} +int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7b1fd010ab..953294e489 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -149,12 +149,14 @@ void syncStop(int64_t rid) { int32_t syncSetStandby(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } // state change @@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!IamInNew) { sError("sync reconfig error, not in new config"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { int32_t syncLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->peersNum == 0) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; @@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) { int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = 0; @@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { if (pSyncNode->replicaNum == 1) { sError("only one replica, cannot drop leader"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_ONLY_ONE_REPLICA; + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; } SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); @@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } assert(rid == pSyncNode->rid); sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); @@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { } int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - ret = TAOS_SYNC_PROPOSE_SUCCESS; + ret = 0; } else { + ret = -1; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); } else { + ret = -1; + terrno = TSDB_CODE_SYN_NOT_LEADER; sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); - ret = TAOS_SYNC_PROPOSE_NOT_LEADER; } return ret; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 10b54d0aa4..3ee2b458d6 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -338,7 +338,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 1e64a8a6f7..b1dc53d804 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -251,7 +251,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index bf9e34fffb..d955e64f81 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -188,7 +188,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index ebcd7368cc..21049454f4 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -391,7 +391,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else { assert(ret == 0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8e6284d2cb..621f947b64 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") // wal From 87bcbe003ee84d5d3dd38fe685bbc966a009f46b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 15 Jun 2022 16:40:45 +0800 Subject: [PATCH 4/4] feat(stream): stream state&session support partition by --- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executorimpl.c | 7 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 178 ++++++++++-------- 4 files changed, 108 insertions(+), 87 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c4d16c89e1..18d35ecc56 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -365,7 +365,9 @@ typedef struct SCatchSupporter { } SCatchSupporter; typedef struct SStreamAggSupporter { - SArray* pResultRows; + SHashObj* pResultRows; + SArray* pCurWins; + int32_t valueSize; int32_t keySize; char* pKeyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file @@ -899,9 +901,9 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, - SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size); + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); -SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); +SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 99a9dcb6af..53477eb56f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4814,6 +4814,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, }; + ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY); int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type); @@ -5498,14 +5499,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, - size_t size) { + int32_t size) { pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); - pSup->pResultRows = taosArrayInit(1024, size); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + pSup->valueSize = size; pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e44e05224f..571e9ae120 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -716,7 +716,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { int64_t gap = pInfo->sessionSup.gap; int32_t winIndex = 0; SResultWindowInfo* pCurWin = - getSessionTimeWindow(pAggSup->pResultRows, tsCols[pInfo->updateResIndex], gap, &winIndex); + getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], pSDB->info.groupId, gap, &winIndex); win = pCurWin->win; pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5a18649cab..738c5b2b26 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1320,6 +1320,15 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { break; } + if (pBlock->info.type == STREAM_REPROCESS) { + doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL); + qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); + continue; + } else if (pBlock->info.type == STREAM_GET_ALL) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + continue; + } + // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again @@ -1328,16 +1337,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } - if (pBlock->info.type == STREAM_REPROCESS) { - doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL); - qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); - continue; - } else if (pBlock->info.type == STREAM_GET_ALL && - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); - continue; - } - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); } @@ -2038,7 +2037,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs, @@ -2058,12 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); taosArrayDestroy(pUpWins); break; - } else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) && - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + } else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo)) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); continue; } + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (isFinalInterval(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2125,6 +2123,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, }; + ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); @@ -2190,8 +2189,13 @@ _error: } void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { - taosArrayDestroy(pSup->pResultRows); taosMemoryFreeClear(pSup->pKeyBuf); + void **pIte = NULL; + while ((pIte = taosHashIterate(pSup->pResultRows, pIte)) != NULL) { + SArray *pWins = (SArray *) (*pIte); + taosArrayDestroy(pWins); + } + taosHashCleanup(pSup->pResultRows); destroyDiskbasedBuf(pSup->pResultBuf); } @@ -2333,7 +2337,22 @@ static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) { return taosArrayPush(pWinInfos, &win); } -SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex) { +SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) { + void** ite = taosHashGet(pAggSup->pResultRows, &groupId, sizeof(uint64_t)); + SArray* pWinInfos = NULL; + if (ite == NULL) { + pWinInfos = taosArrayInit(1024, pAggSup->valueSize); + taosHashPut(pAggSup->pResultRows, &groupId, sizeof(uint64_t), &pWinInfos, sizeof(void *)); + } else { + pWinInfos = *ite; + } + return pWinInfos; +} + +SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex) { + SArray* pWinInfos = getWinInfos(pAggSup, groupId); + pAggSup->pCurWins = pWinInfos; + int32_t size = taosArrayGetSize(pWinInfos); if (size == 0) { *pIndex = 0; @@ -2389,7 +2408,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(pWinInfo->win.skey <= pWinInfo->win.ekey); // too many time window in query - int32_t size = taosArrayGetSize(pAggSup->pResultRows); + int32_t size = taosArrayGetSize(pAggSup->pCurWins); if (size > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -2449,25 +2468,6 @@ static int32_t doOneStateWindowAgg(SStreamStateAggOperatorInfo* pInfo, SSDataBlo pSDataBlock, pCurWin, pResult, startIndex, winRows, numOutput, pTaskInfo); } -int32_t copyWinInfoToDataBlock(SSDataBlock* pBlock, SStreamAggSupporter* pAggSup, int32_t start, int32_t num, - int32_t numOfExprs, SOptrBasicInfo* pBinfo) { - for (int32_t i = start; i < num; i += 1) { - SResultWindowInfo* pWinInfo = taosArrayGet(pAggSup->pResultRows, start); - SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pWinInfo->pos.pageId); - SResultRow* pRow = (SResultRow*)((char*)bufPage + pWinInfo->pos.offset); - for (int32_t j = 0; j < numOfExprs; ++j) { - SResultRowEntryInfo* pResultInfo = getResultCell(pRow, j, pBinfo->rowCellInfoOffset); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); - char* in = GET_ROWCELL_INTERBUF(pBinfo->pCtx[j].resultInfo); - colDataAppend(pColInfoData, pBlock->info.rows, in, pResultInfo->isNullRes); - } - pBlock->info.rows += pRow->numOfRows; - releaseBufPage(pAggSup->pResultBuf, bufPage); - } - blockDataUpdateTsWindow(pBlock, -1); - return TSDB_CODE_SUCCESS; -} - int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) { SResultWindowInfo* pCurWin = taosArrayGet(pWinInfos, startIndex); int32_t size = taosArrayGetSize(pWinInfos); @@ -2484,15 +2484,15 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { - SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex); + SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultRow* pCurResult = NULL; setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo); num += startIndex + 1; - ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pResultRows)); + ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pCurWins)); // Just look for the window behind StartIndex for (int32_t i = startIndex + 1; i < num; i++) { - SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pResultRows, i); + SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pCurWins, i); SResultRow* pWinResult = NULL; setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo); @@ -2503,7 +2503,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); pWinInfo->isOutput = false; } - taosArrayRemove(pInfo->streamAggSup.pResultRows, i); + taosArrayRemove(pInfo->streamAggSup.pCurWins, i); } } @@ -2533,7 +2533,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; for (int32_t i = 0; i < pSDataBlock->info.rows;) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); + SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -2543,7 +2543,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, // forwardRows, // pInfo->order, false); - int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap); + int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap); if (winNum > 0) { compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted); } @@ -2566,7 +2566,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); + SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pBlock->info.groupId, gap, &winIndex); step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL); ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput); @@ -2627,7 +2627,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - SArray* pChWins = pChInfo->streamAggSup.pResultRows; + SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId); int32_t chWinSize = taosArrayGetSize(pChWins); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); for (int32_t k = index; k > 0 && k < chWinSize; k++) { @@ -2651,36 +2651,44 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } -int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, +int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) { // Todo(liuyao) save window to tdb - int32_t size = taosArrayGetSize(pWins); - for (int32_t i = 0; i < size; i++) { - void* pWin = taosArrayGet(pWins, i); - SResultWindowInfo* pSeWin = fn(pWin); - if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { - if (!pSeWin->isClosed) { - pSeWin->isClosed = true; - if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); - pSeWin->isOutput = true; + void **pIte = NULL; + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + SArray *pWins = (SArray *) (*pIte); + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + void* pWin = taosArrayGet(pWins, i); + SResultWindowInfo* pSeWin = fn(pWin); + if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { + if (!pSeWin->isClosed) { + pSeWin->isClosed = true; + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); + pSeWin->isOutput = true; + } } + continue; } - continue; + break; } - break; } return TSDB_CODE_SUCCESS; } -int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) { - int32_t size = taosArrayGetSize(pWins); - for (int32_t i = 0; i < size; i++) { - void* pWin = taosArrayGet(pWins, i); - SResultWindowInfo* pSeWin = fn(pWin); - if (!pSeWin->isClosed) { - int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); - pSeWin->isOutput = true; +int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) { + void **pIte = NULL; + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + SArray *pWins = (SArray *) (*pIte); + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + void* pWin = taosArrayGet(pWins, i); + SResultWindowInfo* pSeWin = fn(pWin); + if (!pSeWin->isClosed) { + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); + pSeWin->isOutput = true; + } } } return TSDB_CODE_SUCCESS; @@ -2714,8 +2722,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + if (pBlock->info.type == STREAM_REPROCESS) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0, pOperator->numOfExprs, pInfo->gap, pWins); @@ -2729,12 +2736,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } taosArrayDestroy(pWins); continue; - } else if (pBlock->info.type == STREAM_GET_ALL && - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo); continue; } + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); if (isFinalSession(pInfo)) { int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); @@ -2873,7 +2881,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { return pKeyData && compareVal(pKeyData, &pWin->stateKey); } -SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pIndex) { +SStateWindowInfo* getStateWindowByTs(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int32_t* pIndex) { + SArray* pWinInfos = getWinInfos(pAggSup, groupId); + pAggSup->pCurWins = pWinInfos; int32_t size = taosArrayGetSize(pWinInfos); int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getStateWinTsKey); SStateWindowInfo* pWin = NULL; @@ -2896,7 +2906,10 @@ SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pInde return NULL; } -SStateWindowInfo* getStateWindow(SArray* pWinInfos, TSKEY ts, char* pKeyData, SColumn* pCol, int32_t* pIndex) { +SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, + uint64_t groupId, char* pKeyData, SColumn* pCol, int32_t* pIndex) { + SArray* pWinInfos = getWinInfos(pAggSup, groupId); + pAggSup->pCurWins = pWinInfos; int32_t size = taosArrayGetSize(pWinInfos); if (size == 0) { *pIndex = 0; @@ -2987,16 +3000,16 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc for (int32_t i = 0; i < pBlock->info.rows; i += step) { char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; - SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup->pResultRows, tsCol[i], &winIndex); + SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], pBlock->info.groupId, &winIndex); if (!pCurWin) { continue; } - step = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, + step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, pSeDeleted); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); - deleteWindow(pAggSup->pResultRows, winIndex); + deleteWindow(pAggSup->pCurWins, winIndex); } } @@ -3026,13 +3039,15 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; bool allEqual = true; - SStateWindowInfo* pCurWin = getStateWindow(pAggSup->pResultRows, tsCols[i], pKeyData, &pInfo->stateCol, &winIndex); - winRows = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, - &allEqual, pInfo->pSeDeleted); + SStateWindowInfo* pCurWin = + getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, + &pInfo->stateCol, &winIndex); + winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, + pSDataBlock->info.rows, i, &allEqual, pInfo->pSeDeleted); if (!allEqual) { taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); - deleteWindow(pAggSup->pResultRows, winIndex); + deleteWindow(pAggSup->pCurWins, winIndex); continue; } code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pTaskInfo); @@ -3079,17 +3094,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + if (pBlock->info.type == STREAM_REPROCESS) { doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, pSeUpdated, pInfo->pSeDeleted); continue; - } else if (pBlock->info.type == STREAM_GET_ALL && - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo); continue; } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); }