From 9dbddeed16ad722f56735f4f993dfe02f121b588 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 10 Jan 2024 10:03:38 +0800 Subject: [PATCH 01/25] opt msg on mnd --- source/common/src/tglobal.c | 137 +++++++++++++------------- source/dnode/mnode/impl/src/mndMain.c | 39 ++++---- 2 files changed, 86 insertions(+), 90 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 79d21955d4..76e1ab46b7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -95,9 +95,9 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // audit -bool tsEnableAudit = true; -bool tsEnableAuditCreateTable = true; -int32_t tsAuditInterval = 5000; +bool tsEnableAudit = true; +bool tsEnableAuditCreateTable = true; +int32_t tsAuditInterval = 5000; // telem #ifdef TD_ENTERPRISE @@ -252,7 +252,7 @@ int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; -int32_t tsStreamNodeCheckInterval = 15; +int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups @@ -282,7 +282,7 @@ int32_t tsS3BlockCacheSize = 16; // number of blocks int32_t tsS3PageCacheSize = 4096; // number of pages int32_t tsS3UploadDelaySec = 60 * 60 * 24; -bool tsExperimental = true; +bool tsExperimental = true; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -691,8 +691,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -711,8 +710,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) + if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, + CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -1465,40 +1464,38 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, }; - static OptionNameAndVar options[] = { - {"audit", &tsEnableAudit}, - {"asynclog", &tsAsyncLog}, - {"disableStream", &tsDisableStream}, - {"enableWhiteList", &tsEnableWhiteList}, - {"telemetryReporting", &tsEnableTelem}, - {"monitor", &tsEnableMonitor}, + static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, + {"asynclog", &tsAsyncLog}, + {"disableStream", &tsDisableStream}, + {"enableWhiteList", &tsEnableWhiteList}, + {"telemetryReporting", &tsEnableTelem}, + {"monitor", &tsEnableMonitor}, - {"mndSdbWriteDelta", &tsMndSdbWriteDelta}, - {"minDiskFreeSize", &tsMinDiskFreeSize}, + {"mndSdbWriteDelta", &tsMndSdbWriteDelta}, + {"minDiskFreeSize", &tsMinDiskFreeSize}, - {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, - {"checkpointInterval", &tsStreamCheckpointInterval}, - {"keepAliveIdle", &tsKeepAliveIdle}, - {"logKeepDays", &tsLogKeepDays}, - {"maxStreamBackendCache", &tsMaxStreamBackendCache}, - {"mqRebalanceInterval", &tsMqRebalanceInterval}, - {"numOfLogLines", &tsNumOfLogLines}, - {"queryRspPolicy", &tsQueryRspPolicy}, - {"timeseriesThreshold", &tsTimeSeriesThreshold}, - {"tmqMaxTopicNum", &tmqMaxTopicNum}, - {"transPullupInterval", &tsTransPullupInterval}, - {"compactPullupInterval", &tsCompactPullupInterval}, - {"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, - {"ttlBatchDropNum", &tsTtlBatchDropNum}, - {"ttlFlushThreshold", &tsTtlFlushThreshold}, - {"ttlPushInterval", &tsTtlPushIntervalSec}, - //{"s3BlockSize", &tsS3BlockSize}, - {"s3BlockCacheSize", &tsS3BlockCacheSize}, - {"s3PageCacheSize", &tsS3PageCacheSize}, - {"s3UploadDelaySec", &tsS3UploadDelaySec}, - {"supportVnodes", &tsNumOfSupportVnodes}, - {"experimental", &tsExperimental} - }; + {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, + {"checkpointInterval", &tsStreamCheckpointInterval}, + {"keepAliveIdle", &tsKeepAliveIdle}, + {"logKeepDays", &tsLogKeepDays}, + {"maxStreamBackendCache", &tsMaxStreamBackendCache}, + {"mqRebalanceInterval", &tsMqRebalanceInterval}, + {"numOfLogLines", &tsNumOfLogLines}, + {"queryRspPolicy", &tsQueryRspPolicy}, + {"timeseriesThreshold", &tsTimeSeriesThreshold}, + {"tmqMaxTopicNum", &tmqMaxTopicNum}, + {"transPullupInterval", &tsTransPullupInterval}, + {"compactPullupInterval", &tsCompactPullupInterval}, + {"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, + {"ttlBatchDropNum", &tsTtlBatchDropNum}, + {"ttlFlushThreshold", &tsTtlFlushThreshold}, + {"ttlPushInterval", &tsTtlPushIntervalSec}, + //{"s3BlockSize", &tsS3BlockSize}, + {"s3BlockCacheSize", &tsS3BlockCacheSize}, + {"s3PageCacheSize", &tsS3PageCacheSize}, + {"s3UploadDelaySec", &tsS3UploadDelaySec}, + {"supportVnodes", &tsNumOfSupportVnodes}, + {"experimental", &tsExperimental}}; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); @@ -1692,36 +1689,34 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { {"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, - {"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag}, + {"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag}, }; - static OptionNameAndVar options[] = { - {"asyncLog", &tsAsyncLog}, - {"assert", &tsAssert}, - {"compressMsgSize", &tsCompressMsgSize}, - {"countAlwaysReturnValue", &tsCountAlwaysReturnValue}, - {"crashReporting", &tsEnableCrashReport}, - {"enableCoreFile", &tsAsyncLog}, - {"enableQueryHb", &tsEnableQueryHb}, - {"keepColumnName", &tsKeepColumnName}, - {"keepAliveIdle", &tsKeepAliveIdle}, - {"logKeepDays", &tsLogKeepDays}, - {"maxInsertBatchRows", &tsMaxInsertBatchRows}, - {"maxRetryWaitTime", &tsMaxRetryWaitTime}, - {"minSlidingTime", &tsMinSlidingTime}, - {"minIntervalTime", &tsMinIntervalTime}, - {"numOfLogLines", &tsNumOfLogLines}, - {"querySmaOptimize", &tsQuerySmaOptimize}, - {"queryPolicy", &tsQueryPolicy}, - {"queryPlannerTrace", &tsQueryPlannerTrace}, - {"queryNodeChunkSize", &tsQueryNodeChunkSize}, - {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, - {"smlDot2Underline", &tsSmlDot2Underline}, - {"shellActivityTimer", &tsShellActivityTimer}, - {"slowLogThreshold", &tsSlowLogThreshold}, - {"useAdapter", &tsUseAdapter}, - {"experimental", &tsExperimental} - }; + static OptionNameAndVar options[] = {{"asyncLog", &tsAsyncLog}, + {"assert", &tsAssert}, + {"compressMsgSize", &tsCompressMsgSize}, + {"countAlwaysReturnValue", &tsCountAlwaysReturnValue}, + {"crashReporting", &tsEnableCrashReport}, + {"enableCoreFile", &tsAsyncLog}, + {"enableQueryHb", &tsEnableQueryHb}, + {"keepColumnName", &tsKeepColumnName}, + {"keepAliveIdle", &tsKeepAliveIdle}, + {"logKeepDays", &tsLogKeepDays}, + {"maxInsertBatchRows", &tsMaxInsertBatchRows}, + {"maxRetryWaitTime", &tsMaxRetryWaitTime}, + {"minSlidingTime", &tsMinSlidingTime}, + {"minIntervalTime", &tsMinIntervalTime}, + {"numOfLogLines", &tsNumOfLogLines}, + {"querySmaOptimize", &tsQuerySmaOptimize}, + {"queryPolicy", &tsQueryPolicy}, + {"queryPlannerTrace", &tsQueryPlannerTrace}, + {"queryNodeChunkSize", &tsQueryNodeChunkSize}, + {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, + {"smlDot2Underline", &tsSmlDot2Underline}, + {"shellActivityTimer", &tsShellActivityTimer}, + {"slowLogThreshold", &tsSlowLogThreshold}, + {"useAdapter", &tsUseAdapter}, + {"experimental", &tsExperimental}}; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); @@ -1763,7 +1758,7 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag void taosSetAllDebugFlag(int32_t flag) { if (flag <= 0) return; - SArray *noNeedToSetVars = NULL; + SArray * noNeedToSetVars = NULL; SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag"); if (pItem != NULL) { pItem->i32 = flag; @@ -1798,7 +1793,7 @@ void taosSetAllDebugFlag(int32_t flag) { taosArrayClear(noNeedToSetVars); // reset array uInfo("all debug flag are set to %d", flag); - if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist + if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist } int8_t taosGranted() { return atomic_load_8(&tsGrant); } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c0fd93c65d..4bd0776ecd 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -107,7 +107,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { static void mndPullupTrans(SMnode *pMnode) { mTrace("pullup trans msg"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -117,7 +117,7 @@ static void mndPullupTrans(SMnode *pMnode) { static void mndPullupCompacts(SMnode *pMnode) { mTrace("pullup compact timer msg"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -127,7 +127,7 @@ static void mndPullupCompacts(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) { mTrace("pullup ttl"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } @@ -135,14 +135,14 @@ static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTrimDb(SMnode *pMnode) { mTrace("pullup trim"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -151,7 +151,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { int32_t contLen = 0; - void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); + void * pReq = mndBuildCheckpointTickMsg(&contLen, sec); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -160,7 +160,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { static void mndStreamCheckpointRemain(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); + void * pReq = mndBuildCheckpointTickMsg(&contLen, 0); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -169,7 +169,7 @@ static void mndStreamCheckpointRemain(SMnode *pMnode) { static void mndStreamCheckNode(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -179,7 +179,7 @@ static void mndStreamCheckNode(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) { mTrace("pullup telem msg"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -189,7 +189,7 @@ static void mndPullupTelem(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) { mTrace("pullup grant msg"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; @@ -200,7 +200,7 @@ static void mndPullupGrant(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) { mTrace("increate uptime"); int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; @@ -254,7 +254,7 @@ static void mndCheckDnodeOffline(SMnode *pMnode) { mTrace("check dnode offline"); if (mndAcquireRpc(pMnode) != 0) return; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; int64_t curMs = taosGetTimestampMs(); void *pIter = NULL; @@ -299,14 +299,14 @@ static bool mnodeIsNotLeader(SMnode *pMnode) { } static int32_t minCronTime() { - int64_t min = INT64_MAX; + int32_t min = INT32_MAX; min = TMIN(min, tsTtlPushIntervalSec); min = TMIN(min, tsTrimVDbIntervalSec); min = TMIN(min, tsTransPullupInterval); min = TMIN(min, tsCompactPullupInterval); min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsStreamCheckpointInterval); - min = TMIN(min, 5); // checkpointRemain + min = TMIN(min, 6); // checkpointRemain min = TMIN(min, tsStreamNodeCheckInterval); int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); @@ -386,7 +386,8 @@ static void *mndThreadFp(void *param) { int64_t minCron = minCronTime(); if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) { // not leader, do nothing - mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0; + mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)); + terrno = 0; continue; } mndDoTimerPullupTask(pMnode, sec); @@ -700,7 +701,7 @@ void mndStop(SMnode *pMnode) { } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; + SMnode * pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; const STraceId *trace = &pMsg->info.traceId; @@ -802,7 +803,7 @@ _OVER: } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; + SMnode * pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; @@ -857,7 +858,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpc(pMnode) != 0) return -1; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); @@ -941,7 +942,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; + SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role)); From 2537adb72887dda2d0eb2bae6e015a2fcb75d6d3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 10 Jan 2024 06:00:44 +0000 Subject: [PATCH 02/25] opt msg on mnode --- source/common/src/tglobal.c | 8 +++---- source/dnode/mnode/impl/src/mndMain.c | 32 +++++++++++++-------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 76e1ab46b7..4b32fa676e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -108,7 +108,7 @@ bool tsEnableTelem = true; int32_t tsTelemInterval = 43200; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; uint16_t tsTelemPort = 80; -char * tsTelemUri = "/report"; +char *tsTelemUri = "/report"; #ifdef TD_ENTERPRISE bool tsEnableCrashReport = false; @@ -1376,7 +1376,7 @@ void taosCleanupCfg() { typedef struct { const char *optionName; - void * optionVar; + void *optionVar; } OptionNameAndVar; static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) { @@ -1389,7 +1389,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, switch (pItem->dtype) { case CFG_DTYPE_BOOL: { int32_t flag = pItem->i32; - bool * pVar = pOptions[d].optionVar; + bool *pVar = pOptions[d].optionVar; uInfo("%s set from %d to %d", optName, *pVar, flag); *pVar = flag; terrno = TSDB_CODE_SUCCESS; @@ -1758,7 +1758,7 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag void taosSetAllDebugFlag(int32_t flag) { if (flag <= 0) return; - SArray * noNeedToSetVars = NULL; + SArray *noNeedToSetVars = NULL; SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag"); if (pItem != NULL) { pItem->i32 = flag; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 4bd0776ecd..aa54de9ae7 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -107,7 +107,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { static void mndPullupTrans(SMnode *pMnode) { mTrace("pullup trans msg"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -117,7 +117,7 @@ static void mndPullupTrans(SMnode *pMnode) { static void mndPullupCompacts(SMnode *pMnode) { mTrace("pullup compact timer msg"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -127,7 +127,7 @@ static void mndPullupCompacts(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) { mTrace("pullup ttl"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } @@ -135,14 +135,14 @@ static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTrimDb(SMnode *pMnode) { mTrace("pullup trim"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -151,7 +151,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { int32_t contLen = 0; - void * pReq = mndBuildCheckpointTickMsg(&contLen, sec); + void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -160,7 +160,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { static void mndStreamCheckpointRemain(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildCheckpointTickMsg(&contLen, 0); + void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -169,7 +169,7 @@ static void mndStreamCheckpointRemain(SMnode *pMnode) { static void mndStreamCheckNode(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -179,7 +179,7 @@ static void mndStreamCheckNode(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) { mTrace("pullup telem msg"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -189,7 +189,7 @@ static void mndPullupTelem(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) { mTrace("pullup grant msg"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; @@ -200,7 +200,7 @@ static void mndPullupGrant(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) { mTrace("increate uptime"); int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; @@ -254,7 +254,7 @@ static void mndCheckDnodeOffline(SMnode *pMnode) { mTrace("check dnode offline"); if (mndAcquireRpc(pMnode) != 0) return; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int64_t curMs = taosGetTimestampMs(); void *pIter = NULL; @@ -701,7 +701,7 @@ void mndStop(SMnode *pMnode) { } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; const STraceId *trace = &pMsg->info.traceId; @@ -803,7 +803,7 @@ _OVER: } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; @@ -858,7 +858,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpc(pMnode) != 0) return -1; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); @@ -942,7 +942,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { - SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role)); From 3f0236ec171801f3ca09f55d0746611a6a50d139 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 10 Jan 2024 06:18:39 +0000 Subject: [PATCH 03/25] opt msg on mnode --- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index b79b3f9e11..7ca19d7725 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -47,28 +47,29 @@ static void *dmStatusThreadFp(void *param) { } SDmNotifyHandle dmNotifyHdl = {.state = 0}; -static void *dmNotifyThreadFp(void *param) { - SDnodeMgmt *pMgmt = param; - setThreadName("dnode-notify"); - if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { - return NULL; +static void *dmNotifyThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + setThreadName("dnode-notify"); + + if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { + return NULL; } - bool wait = true; - while (1) { - if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + bool wait = true; + while (1) { + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - dmSendNotifyReq(pMgmt); - if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { - wait = true; - continue; + dmSendNotifyReq(pMgmt); + if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { + wait = true; + continue; } - wait = false; + wait = false; } - return NULL; + return NULL; } static void *dmMonitorThreadFp(void *param) { From ba2ba0a8cbb292fd69b8011e45efe5d13c818f88 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 10 Jan 2024 16:19:20 +0800 Subject: [PATCH 04/25] fix: time window computation error --- source/common/src/ttime.c | 13 ++--- source/libs/executor/test/timewindowTest.cpp | 23 ++++++++ tests/parallel_test/cases.task | 1 + tests/system-test/2-query/count_interval.py | 60 ++++++++++++++++++++ 4 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 tests/system-test/2-query/count_interval.py diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 023a425df2..376cfce255 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -872,18 +872,17 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) { ASSERT(pInterval->offset >= 0); if (pInterval->offset > 0) { - start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); - // try to move current window to the left-hande-side, due to the offset effect. int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1; - int64_t newe = end; + int64_t slidingEnd = end; while (newe >= ts) { - end = newe; - newe = taosTimeAdd(newe, -pInterval->sliding, pInterval->slidingUnit, precision); + end = slidingEnd; + slidingEnd = taosTimeAdd(slidingEnd, -pInterval->sliding, pInterval->slidingUnit, precision); + newe = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, precision); } - - start = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1; + int64_t slidingStart = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1; + start = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, precision); } return start; diff --git a/source/libs/executor/test/timewindowTest.cpp b/source/libs/executor/test/timewindowTest.cpp index 2894c66587..7a83b64fac 100644 --- a/source/libs/executor/test/timewindowTest.cpp +++ b/source/libs/executor/test/timewindowTest.cpp @@ -158,4 +158,27 @@ TEST(testCase, timewindow_gen) { } +TEST(testCase, timewindow_natural) { + osSetTimezone("CST"); + + int32_t precision = TSDB_TIME_PRECISION_MILLI; + + SInterval interval2 = createInterval(17, 17, 13392000000, 'n', 'n', 0, precision); + int64_t key1 = 1633446027072; + STimeWindow w1 = {0}; + getInitialStartTimeWindow(&interval2, key1, &w1, true); + printTimeWindow(&w1, precision, key1); + STimeWindow w3 = getAlignQueryTimeWindow(&interval2, key1); + printf("%ld win %ld, %ld\n", key1, w3.skey, w3.ekey); + + int64_t key2 = 1648758398208; + STimeWindow w2 = {0}; + getInitialStartTimeWindow(&interval2, key2, &w2, true); + printTimeWindow(&w2, precision, key2); + STimeWindow w4 = getAlignQueryTimeWindow(&interval2, key2); + printf("%ld win %ld, %ld\n", key2, w3.skey, w3.ekey); + + ASSERT_EQ(w3.skey, w4.skey); + ASSERT_EQ(w3.ekey, w4.ekey); +} #pragma GCC diagnostic pop \ No newline at end of file diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3db6eccffc..752234bbf5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -36,6 +36,7 @@ #,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 diff --git a/tests/system-test/2-query/count_interval.py b/tests/system-test/2-query/count_interval.py new file mode 100644 index 0000000000..b37cc1db22 --- /dev/null +++ b/tests/system-test/2-query/count_interval.py @@ -0,0 +1,60 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql +from util.dnodes import tdDnodes +import random + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use d") + + def run(self): + tdSql.execute("drop database if exists d"); + tdSql.execute("create database d"); + tdSql.execute("use d"); + tdSql.execute("create table st(ts timestamp, f int) tags (t int)") + + for i in range(-2048, 2047): + ts = 1626624000000 + i; + tdSql.execute(f"insert into ct1 using st tags(1) values({ts}, {i})") + + tdSql.execute("flush database d") + for i in range(1638): + ts = 1648758398208 + i + tdSql.execute(f"insert into ct1 using st tags(1) values({ts}, {i})") + tdSql.execute("insert into ct1 using st tags(1) values(1648970742528, 1638)") + tdSql.execute("flush database d") + + tdSql.query("select count(ts) from ct1 interval(17n, 5n)") + self.restartTaosd() + tdSql.query("select count(ts) from ct1 interval(17n, 5n)") + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From b5e5167b8fb845ffc0bd266e457c4abda533222e Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 10 Jan 2024 09:57:08 +0000 Subject: [PATCH 05/25] split sync/status channel --- include/common/tmsgcb.h | 4 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 4 + source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 5 + source/dnode/mgmt/node_mgmt/src/dmTransport.c | 93 +++++++++++++++++-- source/dnode/mnode/impl/src/mndDump.c | 9 +- source/dnode/mnode/impl/test/trans/trans2.cpp | 1 + source/dnode/vnode/src/vnd/vnodeSync.c | 12 +-- source/libs/transport/src/tmsgcb.c | 8 ++ 9 files changed, 119 insertions(+), 19 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 311bffb7da..dd1e54318e 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -53,9 +53,12 @@ typedef struct { void* mgmt; void* clientRpc; void* serverRpc; + void* statusRpc; + void* syncRpc; PutToQueueFp putToQueueFp; GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; + SendReqFp sendSyncReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; @@ -67,6 +70,7 @@ void tmsgSetDefault(const SMsgCb* msgcb); int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg); int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); +int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index a95ec42bd0..545a5fc870 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; int8_t epUpdated = 0; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 10); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[512]; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index c646bb4bdd..cb3395dcc2 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -48,6 +48,8 @@ typedef struct { typedef struct { void *serverRpc; void *clientRpc; + void *statusRpc; + void *syncRpc; SDnodeHandle msgHandles[TDMT_MAX]; } SDnodeTrans; @@ -136,8 +138,10 @@ int32_t dmInitServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode); int32_t dmInitStatusClient(SDnode *pDnode); +int32_t dmInitSyncClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode); void dmCleanupStatusClient(SDnode *pDnode); +void dmCleanupSyncClient(SDnode *pDnode); SMsgCb dmGetMsgcb(SDnode *pDnode); #ifdef TD_MODULE_OPTIMIZE int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 84465640c0..63122171b1 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -93,6 +93,9 @@ int32_t dmInitDnode(SDnode *pDnode) { indexInit(tsNumOfCommitThreads); streamMetaInit(); + dmInitStatusClient(pDnode); + dmInitSyncClient(pDnode); + dmReportStartup("dnode-transport", "initialized"); dDebug("dnode is created, ptr:%p", pDnode); code = 0; @@ -112,7 +115,9 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupClient(pDnode); dmCleanupStatusClient(pDnode); + dmCleanupSyncClient(pDnode); dmCleanupServer(pDnode); + dmClearVars(pDnode); rpcCleanup(); streamMetaCleanup(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1ea61f0e93..512f090ab0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -322,6 +322,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } } +static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { + SDnode *pDnode = dmInstance(); + if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + if (pDnode->status == DND_STAT_INIT) { + terrno = TSDB_CODE_APP_IS_STARTING; + } else { + terrno = TSDB_CODE_APP_IS_STOPPING; + } + dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle); + return -1; + } else { + rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL); + return 0; + } +} static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); } @@ -421,16 +438,61 @@ int32_t dmInitStatusClient(SDnode *pDnode) { rpcInit.timeToGetConn = tsTimeToGetAvailableConn; taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); - // pTrans->statusClientRpc = rpcOpen(&rpcInit); - // if (pTrans->statusClientRpc == NULL) { - // dError("failed to init dnode rpc status client"); - // return -1; - // } + pTrans->statusRpc = rpcOpen(&rpcInit); + if (pTrans->statusRpc == NULL) { + dError("failed to init dnode rpc status client"); + return -1; + } dDebug("dnode rpc status client is initialized"); return 0; } +int32_t dmInitSyncClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + + SRpcInit rpcInit = {0}; + rpcInit.label = "DND-SYNC"; + rpcInit.numOfThreads = tsNumOfRpcThreads; + rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; + rpcInit.sessions = 1024; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = TSDB_DEFAULT_USER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.parent = pDnode; + rpcInit.rfp = rpcRfp; + rpcInit.compressSize = tsCompressMsgSize; + + rpcInit.retryMinInterval = tsRedirectPeriod; + rpcInit.retryStepFactor = tsRedirectFactor; + rpcInit.retryMaxInterval = tsRedirectMaxPeriod; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; + + rpcInit.failFastInterval = 5000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + + int32_t connLimitNum = 100; + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + + rpcInit.connLimitNum = connLimitNum; + rpcInit.connLimitLock = 1; + rpcInit.supportBatch = 1; + rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + + pTrans->syncRpc = rpcOpen(&rpcInit); + if (pTrans->syncRpc == NULL) { + dError("failed to init dnode rpc sync client"); + return -1; + } + + dDebug("dnode rpc sync client is initialized"); + return 0; +} + void dmCleanupClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; if (pTrans->clientRpc) { @@ -441,11 +503,19 @@ void dmCleanupClient(SDnode *pDnode) { } void dmCleanupStatusClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; - // if (pTrans->statusClientRpc) { - // rpcClose(pTrans->statusClientRpc); - // pTrans->statusClientRpc = NULL; - // dDebug("dnode rpc status client is closed"); - // } + if (pTrans->statusRpc) { + rpcClose(pTrans->statusRpc); + pTrans->statusRpc = NULL; + dDebug("dnode rpc status client is closed"); + } +} +void dmCleanupSyncClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + if (pTrans->syncRpc) { + rpcClose(pTrans->syncRpc); + pTrans->syncRpc = NULL; + dDebug("dnode rpc sync client is closed"); + } } int32_t dmInitServer(SDnode *pDnode) { @@ -486,7 +556,10 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb msgCb = { .clientRpc = pDnode->trans.clientRpc, .serverRpc = pDnode->trans.serverRpc, + .statusRpc = pDnode->trans.statusRpc, + .syncRpc = pDnode->trans.syncRpc, .sendReqFp = dmSendReq, + .sendSyncReqFp = dmSendSyncReq, .sendRspFp = dmSendRsp, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .releaseHandleFp = dmReleaseHandle, diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 5efebbc16e..c68b11d184 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -32,6 +32,10 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { terrno = TSDB_CODE_INVALID_PTR; return -1; } +int32_t sendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; +} char *i642str(int64_t val) { static char str[24] = {0}; @@ -568,6 +572,7 @@ void mndDumpSdb() { SMsgCb msgCb = {0}; msgCb.reportStartupFp = reportStartup; msgCb.sendReqFp = sendReq; + msgCb.sendSyncReqFp = sendSyncReq; msgCb.sendRspFp = sendRsp; msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefault(&msgCb); @@ -590,7 +595,7 @@ void mndDumpSdb() { dumpTopic(pSdb, json); dumpConsumer(pSdb, json); dumpSubscribe(pSdb, json); -// dumpOffset(pSdb, json); + // dumpOffset(pSdb, json); dumpStream(pSdb, json); dumpAcct(pSdb, json); dumpAuth(pSdb, json); @@ -605,7 +610,7 @@ void mndDumpSdb() { char *pCont = tjsonToString(json); int32_t contLen = strlen(pCont); char file[] = "sdb.json"; - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC| TD_FILE_WRITE_THROUGH); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to write %s since %s", file, terrstr()); diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index 2d03631a37..4d0d53ced0 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -61,6 +61,7 @@ class MndTestTrans2 : public ::testing::Test { static SMsgCb msgCb = {0}; msgCb.reportStartupFp = reportStartup; msgCb.sendReqFp = sendReq; + msgCb.sendSyncReqFp = sendSyncReq; msgCb.sendRspFp = sendRsp; msgCb.queueFps[SYNC_QUEUE] = putToQueue; msgCb.queueFps[WRITE_QUEUE] = putToQueue; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 82baaa652b..09becffdaa 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -14,11 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "tq.h" #include "sync.h" +#include "tq.h" +#include "tqCommon.h" #include "tsdb.h" #include "vnd.h" -#include "tqCommon.h" #define BATCH_ENABLE 0 @@ -411,7 +411,7 @@ static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = tmsgSendReq(pEpSet, pMsg); + int32_t code = tmsgSendSyncReq(pEpSet, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; @@ -477,8 +477,8 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta } static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { - SVnode *pVnode = pFsm->data; - int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); + SVnode *pVnode = pFsm->data; + int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); return code; } @@ -555,7 +555,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; + SStreamMeta *pMeta = pVnode->pTq->pStreamMeta; streamMetaWLock(pMeta); if (pMeta->startInfo.tasksWillRestart) { diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index 1b1fa8cc1c..e44328c683 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -48,6 +48,14 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { } return code; } +int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) { + int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} void tmsgSendRsp(SRpcMsg* pMsg) { #if 1 From b4b44009587f42a0eb61fc50c507e9d450fda76b Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 10 Jan 2024 21:36:09 +0800 Subject: [PATCH 06/25] fix: recalcuate window ekey --- source/libs/executor/src/executorInt.c | 8 +++++++- source/libs/executor/test/timewindowTest.cpp | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 95d26fdd0e..cb4988c31f 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -449,7 +449,13 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) { * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + if (pInterval->offset > 0) { + int64_t slideStart = taosTimeAdd(win.skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision); + int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + win.ekey = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); + } else { + win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + } if (win.ekey < win.skey) { win.ekey = INT64_MAX; } diff --git a/source/libs/executor/test/timewindowTest.cpp b/source/libs/executor/test/timewindowTest.cpp index 7a83b64fac..3639bf15e1 100644 --- a/source/libs/executor/test/timewindowTest.cpp +++ b/source/libs/executor/test/timewindowTest.cpp @@ -164,6 +164,11 @@ TEST(testCase, timewindow_natural) { int32_t precision = TSDB_TIME_PRECISION_MILLI; SInterval interval2 = createInterval(17, 17, 13392000000, 'n', 'n', 0, precision); + int64_t key = 1648970865984; + STimeWindow w0 = getAlignQueryTimeWindow(&interval2, key); + printTimeWindow(&w0, precision, key); + ASSERT_GE(w0.ekey, key); + int64_t key1 = 1633446027072; STimeWindow w1 = {0}; getInitialStartTimeWindow(&interval2, key1, &w1, true); From ab137a56f0107734a766023bd91eebfabba20659 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 11 Jan 2024 08:31:16 +0800 Subject: [PATCH 07/25] fix: more interval end calculation with slide-start/slide-end as anchor --- include/common/ttime.h | 1 + source/common/src/ttime.c | 12 ++++++++++++ source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorInt.c | 8 +------- source/libs/executor/src/timewindowoperator.c | 4 ++-- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index ed4d1a9290..4db25d2f71 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -75,6 +75,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval); +int64_t taosTimeGetIntervalEnd(int64_t ts, const SInterval* pInterval); int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision, int32_t order); int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 376cfce255..f683baee7c 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -888,6 +888,18 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) { return start; } +// used together with taosTimeTruncate. when offset is great than zero, slide-start/slide-end is the anchor point +int64_t taosTimeGetIntervalEnd(int64_t intervalStart, const SInterval* pInterval) { + if (pInterval->offset > 0) { + int64_t slideStart = taosTimeAdd(intervalStart, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision); + int64_t end = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + int64_t result = taosTimeAdd(end, pInterval->offset, pInterval->offsetUnit, pInterval->precision); + return result; + } else { + int64_t result = taosTimeAdd(intervalStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + return result; + } +} // internal function, when program is paused in debugger, // one can call this function from debugger to print a // timestamp as human readable string, for example (gdb): diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b713b9b112..019483a015 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1838,7 +1838,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) { STimeWindow w = {0}; w.skey = taosTimeTruncate(ts, pInterval); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval); return w; } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index cb4988c31f..ff4d3d0d27 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -449,13 +449,7 @@ STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) { * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - if (pInterval->offset > 0) { - int64_t slideStart = taosTimeAdd(win.skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision); - int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - win.ekey = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); - } else { - win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - } + win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval); if (win.ekey < win.skey) { win.ekey = INT64_MAX; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index afe1921d30..c459e4d8b9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -450,7 +450,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl TSKEY next = primaryKeys[startPos]; if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { pNext->skey = taosTimeTruncate(next, pInterval); - pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval); } else { pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; pNext->skey = pNext->ekey - pInterval->interval + 1; @@ -459,7 +459,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl TSKEY next = primaryKeys[startPos]; if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { pNext->skey = taosTimeTruncate(next, pInterval); - pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval); } else { pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; pNext->ekey = pNext->skey + pInterval->interval - 1; From 686108050ba6bcde7946423ff1d6f9246dd9715f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Jan 2024 18:44:04 +0800 Subject: [PATCH 08/25] fix: table merge scan return disordered rows --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 15 ++++++++++++++- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 + 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b57d2dfb45..256ef10c5e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -73,6 +73,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order static void resetTableListIndex(SReaderStatus* pStatus); static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); +static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -3095,6 +3096,17 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) { return TSDB_READ_RETURN; } + if (pReader->status.bProcMemPreFileset) { + code = buildFromPreFilesetBuffer(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pResBlock->info.rows > 0) { + pReader->status.processingMemPreFileSet = true; + return TSDB_READ_RETURN; + } + } + if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. return TSDB_READ_CONTINUE; } else { // all blocks in data file are checked, let's check the data in last files @@ -4342,6 +4354,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { } else { tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr); pStatus->bProcMemPreFileset = false; + pStatus->processingMemPreFileSet = false; if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; @@ -4374,7 +4387,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { pStatus->bProcMemFirstFileset, pReader->idStr); if (pStatus->bProcMemPreFileset) { if (pBlock->info.rows > 0) { - if (pReader->notifyFn) { + if (pReader->notifyFn && !pReader->status.processingMemPreFileSet) { int32_t fid = pReader->status.pCurrentFileset->fid; STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 39e65f22b1..2157c7a423 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -219,6 +219,7 @@ typedef struct SReaderStatus { int64_t prevFilesetStartKey; int64_t prevFilesetEndKey; bool bProcMemFirstFileset; + bool processingMemPreFileSet; STableUidList procMemUidList; STableBlockScanInfo** pProcMemTableIter; } SReaderStatus; From df235b0adf1918842740496c0d54e6546907d413 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 03:22:34 +0000 Subject: [PATCH 09/25] split sync/status channel --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 545a5fc870..d83ee541df 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; int8_t epUpdated = 0; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 10); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 10 * 1000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[512]; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4b3000b47e..1ccf708703 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2701,6 +2701,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr ret = TSDB_CODE_TIMEOUT_ERROR; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); + pSyncMsg->pRsp->pCont = NULL; if (pSyncMsg->hasEpSet == 1) { epsetAssign(pEpSet, &pSyncMsg->epSet); *epUpdated = 1; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 759a4d79db..b1fb9a2450 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -667,7 +667,7 @@ void transDestroySyncMsg(void* msg) { STransSyncMsg* pSyncMsg = msg; tsem_destroy(pSyncMsg->pSem); taosMemoryFree(pSyncMsg->pSem); - + transFreeMsg(pSyncMsg->pRsp->pCont); taosMemoryFree(pSyncMsg->pRsp); taosMemoryFree(pSyncMsg); } From 44146caa9a73c947c69a215d640dbcd8bd671d09 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 Jan 2024 13:46:54 +0800 Subject: [PATCH 10/25] opt msg on mnd --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index d83ee541df..14853009e0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -45,7 +45,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer}; int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req); - void *pHead = rpcMallocCont(contLen); + void * pHead = rpcMallocCont(contLen); tSerializeRetrieveIpWhite(pHead, contLen, &req); SRpcMsg rpcMsg = {.pCont = pHead, @@ -144,7 +144,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.ipWhiteVer = pMgmt->pData->ipWhiteVer; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); - void *pHead = rpcMallocCont(contLen); + void * pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); tFreeSStatusReq(&req); @@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; int8_t epUpdated = 0; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 10 * 1000); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[512]; @@ -189,7 +189,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt) { req.pVloads = vinfo.pVloads; int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); - void *pHead = rpcMallocCont(contLen); + void * pHead = rpcMallocCont(contLen); tSerializeSNotifyReq(pHead, contLen, &req); tFreeSNotifyReq(&req); @@ -284,7 +284,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { } SSDataBlock *dmBuildVariablesBlock(void) { - SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + SSDataBlock * pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); size_t size = 0; const SSysTableMeta *pMeta = NULL; getInfosDbMeta(&pMeta, &size); From 423fa762074105df4bc6d03d4e8a6e91646c160f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 06:01:06 +0000 Subject: [PATCH 11/25] split sync/status channel --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 512f090ab0..4f72ac24b4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -364,7 +364,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-C"; - rpcInit.numOfThreads = tsNumOfRpcThreads; + rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; @@ -383,7 +383,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); @@ -453,7 +453,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-SYNC"; - rpcInit.numOfThreads = tsNumOfRpcThreads; + rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; @@ -472,7 +472,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - int32_t connLimitNum = 100; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); From 7d46458bb70c38a5f5739e2ff6a5e510e8d4aa1f Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 11 Jan 2024 14:50:14 +0800 Subject: [PATCH 12/25] fix: use slide start to calculate previous slide start --- source/common/src/ttime.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index f683baee7c..bbda31efb1 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -873,16 +873,15 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) { if (pInterval->offset > 0) { // try to move current window to the left-hande-side, due to the offset effect. - int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1; - int64_t newe = end; - int64_t slidingEnd = end; + int64_t newe = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1; + int64_t slidingStart = start; while (newe >= ts) { - end = slidingEnd; - slidingEnd = taosTimeAdd(slidingEnd, -pInterval->sliding, pInterval->slidingUnit, precision); + start = slidingStart; + slidingStart = taosTimeAdd(slidingStart, -pInterval->sliding, pInterval->slidingUnit, precision); + int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, precision) - 1; newe = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, precision); } - int64_t slidingStart = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1; - start = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, precision); + start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); } return start; @@ -892,8 +891,8 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) { int64_t taosTimeGetIntervalEnd(int64_t intervalStart, const SInterval* pInterval) { if (pInterval->offset > 0) { int64_t slideStart = taosTimeAdd(intervalStart, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision); - int64_t end = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - int64_t result = taosTimeAdd(end, pInterval->offset, pInterval->offsetUnit, pInterval->precision); + int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + int64_t result = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); return result; } else { int64_t result = taosTimeAdd(intervalStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; From 5e1a7c694b790f2ece43112dd9127935e4d7079c Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 07:23:46 +0000 Subject: [PATCH 13/25] split sync/status channel --- include/util/tdef.h | 34 +++++++++---------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 6 ++-- source/libs/transport/src/transCli.c | 7 +++- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 875a6f5738..aee20514ad 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -109,12 +109,12 @@ extern const int32_t TYPE_BYTES[21]; #define TSDB_INS_USER_STABLES_DBNAME_COLID 2 static const int64_t TICK_PER_SECOND[] = { - 1000LL, // MILLISECOND - 1000000LL, // MICROSECOND - 1000000000LL, // NANOSECOND - 0LL, // HOUR - 0LL, // MINUTE - 1LL // SECOND + 1000LL, // MILLISECOND + 1000000LL, // MICROSECOND + 1000000000LL, // NANOSECOND + 0LL, // HOUR + 0LL, // MINUTE + 1LL // SECOND }; #define TSDB_TICK_PER_SECOND(precision) \ @@ -239,8 +239,8 @@ typedef enum ELogicConditionType { #define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb -#define TSDB_VIEW_NAME_LEN 193 -#define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN) +#define TSDB_VIEW_NAME_LEN 193 +#define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_TB_COMMENT_LEN 1025 @@ -260,7 +260,7 @@ typedef enum ELogicConditionType { #define TSDB_PASSWORD_LEN 32 #define TSDB_USET_PASSWORD_LEN 129 #define TSDB_VERSION_LEN 32 -#define TSDB_LABEL_LEN 12 +#define TSDB_LABEL_LEN 16 #define TSDB_JOB_STATUS_LEN 32 #define TSDB_CLUSTER_ID_LEN 40 @@ -288,7 +288,7 @@ typedef enum ELogicConditionType { #define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_CONN_ACTIVE_KEY_LEN 255 -#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE +#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE #define TSDB_SNAP_DATA_PAYLOAD_SIZE (1 * 1024 * 1024) #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE @@ -397,13 +397,13 @@ typedef enum ELogicConditionType { #define TSDB_MAX_STT_TRIGGER 1 #define TSDB_DEFAULT_SST_TRIGGER 1 #endif -#define TSDB_STT_TRIGGER_ARRAY_SIZE 16 // maximum of TSDB_MAX_STT_TRIGGER of TD_ENTERPRISE and TD_COMMUNITY -#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) -#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) -#define TSDB_DEFAULT_HASH_PREFIX 0 -#define TSDB_MIN_HASH_SUFFIX (2 - TSDB_TABLE_NAME_LEN) -#define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2) -#define TSDB_DEFAULT_HASH_SUFFIX 0 +#define TSDB_STT_TRIGGER_ARRAY_SIZE 16 // maximum of TSDB_MAX_STT_TRIGGER of TD_ENTERPRISE and TD_COMMUNITY +#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) +#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) +#define TSDB_DEFAULT_HASH_PREFIX 0 +#define TSDB_MIN_HASH_SUFFIX (2 - TSDB_TABLE_NAME_LEN) +#define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2) +#define TSDB_DEFAULT_HASH_SUFFIX 0 #define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1 #define TSDB_REP_DEF_DB_WAL_RET_PERIOD 3600 diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 4f72ac24b4..ceafdbd2e2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -363,7 +363,7 @@ int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-C"; + rpcInit.label = "DND-CLI"; rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; @@ -407,7 +407,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-STATUS"; + rpcInit.label = "DND-STATUS-CLI"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; @@ -452,7 +452,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-SYNC"; + rpcInit.label = "DND-SYNC-CLI"; rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1ccf708703..8c65a3ac1b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1907,7 +1907,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - setThreadName("trans-cli-work"); + + char threadName[TSDB_LABEL_LEN] = {0}; + STrans* pInst = pThrd->pTransInst; + strntolower(threadName, pInst->label, sizeof(threadName)); + setThreadName(threadName); + uv_run(pThrd->loop, UV_RUN_DEFAULT); tDebug("thread quit-thread:%08" PRId64, pThrd->pid); From 5a1510aa2fd7769d6916cac544455d20268fc4d2 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 07:30:03 +0000 Subject: [PATCH 14/25] split sync/status channel --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ceafdbd2e2..d79e9ff8ef 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -407,7 +407,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-STATUS-CLI"; + rpcInit.label = "DNODE-STA-CLI"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; @@ -452,7 +452,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-SYNC-CLI"; + rpcInit.label = "DNODE-SYNC-CLI"; rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; From b0f670947452c47d6560a540ebdaf7edcc1a3222 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 07:30:39 +0000 Subject: [PATCH 15/25] split sync/status channel --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index d79e9ff8ef..3b7ecce77c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -363,7 +363,7 @@ int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-CLI"; + rpcInit.label = "DNODE-CLI"; rpcInit.numOfThreads = tsNumOfRpcThreads / 2; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; From 3cea4fd2c79e8f1727acf837228219cf9a8985f3 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 11 Jan 2024 16:41:11 +0800 Subject: [PATCH 16/25] fix: getNextTimeWindow use new time algorithm --- source/common/src/ttime.c | 12 +++++++++- source/libs/executor/src/executil.c | 34 +++++++++-------------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index bbda31efb1..775cff6670 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -683,6 +683,10 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati return getDuration(*duration, *unit, duration, timePrecision); } +static bool taosIsLeapYear(int32_t year) { + return (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)); +} + int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { if (duration == 0) { return t; @@ -702,7 +706,13 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)numOfMonth; tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - + int daysOfMonth[] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + if (taosIsLeapYear(1900 + tm.tm_year)) { + daysOfMonth[1] = 29; + } + if (tm.tm_mday > daysOfMonth[tm.tm_mon]) { + tm.tm_mday = daysOfMonth[tm.tm_mon]; + } return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction); } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 019483a015..efab09f02b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1887,31 +1887,17 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI } void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) { + int64_t slidingStart = 0; + if (pInterval->offset > 0) { + slidingStart = taosTimeAdd(tw->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision); + } else { + slidingStart = tw->skey; + } int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); - if (!IS_CALENDAR_TIME_DURATION(pInterval->slidingUnit)) { - tw->skey += pInterval->sliding * factor; - tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - return; - } - - // convert key to second - int64_t key = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000; - - int64_t duration = pInterval->sliding; - if (pInterval->slidingUnit == 'y') { - duration *= 12; - } - - struct tm tm; - time_t t = (time_t)key; - taosLocalTime(&t, &tm, NULL); - - int mon = (int)(tm.tm_year * 12 + tm.tm_mon + duration * factor); - tm.tm_year = mon / 12; - tm.tm_mon = mon % 12; - tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision); - - tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision); + int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); } bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { From d3cfa9774040391e22a614571f3c0dee0858afe9 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 11 Jan 2024 09:41:02 +0000 Subject: [PATCH 17/25] fix invalid read --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8c65a3ac1b..246605694b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1910,7 +1910,7 @@ static void* cliWorkThread(void* arg) { char threadName[TSDB_LABEL_LEN] = {0}; STrans* pInst = pThrd->pTransInst; - strntolower(threadName, pInst->label, sizeof(threadName)); + strtolower(threadName, pInst->label); setThreadName(threadName); uv_run(pThrd->loop, UV_RUN_DEFAULT); From d3659db2fff9a97f0535c9b081dc1e760a294cb7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 11 Jan 2024 19:22:54 +0800 Subject: [PATCH 18/25] fix:[TD-28202]move tq timer to write thread in mnode --- source/client/src/clientTmq.c | 4 ++-- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 15c8903978..28e269ee10 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1968,7 +1968,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); - tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, + tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); // in no topic status, delayed task also need to be processed @@ -2015,7 +2015,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; if (elapsedTime > timeout) { - tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c0fd93c65d..acc65c0650 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -145,7 +145,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 62b671a12f..0909003201 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -747,7 +747,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); From aa410a6deb5b9d71e30f3dd9dd2c82a884e65595 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 12 Jan 2024 09:12:56 +0800 Subject: [PATCH 19/25] fix(tsdb/cache): clear fs state before next open --- source/dnode/vnode/src/tsdb/tsdbCache.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ab504acea7..9b29329cf3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1990,9 +1990,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (SFSNEXTROW_FILESET == state->state) { _next_fileset: - if (--state->iFileSet < 0) { - clearLastFileSet(state); + clearLastFileSet(state); + if (--state->iFileSet < 0) { *ppRow = NULL; return code; } else { @@ -2862,7 +2862,9 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC taosArraySet(pColArray, iCol, &lastCol); int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); - taosArrayRemove(aColArray, aColIndex); + if (aColIndex >= 0) { + taosArrayRemove(aColArray, aColIndex); + } } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; From a9fe3ecaa90606efde37d86e0aded1c24c0d9467 Mon Sep 17 00:00:00 2001 From: zk66214 Date: Fri, 12 Jan 2024 10:21:13 +0800 Subject: [PATCH 20/25] fix characters problem --- .../system-test/1-insert/insert_timestamp.py | 42 +++++++------------ 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/tests/system-test/1-insert/insert_timestamp.py b/tests/system-test/1-insert/insert_timestamp.py index bc3b1c0275..310a27dae0 100644 --- a/tests/system-test/1-insert/insert_timestamp.py +++ b/tests/system-test/1-insert/insert_timestamp.py @@ -13,25 +13,13 @@ class TDTestCase: tdSql.init(conn.cursor(), True) def run(self): - """ - timestamp输入插入规则: - 对于插入的字段类型为timestamp类型的字段,只允许这么几种情况: - timestamp - timestamp +/- interval - interval + timestamp - timestamp可以是字符串譬如:"2023-12-05 00:00:00.000", 也可以是int型, 譬如:1701619200000 - interval支持:b, u, a, s, m, h, d, w 不支持n, y,譬如:1h, 2d - - 仅支持2元表达式,譬如:timestamp + 2h, 不支持2元以上表达,譬如timestamp + 2h + 1d - """ - tdSql.execute("create database test_insert_timestamp PRECISION 'ns';") tdSql.execute("use test_insert_timestamp;") tdSql.execute("create stable st(ts timestamp, c1 int) tags(id int);") tdSql.execute("create table test_t using st tags(1);") expectErrInfo = "syntax error" - # 异常场景:timestamp + timestamp + # abnormal scenario: timestamp + timestamp tdSql.error("insert into test_t values(now + today(), 1 );", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(now - today(), 1 );", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(today() + now(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) @@ -40,24 +28,24 @@ class TDTestCase: tdSql.error("insert into test_t values('2023-11-28 00:00:00.000' + 1701111600000, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(1701111500000 + 1701111600000, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:timestamp + interval + interval + # abnormal scenario: timestamp + interval + interval tdSql.error("insert into test_t values(today() + 1d + 1s, 1);", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:interval - timestamp + # abnormal scenario: interval - timestamp tdSql.error("insert into test_t values(2h - now(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(2h - today(), 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:interval + interval + # abnormal scenario: interval + interval tdSql.error("insert into test_t values(2h - 1h, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(2h + 1h, 1 ); ", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:非法interval类型n + # abnormal scenario: non-support datatype - n tdSql.error("insert into test_t values(today() + 2n, 7); ", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:非法interval类型y + # abnormal scenario: non-support datatype - y tdSql.error("insert into test_t values(today() - 2y, 8);", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:数据类型不对 + # abnormal scenario: non-support datatype tdSql.error("insert into test_t values('a1701619200000', 8);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values('ss2023-12-05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(123456, 1);", expectErrInfo="Timestamp data out of range") @@ -66,31 +54,31 @@ class TDTestCase: tdSql.error("insert into test_t values(None, 1);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(null, 1);", expectErrInfo=expectErrInfo, fullMatched=False) - # 异常场景:格式不对 + # abnormal scenario: incorrect format tdSql.error("insert into test_t values('2023-122-05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values('2023-12--05 00:00:00.000' + '1701619200000', 1);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values('12/12/2023' + 10a, 1);", expectErrInfo=expectErrInfo, fullMatched=False) tdSql.error("insert into test_t values(1701619200000111, 1);", expectErrInfo="Timestamp data out of range", fullMatched=False) - # 正常场景:timestamp + interval + # normal scenario:timestamp + interval tdSql.execute("insert into test_t values(today() + 2b, 1);") tdSql.execute("insert into test_t values(1701619200000000000 + 2u, 2);") tdSql.execute("insert into test_t values(today + 2a, 3);") tdSql.execute("insert into test_t values('2023-12-05 23:59:59.999' + 2a, 4);") tdSql.execute("insert into test_t values(1701921599000000000 + 3a, 5);") - # 正常场景:timestamp - interval + # normal scenario:timestamp - interval tdSql.execute("insert into test_t values(today() - 2s, 6);") tdSql.execute("insert into test_t values(now() - 2m, 7);") tdSql.execute("insert into test_t values(today - 2h, 8);") tdSql.execute("insert into test_t values('2023-12-05 00:00:00.000000000' - 2a, 9);") tdSql.execute("insert into test_t values(1701669000000000000 - 2a, 10);") - # 正常场景:interval + timestamp + # normal scenario:interval + timestamp tdSql.execute("insert into test_t values(2d + now, 11);") tdSql.execute("insert into test_t values(2w + today, 12);") - # 正常场景:timestamp + # normal scenario:timestamp tdSql.execute("insert into test_t values('2023-12-05 00:00:00.000', 13);") tdSql.execute("insert into test_t values(1701629100000000000, 14);") tdSql.execute("insert into test_t values(now() + 2s, 15);") @@ -102,7 +90,7 @@ class TDTestCase: tdSql.execute("insert into test_t values(1701619200000000000, -5);") tdSql.execute("insert into test_t values('2023-12-05 12:12:12' + 10a, 19);") - # 验证数据 + # data verification tdSql.query(f'select ts,c1 from test_t order by c1;') tdSql.checkRows(22) tdSql.checkEqual(tdSql.queryResult[0][0], 1699977600000000000) # c1=-15 @@ -133,12 +121,10 @@ class TDTestCase: tdSql.execute("drop database if exists test_insert_timestamp;") def __convert_ts_to_date(self, ts: int) -> str: - # 创建datetime对象并进行转换 dt_object = datetime.datetime.fromtimestamp(ts / 1e9) - # 格式化日期字符串 formatted_date = dt_object.strftime('%Y-%m-%d') - # print("转换后的日期为:", formatted_date) + return formatted_date def __get_today_ts(self) -> int: From b36cc972367fa9a3520ddf3bc266ec76801bee8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 10:44:49 +0800 Subject: [PATCH 21/25] fix(stream): close the inputQ of the related stream task when the fill-history task recv the trans-state msg from upstream. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tq.c | 1 - source/libs/executor/src/executor.c | 6 ++-- source/libs/stream/src/stream.c | 30 +++++++++++++++---- source/libs/stream/src/streamDispatch.c | 3 ++ source/libs/stream/src/streamExec.c | 39 +++++++++++++++++-------- source/libs/stream/src/streamQueue.c | 2 -- source/libs/stream/src/streamStart.c | 14 +++++++-- 8 files changed, 69 insertions(+), 27 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9ea655c15c..a115783b70 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -562,6 +562,7 @@ struct SStreamDispatchReq { int32_t upstreamTaskId; int32_t upstreamChildId; int32_t upstreamNodeId; + int32_t upstreamRelTaskId; int32_t blockNum; int64_t totalLen; SArray* dataLen; // SArray diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 10420ec463..a53b322753 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -963,7 +963,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // let's decide which step should be executed now if (pTask->execInfo.step1Start == 0) { int64_t ts = taosGetTimestampMs(); - pTask->execInfo.step1Start = ts; tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); } else { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6b38457d70..d27daad550 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -922,8 +922,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 - ", window:%" PRId64 " - %" PRId64, + qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 + " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); return 0; @@ -1129,7 +1129,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; - qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, + qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); pWindow->skey = INT64_MIN; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7db4bec6a0..7d507a1394 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -214,8 +214,9 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, } int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - int32_t status = 0; - const char* id = pTask->id.idStr; + int32_t status = 0; + SStreamMeta* pMeta = pTask->pMeta; + const char* id = pTask->id.idStr; stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); @@ -223,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); - if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) { + if (pMeta->role == NODE_ROLE_FOLLOWER) { stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); status = TASK_INPUT_STATUS__REFUSED; } else { @@ -244,6 +245,22 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); + } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { + atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + + // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state + STaskId* pRelTaskId = &pTask->streamTaskId; + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); + if (pStreamTask != NULL) { + atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); + streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); + streamMetaReleaseTask(pMeta, pStreamTask); + } + + stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64 + " close inputQ for upstream:0x%x", + id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId); } status = streamTaskAppendInputBlocks(pTask, pReq); @@ -252,9 +269,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } // disable the data from upstream tasks - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { - status = TASK_INPUT_STATUS__BLOCKED; - } +// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { +// status = TASK_INPUT_STATUS__BLOCKED; +// } { // do send response with the input status @@ -295,6 +312,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { } pTask->upstreamInfo.numOfClosed = 0; + stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr); } void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a6d21e2625..e2c8704b1c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -58,6 +58,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); @@ -84,6 +85,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; @@ -114,6 +116,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; + pReq->upstreamRelTaskId = pTask->streamTaskId.taskId; pReq->blockNum = numOfBlocks; pReq->taskId = dstTaskId; pReq->type = type; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 18f7ed061a..c69fe2a79c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -371,7 +371,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. -// char* p = NULL; SStreamTaskState* pState = streamTaskGetStatus(pStreamTask); status = pState->state; char* p = pState->name; @@ -392,8 +391,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } // 1. expand the query time window for stream task of WAL scanner - pTimeWindow->skey = INT64_MIN; - qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); + if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { + pTimeWindow->skey = INT64_MIN; + qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); + } else { + stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr); + } // 2. transfer the ownership of executor state streamTaskReleaseState(pTask); @@ -407,10 +410,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); - // 5. save to disk + // 5. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; - // 6. add empty delete block + // 6. open the inputQ for all upstream tasks + streamTaskOpenAllUpstreamInput(pStreamTask); + + // 7. add empty delete block if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); @@ -430,6 +436,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; + SStreamMeta* pMeta = pTask->pMeta; + ASSERT(pTask->status.appendTranstateBlock == 1); int32_t level = pTask->info.taskLevel; @@ -439,8 +447,14 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); - } else { // drop fill-history task - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id); + } else { // drop fill-history task and open inputQ of sink task + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); + if (pStreamTask != NULL) { + streamTaskOpenAllUpstreamInput(pStreamTask); + streamMetaReleaseTask(pMeta, pStreamTask); + } + + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); } return code; @@ -496,16 +510,17 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ } } -int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { +int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; + int32_t level = pTask->info.taskLevel; - int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) { int32_t remain = streamAlignTransferState(pTask); + if (remain > 0) { streamFreeQitem((SStreamQueueItem*)pBlock); - stDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); + stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain); return 0; } } @@ -536,7 +551,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } else { // non-dispatch task, do task state transfer directly streamFreeQitem((SStreamQueueItem*)pBlock); - stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel); + stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); @@ -604,7 +619,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { } if (type == STREAM_INPUT__TRANS_STATE) { - streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput); + streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput); continue; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 8b5b2da061..90823cd937 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -147,8 +147,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) { int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { - int32_t retryTimes = 0; - int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index d39528312d..24909a9776 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1029,6 +1029,11 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } } else { + ASSERT(pTask->info.fillHistory == 0); + if (pTask->info.taskLevel >= TASK_LEVEL__AGG) { + return; + } + int64_t ekey = 0; if (pRange->window.ekey < INT64_MAX) { ekey = pRange->window.ekey + 1; @@ -1043,10 +1048,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pRange->range.minVer = 0; pRange->range.maxVer = ver; - stDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 + stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64 ", verRang:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX); + + SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX}; + STimeWindow win = pRange->window; + streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } } From cf941f74884288b783139e5dc14e8ef2f4a5d391 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 11:12:23 +0800 Subject: [PATCH 22/25] fix(stream): check status when resume to run. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index da1c2ecf16..7883c858f0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -820,15 +820,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return 0; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask != NULL) { - ASSERT(streamTaskReadyToRun(pTask, NULL)); - int64_t execTs = pTask->status.lastExecTs; - int32_t idle = taosGetTimestampMs() - execTs; - tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); - streamResumeTask(pTask); + if (pTask != NULL) { + char* pStatus = NULL; + if (streamTaskReadyToRun(pTask, &pStatus)) { + int64_t execTs = pTask->status.lastExecTs; + int32_t idle = taosGetTimestampMs() - execTs; + tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); + + streamResumeTask(pTask); + } else { + int8_t status = streamTaskSetSchedStatusInactive(pTask); + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, pStatus, status); + } streamMetaReleaseTask(pMeta, pTask); } + return 0; } From 5fa8f85c425e03a2f86eb80f76b1b21fce50ad78 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 11:28:41 +0800 Subject: [PATCH 23/25] fix(stream): remove invalid release. --- source/libs/stream/src/streamMeta.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 04d34b0945..68348f462f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1520,8 +1520,6 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); - - streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } From 52d8811bdca4a7622c1ee74417f9ffd7df5a2aec Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 12 Jan 2024 13:49:32 +0800 Subject: [PATCH 24/25] code coverage --- include/libs/function/functionMgt.h | 4 ++ .../executor/src/streamtimewindowoperator.c | 6 +++ source/libs/executor/src/timewindowoperator.c | 10 ---- source/libs/function/inc/builtins.h | 2 + source/libs/function/inc/builtinsimpl.h | 16 +++++- source/libs/function/src/builtins.c | 50 +++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 11 ++-- .../libs/function/src/detail/tavgfunction.c | 2 + source/libs/function/src/functionMgt.c | 2 + 9 files changed, 87 insertions(+), 16 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 865f1b2295..a0b5d938e3 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -259,9 +259,13 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); + +#ifdef BUILD_NO_CALL int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet); bool fmIsInvertible(int32_t funcId); +#endif + char* fmGetFuncName(int32_t funcId); #ifdef __cplusplus diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index b2d0f25466..22339aa384 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -427,6 +427,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { taosMemoryFreeClear(param); } +#ifdef BUILD_NO_CALL static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { for (int32_t i = 0; i < numOfCols; i++) { if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) { @@ -435,6 +436,7 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { } return true; } +#endif void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo* pInfo) { SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; @@ -3846,6 +3848,7 @@ _error: return NULL; } +#ifdef BUILD_NO_CALL static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { for (int i = 0; i < num; i++) { if (type == STREAM_INVERT) { @@ -3855,6 +3858,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } } } +#endif static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; @@ -3947,9 +3951,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); +#ifdef BUILD_NO_CALL if (pInfo->invertible) { setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); } +#endif doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c459e4d8b9..b86253a8d1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1079,16 +1079,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pBlock; } -static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { - for (int i = 0; i < num; i++) { - if (type == STREAM_INVERT) { - fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); - } else if (type == STREAM_NORMAL) { - fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); - } - } -} - static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); if (NULL == pResult) { diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index e7fcc38818..6181a9b929 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -40,7 +40,9 @@ typedef struct SBuiltinFuncDefinition { FExecProcess processFunc; FScalarExecProcess sprocessFunc; FExecFinalize finalizeFunc; +#ifdef BUILD_NO_CALL FExecProcess invertFunc; +#endif FExecCombine combineFunc; const char* pPartialFunc; const char* pMergeFunc; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index d2f19ed2eb..ba7bf72aea 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -59,12 +59,19 @@ int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t countFunction(SqlFunctionCtx* pCtx); + +#ifdef BUILD_NO_CALL int32_t countInvertFunction(SqlFunctionCtx* pCtx); +#endif EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t sumFunction(SqlFunctionCtx* pCtx); + +#ifdef BUILD_NO_CALL int32_t sumInvertFunction(SqlFunctionCtx* pCtx); +#endif + int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); @@ -81,7 +88,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +#ifdef BUILD_NO_CALL int32_t avgInvertFunction(SqlFunctionCtx* pCtx); +#endif + int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getAvgInfoSize(); @@ -91,7 +102,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +#ifdef BUILD_NO_CALL int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); +#endif + int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getStddevInfoSize(); @@ -99,7 +114,6 @@ bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t leastSQRFunction(SqlFunctionCtx* pCtx); int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx); int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 98fda024fa..61149f593a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2368,7 +2368,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = countFunction, .sprocessFunc = countScalarFunction, .finalizeFunc = functionFinalize, +#ifdef BUILD_NO_CALL .invertFunc = countInvertFunction, +#endif .combineFunc = combineFunction, .pPartialFunc = "count", .pMergeFunc = "sum" @@ -2384,7 +2386,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sumFunction, .sprocessFunc = sumScalarFunction, .finalizeFunc = functionFinalize, +#ifdef BUILD_NO_CALL .invertFunc = sumInvertFunction, +#endif .combineFunc = sumCombine, .pPartialFunc = "sum", .pMergeFunc = "sum" @@ -2429,7 +2433,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = stddevFunction, .sprocessFunc = stddevScalarFunction, .finalizeFunc = stddevFinalize, +#ifdef BUILD_NO_CALL .invertFunc = stddevInvertFunction, +#endif .combineFunc = stddevCombine, .pPartialFunc = "_stddev_partial", .pMergeFunc = "_stddev_merge" @@ -2443,7 +2449,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = stddevFunctionSetup, .processFunc = stddevFunction, .finalizeFunc = stddevPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = stddevInvertFunction, +#endif .combineFunc = stddevCombine, }, { @@ -2455,7 +2463,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = stddevFunctionSetup, .processFunc = stddevFunctionMerge, .finalizeFunc = stddevFinalize, +#ifdef BUILD_NO_CALL .invertFunc = stddevInvertFunction, +#endif .combineFunc = stddevCombine, }, { @@ -2468,7 +2478,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = leastSQRFunction, .sprocessFunc = leastSQRScalarFunction, .finalizeFunc = leastSQRFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = leastSQRCombine, }, { @@ -2482,7 +2494,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = avgFunction, .sprocessFunc = avgScalarFunction, .finalizeFunc = avgFinalize, +#ifdef BUILD_NO_CALL .invertFunc = avgInvertFunction, +#endif .combineFunc = avgCombine, .pPartialFunc = "_avg_partial", .pMergeFunc = "_avg_merge" @@ -2497,7 +2511,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = avgFunctionSetup, .processFunc = avgFunction, .finalizeFunc = avgPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = avgInvertFunction, +#endif .combineFunc = avgCombine, }, { @@ -2509,7 +2525,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = avgFunctionSetup, .processFunc = avgFunctionMerge, .finalizeFunc = avgFinalize, +#ifdef BUILD_NO_CALL .invertFunc = avgInvertFunction, +#endif .combineFunc = avgCombine, }, { @@ -2523,7 +2541,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = percentileFunction, .sprocessFunc = percentileScalarFunction, .finalizeFunc = percentileFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = NULL, }, { @@ -2536,7 +2556,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = apercentileFunction, .sprocessFunc = apercentileScalarFunction, .finalizeFunc = apercentileFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = apercentileCombine, .pPartialFunc = "_apercentile_partial", .pMergeFunc = "_apercentile_merge", @@ -2551,7 +2573,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = apercentileFunctionSetup, .processFunc = apercentileFunction, .finalizeFunc = apercentilePartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = apercentileCombine, }, { @@ -2563,7 +2587,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = apercentileFunctionSetup, .processFunc = apercentileFunctionMerge, .finalizeFunc = apercentileFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = apercentileCombine, }, { @@ -2609,7 +2635,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = spreadFunction, .sprocessFunc = spreadScalarFunction, .finalizeFunc = spreadFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = spreadCombine, .pPartialFunc = "_spread_partial", .pMergeFunc = "_spread_merge" @@ -2624,7 +2652,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = spreadFunctionSetup, .processFunc = spreadFunction, .finalizeFunc = spreadPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = spreadCombine, }, { @@ -2637,7 +2667,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = spreadFunctionSetup, .processFunc = spreadFunctionMerge, .finalizeFunc = spreadFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = spreadCombine, }, { @@ -2651,7 +2683,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = elapsedFunctionSetup, .processFunc = elapsedFunction, .finalizeFunc = elapsedFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = elapsedCombine, }, { @@ -2664,7 +2698,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = elapsedFunctionSetup, .processFunc = elapsedFunction, .finalizeFunc = elapsedPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = elapsedCombine, }, { @@ -2677,7 +2713,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = elapsedFunctionSetup, .processFunc = elapsedFunctionMerge, .finalizeFunc = elapsedFinalize, + #ifdef BUILD_NO_CALL .invertFunc = NULL, + #endif .combineFunc = elapsedCombine, }, { @@ -2907,7 +2945,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .sprocessFunc = histogramScalarFunction, .finalizeFunc = histogramFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = histogramCombine, .pPartialFunc = "_histogram_partial", .pMergeFunc = "_histogram_merge", @@ -2921,7 +2961,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = histogramFunctionSetup, .processFunc = histogramFunctionPartial, .finalizeFunc = histogramPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = histogramCombine, }, { @@ -2933,7 +2975,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = histogramFunctionMerge, .finalizeFunc = histogramFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = histogramCombine, }, { @@ -2946,7 +2990,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = hllFunction, .sprocessFunc = hllScalarFunction, .finalizeFunc = hllFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = hllCombine, .pPartialFunc = "_hyperloglog_partial", .pMergeFunc = "_hyperloglog_merge" @@ -2960,7 +3006,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = hllFunction, .finalizeFunc = hllPartialFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = hllCombine, }, { @@ -2972,7 +3020,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = hllFunctionMerge, .finalizeFunc = hllFinalize, +#ifdef BUILD_NO_CALL .invertFunc = NULL, +#endif .combineFunc = hllCombine, }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 8a2e118fe7..390190e8db 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -549,6 +549,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +#ifdef BUILD_NO_CALL int32_t countInvertFunction(SqlFunctionCtx* pCtx) { int64_t numOfElem = getNumOfElems(pCtx); @@ -559,6 +560,7 @@ int32_t countInvertFunction(SqlFunctionCtx* pCtx) { SET_VAL(pResInfo, *((int64_t*)buf), 1); return TSDB_CODE_SUCCESS; } +#endif int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); @@ -642,6 +644,7 @@ _sum_over: return TSDB_CODE_SUCCESS; } +#ifdef BUILD_NO_CALL int32_t sumInvertFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; @@ -699,6 +702,7 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); return TSDB_CODE_SUCCESS; } +#endif int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); @@ -1230,6 +1234,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +#ifdef BUILD_NO_CALL int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; @@ -1294,6 +1299,7 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); return TSDB_CODE_SUCCESS; } +#endif int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; @@ -1578,11 +1584,6 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } -int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) { - // TODO - return TSDB_CODE_SUCCESS; -} - int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SLeastSQRInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index e626c937da..6bcbd1c3a7 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -724,6 +724,7 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +#ifdef BUILD_NO_CALL int32_t avgInvertFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; @@ -786,6 +787,7 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); return TSDB_CODE_SUCCESS; } +#endif int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 036e4238d4..7bb863839a 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -281,6 +281,7 @@ void fmFuncMgtDestroy() { } } +#ifdef BUILD_NO_CALL int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; @@ -314,6 +315,7 @@ bool fmIsInvertible(int32_t funcId) { } return res; } +#endif // function has same input/output type bool fmIsSameInOutType(int32_t funcId) { From bc7434e2b5b4ddc539d194e67a2fd1da272061c4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 14:49:16 +0800 Subject: [PATCH 25/25] fix(stream): set error code for return. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index bfea4349b0..847f55a8fd 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -92,6 +92,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, tInfo.name); + terrno = TSDB_CODE_MND_TRANS_CONFLICT; return true; } else { mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName); @@ -100,6 +101,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, tInfo.name); + terrno = TSDB_CODE_MND_TRANS_CONFLICT; return true; } } else {