diff --git a/docs/en/12-taos-sql/24-show.md b/docs/en/12-taos-sql/24-show.md index 96503c9598..6b56161322 100644 --- a/docs/en/12-taos-sql/24-show.md +++ b/docs/en/12-taos-sql/24-show.md @@ -3,7 +3,7 @@ sidebar_label: SHOW Statement title: SHOW Statement for Metadata --- -In addition to running SELECT statements on INFORMATION_SCHEMA, you can also use SHOW to obtain system metadata, information, and status. +`SHOW` command can be used to get brief system information. To get details about metatadata, information, and status in the system, please use `select` to query the tables in database `INFORMATION_SCHEMA`. ## SHOW ACCOUNTS diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md index 781f94324c..75efd5f514 100644 --- a/docs/zh/12-taos-sql/24-show.md +++ b/docs/zh/12-taos-sql/24-show.md @@ -3,7 +3,7 @@ sidebar_label: SHOW 命令 title: 使用 SHOW 命令查看系统元数据 --- -除了使用 `select` 语句查询 `INFORMATION_SCHEMA` 数据库中的表获得系统中的各种元数据、系统信息和状态之外,也可以用 `SHOW` 命令来实现同样的目的。 +SHOW 命令可以用来获取简要的系统信息。若想获取系统中详细的各种元数据、系统信息和状态,请使用 select 语句查询 INFORMATION_SCHEMA 数据库中的表。 ## SHOW ACCOUNTS diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index d2efc5baf3..7b31e10572 100644 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -698,122 +698,123 @@ charset 的有效值是 UTF-8。 | 45 | numOfVnodeFetchThreads | 否 | 是 | | 46 | numOfVnodeWriteThreads | 否 | 是 | | 47 | numOfVnodeSyncThreads | 否 | 是 | -| 48 | numOfQnodeQueryThreads | 否 | 是 | -| 49 | numOfQnodeFetchThreads | 否 | 是 | -| 50 | numOfSnodeSharedThreads | 否 | 是 | -| 51 | numOfSnodeUniqueThreads | 否 | 是 | -| 52 | rpcQueueMemoryAllowed | 否 | 是 | -| 53 | logDir | 是 | 是 | -| 54 | minimalLogDirGB | 是 | 是 | -| 55 | numOfLogLines | 是 | 是 | -| 56 | asyncLog | 是 | 是 | -| 57 | logKeepDays | 是 | 是 | -| 58 | debugFlag | 是 | 是 | -| 59 | tmrDebugFlag | 是 | 是 | -| 60 | uDebugFlag | 是 | 是 | -| 61 | rpcDebugFlag | 是 | 是 | -| 62 | jniDebugFlag | 是 | 是 | -| 63 | qDebugFlag | 是 | 是 | -| 64 | cDebugFlag | 是 | 是 | -| 65 | dDebugFlag | 是 | 是 | -| 66 | vDebugFlag | 是 | 是 | -| 67 | mDebugFlag | 是 | 是 | -| 68 | wDebugFlag | 是 | 是 | -| 69 | sDebugFlag | 是 | 是 | -| 70 | tsdbDebugFlag | 是 | 是 | -| 71 | tqDebugFlag | 否 | 是 | -| 72 | fsDebugFlag | 是 | 是 | -| 73 | udfDebugFlag | 否 | 是 | -| 74 | smaDebugFlag | 否 | 是 | -| 75 | idxDebugFlag | 否 | 是 | -| 76 | tdbDebugFlag | 否 | 是 | -| 77 | metaDebugFlag | 否 | 是 | -| 78 | timezone | 是 | 是 | -| 79 | locale | 是 | 是 | -| 80 | charset | 是 | 是 | -| 81 | udf | 是 | 是 | -| 82 | enableCoreFile | 是 | 是 | -| 83 | arbitrator | 是 | 否 | -| 84 | numOfThreadsPerCore | 是 | 否 | -| 85 | numOfMnodes | 是 | 否 | -| 86 | vnodeBak | 是 | 否 | -| 87 | balance | 是 | 否 | -| 88 | balanceInterval | 是 | 否 | -| 89 | offlineThreshold | 是 | 否 | -| 90 | role | 是 | 否 | -| 91 | dnodeNopLoop | 是 | 否 | -| 92 | keepTimeOffset | 是 | 否 | -| 93 | rpcTimer | 是 | 否 | -| 94 | rpcMaxTime | 是 | 否 | -| 95 | rpcForceTcp | 是 | 否 | -| 96 | tcpConnTimeout | 是 | 否 | -| 97 | syncCheckInterval | 是 | 否 | -| 98 | maxTmrCtrl | 是 | 否 | -| 99 | monitorReplica | 是 | 否 | -| 100 | smlTagNullName | 是 | 否 | -| 101 | keepColumnName | 是 | 否 | -| 102 | ratioOfQueryCores | 是 | 否 | -| 103 | maxStreamCompDelay | 是 | 否 | -| 104 | maxFirstStreamCompDelay | 是 | 否 | -| 105 | retryStreamCompDelay | 是 | 否 | -| 106 | streamCompDelayRatio | 是 | 否 | -| 107 | maxVgroupsPerDb | 是 | 否 | -| 108 | maxTablesPerVnode | 是 | 否 | -| 109 | minTablesPerVnode | 是 | 否 | -| 110 | tableIncStepPerVnode | 是 | 否 | -| 111 | cache | 是 | 否 | -| 112 | blocks | 是 | 否 | -| 113 | days | 是 | 否 | -| 114 | keep | 是 | 否 | -| 115 | minRows | 是 | 否 | -| 116 | maxRows | 是 | 否 | -| 117 | quorum | 是 | 否 | -| 118 | comp | 是 | 否 | -| 119 | walLevel | 是 | 否 | -| 120 | fsync | 是 | 否 | -| 121 | replica | 是 | 否 | -| 122 | partitions | 是 | 否 | -| 123 | quorum | 是 | 否 | -| 124 | update | 是 | 否 | -| 125 | cachelast | 是 | 否 | -| 126 | maxSQLLength | 是 | 否 | -| 127 | maxWildCardsLength | 是 | 否 | -| 128 | maxRegexStringLen | 是 | 否 | -| 129 | maxNumOfOrderedRes | 是 | 否 | -| 130 | maxConnections | 是 | 否 | -| 131 | mnodeEqualVnodeNum | 是 | 否 | -| 132 | http | 是 | 否 | -| 133 | httpEnableRecordSql | 是 | 否 | -| 134 | httpMaxThreads | 是 | 否 | -| 135 | restfulRowLimit | 是 | 否 | -| 136 | httpDbNameMandatory | 是 | 否 | -| 137 | httpKeepAlive | 是 | 否 | -| 138 | enableRecordSql | 是 | 否 | -| 139 | maxBinaryDisplayWidth | 是 | 否 | -| 140 | stream | 是 | 否 | -| 141 | retrieveBlockingModel | 是 | 否 | -| 142 | tsdbMetaCompactRatio | 是 | 否 | -| 143 | defaultJSONStrType | 是 | 否 | -| 144 | walFlushSize | 是 | 否 | -| 145 | keepTimeOffset | 是 | 否 | -| 146 | flowctrl | 是 | 否 | -| 147 | slaveQuery | 是 | 否 | -| 148 | adjustMaster | 是 | 否 | -| 149 | topicBinaryLen | 是 | 否 | -| 150 | telegrafUseFieldNum | 是 | 否 | -| 151 | deadLockKillQuery | 是 | 否 | -| 152 | clientMerge | 是 | 否 | -| 153 | sdbDebugFlag | 是 | 否 | -| 154 | odbcDebugFlag | 是 | 否 | -| 155 | httpDebugFlag | 是 | 否 | -| 156 | monDebugFlag | 是 | 否 | -| 157 | cqDebugFlag | 是 | 否 | -| 158 | shortcutFlag | 是 | 否 | -| 159 | probeSeconds | 是 | 否 | -| 160 | probeKillSeconds | 是 | 否 | -| 161 | probeInterval | 是 | 否 | -| 162 | lossyColumns | 是 | 否 | -| 163 | fPrecision | 是 | 否 | -| 164 | dPrecision | 是 | 否 | -| 165 | maxRange | 是 | 否 | -| 166 | range | 是 | 否 | +| 48 | numOfVnodeRsmaThreads | 否 | 是 | +| 49 | numOfQnodeQueryThreads | 否 | 是 | +| 50 | numOfQnodeFetchThreads | 否 | 是 | +| 51 | numOfSnodeSharedThreads | 否 | 是 | +| 52 | numOfSnodeUniqueThreads | 否 | 是 | +| 53 | rpcQueueMemoryAllowed | 否 | 是 | +| 54 | logDir | 是 | 是 | +| 55 | minimalLogDirGB | 是 | 是 | +| 56 | numOfLogLines | 是 | 是 | +| 57 | asyncLog | 是 | 是 | +| 58 | logKeepDays | 是 | 是 | +| 59 | debugFlag | 是 | 是 | +| 60 | tmrDebugFlag | 是 | 是 | +| 61 | uDebugFlag | 是 | 是 | +| 62 | rpcDebugFlag | 是 | 是 | +| 63 | jniDebugFlag | 是 | 是 | +| 64 | qDebugFlag | 是 | 是 | +| 65 | cDebugFlag | 是 | 是 | +| 66 | dDebugFlag | 是 | 是 | +| 67 | vDebugFlag | 是 | 是 | +| 68 | mDebugFlag | 是 | 是 | +| 69 | wDebugFlag | 是 | 是 | +| 70 | sDebugFlag | 是 | 是 | +| 71 | tsdbDebugFlag | 是 | 是 | +| 72 | tqDebugFlag | 否 | 是 | +| 73 | fsDebugFlag | 是 | 是 | +| 74 | udfDebugFlag | 否 | 是 | +| 75 | smaDebugFlag | 否 | 是 | +| 76 | idxDebugFlag | 否 | 是 | +| 77 | tdbDebugFlag | 否 | 是 | +| 78 | metaDebugFlag | 否 | 是 | +| 79 | timezone | 是 | 是 | +| 80 | locale | 是 | 是 | +| 81 | charset | 是 | 是 | +| 82 | udf | 是 | 是 | +| 83 | enableCoreFile | 是 | 是 | +| 84 | arbitrator | 是 | 否 | +| 85 | numOfThreadsPerCore | 是 | 否 | +| 86 | numOfMnodes | 是 | 否 | +| 87 | vnodeBak | 是 | 否 | +| 88 | balance | 是 | 否 | +| 89 | balanceInterval | 是 | 否 | +| 90 | offlineThreshold | 是 | 否 | +| 91 | role | 是 | 否 | +| 92 | dnodeNopLoop | 是 | 否 | +| 93 | keepTimeOffset | 是 | 否 | +| 94 | rpcTimer | 是 | 否 | +| 95 | rpcMaxTime | 是 | 否 | +| 96 | rpcForceTcp | 是 | 否 | +| 97 | tcpConnTimeout | 是 | 否 | +| 98 | syncCheckInterval | 是 | 否 | +| 99 | maxTmrCtrl | 是 | 否 | +| 100 | monitorReplica | 是 | 否 | +| 101 | smlTagNullName | 是 | 否 | +| 102 | keepColumnName | 是 | 否 | +| 103 | ratioOfQueryCores | 是 | 否 | +| 104 | maxStreamCompDelay | 是 | 否 | +| 105 | maxFirstStreamCompDelay | 是 | 否 | +| 106 | retryStreamCompDelay | 是 | 否 | +| 107 | streamCompDelayRatio | 是 | 否 | +| 108 | maxVgroupsPerDb | 是 | 否 | +| 109 | maxTablesPerVnode | 是 | 否 | +| 110 | minTablesPerVnode | 是 | 否 | +| 111 | tableIncStepPerVnode | 是 | 否 | +| 112 | cache | 是 | 否 | +| 113 | blocks | 是 | 否 | +| 114 | days | 是 | 否 | +| 115 | keep | 是 | 否 | +| 116 | minRows | 是 | 否 | +| 117 | maxRows | 是 | 否 | +| 118 | quorum | 是 | 否 | +| 119 | comp | 是 | 否 | +| 120 | walLevel | 是 | 否 | +| 121 | fsync | 是 | 否 | +| 122 | replica | 是 | 否 | +| 123 | partitions | 是 | 否 | +| 124 | quorum | 是 | 否 | +| 125 | update | 是 | 否 | +| 126 | cachelast | 是 | 否 | +| 127 | maxSQLLength | 是 | 否 | +| 128 | maxWildCardsLength | 是 | 否 | +| 129 | maxRegexStringLen | 是 | 否 | +| 130 | maxNumOfOrderedRes | 是 | 否 | +| 131 | maxConnections | 是 | 否 | +| 132 | mnodeEqualVnodeNum | 是 | 否 | +| 133 | http | 是 | 否 | +| 134 | httpEnableRecordSql | 是 | 否 | +| 135 | httpMaxThreads | 是 | 否 | +| 136 | restfulRowLimit | 是 | 否 | +| 137 | httpDbNameMandatory | 是 | 否 | +| 138 | httpKeepAlive | 是 | 否 | +| 139 | enableRecordSql | 是 | 否 | +| 140 | maxBinaryDisplayWidth | 是 | 否 | +| 141 | stream | 是 | 否 | +| 142 | retrieveBlockingModel | 是 | 否 | +| 143 | tsdbMetaCompactRatio | 是 | 否 | +| 144 | defaultJSONStrType | 是 | 否 | +| 145 | walFlushSize | 是 | 否 | +| 146 | keepTimeOffset | 是 | 否 | +| 147 | flowctrl | 是 | 否 | +| 148 | slaveQuery | 是 | 否 | +| 149 | adjustMaster | 是 | 否 | +| 150 | topicBinaryLen | 是 | 否 | +| 151 | telegrafUseFieldNum | 是 | 否 | +| 152 | deadLockKillQuery | 是 | 否 | +| 153 | clientMerge | 是 | 否 | +| 154 | sdbDebugFlag | 是 | 否 | +| 155 | odbcDebugFlag | 是 | 否 | +| 156 | httpDebugFlag | 是 | 否 | +| 157 | monDebugFlag | 是 | 否 | +| 158 | cqDebugFlag | 是 | 否 | +| 159 | shortcutFlag | 是 | 否 | +| 160 | probeSeconds | 是 | 否 | +| 161 | probeKillSeconds | 是 | 否 | +| 162 | probeInterval | 是 | 否 | +| 163 | lossyColumns | 是 | 否 | +| 164 | fPrecision | 是 | 否 | +| 165 | dPrecision | 是 | 否 | +| 166 | maxRange | 是 | 否 | +| 167 | range | 是 | 否 | diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f872d1dbc2..03e15ed8e7 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -66,6 +66,7 @@ extern int32_t tsNumOfVnodeStreamThreads; extern int32_t tsNumOfVnodeFetchThreads; extern int32_t tsNumOfVnodeWriteThreads; extern int32_t tsNumOfVnodeSyncThreads; +extern int32_t tsNumOfVnodeRsmaThreads; extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeSharedThreads; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3bbbb8b463..bb2729c776 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -61,6 +61,7 @@ int32_t tsNumOfVnodeStreamThreads = 2; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; +int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 4; int32_t tsNumOfSnodeSharedThreads = 2; @@ -378,6 +379,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; + tsNumOfVnodeRsmaThreads = tsNumOfCores; + tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); + if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1; + tsNumOfQnodeQueryThreads = tsNumOfCores * 2; tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; @@ -539,6 +544,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; + tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; @@ -783,6 +789,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; } else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) { tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; + } else if (strcasecmp("numOfVnodeRsmaThreads", name) == 0) { + tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index ca77042bb2..abfffc045f 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -33,7 +33,6 @@ extern "C" { // clang-format on #define RSMA_TASK_INFO_HASH_SLOT (8) -#define RSMA_EXECUTOR_MAX (1) typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; @@ -49,9 +48,12 @@ typedef struct SQTaskFWriter SQTaskFWriter; struct SSmaEnv { SRWLatch lock; int8_t type; + int8_t flag; // 0x01 inClose SSmaStat *pStat; }; +#define SMA_ENV_FLG_CLOSE ((int8_t)0x1) + typedef struct { int8_t inited; int32_t rsetId; @@ -93,7 +95,6 @@ struct SRSmaStat { int64_t refId; // shared by fetch tasks volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) - volatile int8_t nExecutor; // [1, max(half of query threads, 4)] int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) @@ -107,6 +108,7 @@ struct SSmaStat { SRSmaStat rsmaStat; // rollup sma }; T_REF_DECLARE() + char data[]; }; #define SMA_STAT_TSMA(s) (&(s)->tsmaStat) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2de3b2b88b..59dba4fb06 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -190,7 +190,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t smaInit(); void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); -int32_t smaPreClose(SSma* pSma); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); @@ -200,7 +199,6 @@ int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); -int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); @@ -324,7 +322,6 @@ struct SVnode { TdThreadMutex lock; bool blocked; bool restored; - bool inClose; tsem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e3b83f9955..32a419022a 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -23,11 +23,13 @@ extern SSmaMgmt smaMgmt; // declaration of static functions -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); -static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); -static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); -static void *tdFreeTSmaStat(STSmaStat *pStat); -static void tdDestroyRSmaStat(void *pRSmaStat); +static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv); +static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv); +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); +static int32_t tdRsmaStartExecutor(const SSma *pSma); +static int32_t tdRsmaStopExecutor(const SSma *pSma); +static void *tdFreeTSmaStat(STSmaStat *pStat); +static void tdDestroyRSmaStat(void *pRSmaStat); /** * @brief rsma init @@ -97,35 +99,42 @@ void smaCleanUp() { } } -static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { +static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv)); + *ppEnv = pEnv; if (!pEnv) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_FAILED; } SMA_ENV_TYPE(pEnv) = smaType; taosInitRWLatch(&(pEnv->lock)); + (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv) + : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv); + if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); - return NULL; + *ppEnv = NULL; + (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL) + : atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL); + return TSDB_CODE_FAILED; } - return pEnv; + return TSDB_CODE_SUCCESS; } -static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) { - if (!pEnv) { +static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { + if (!ppEnv) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } - if (!(*pEnv)) { - if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) { + if (!(*ppEnv)) { + if (tdNewSmaEnv(pSma, smaType, ppEnv) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } } @@ -199,7 +208,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS * tdInitSmaStat invoked in other multithread environment later. */ if (!(*pSmaStat)) { - *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat)); + *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads); if (!(*pSmaStat)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; @@ -231,6 +240,10 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (!RSMA_INFO_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } + + if (tdRsmaStartExecutor(pSma) < 0) { + return TSDB_CODE_FAILED; + } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { @@ -291,6 +304,9 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } } + // step 4: + tdRsmaStopExecutor(pSma); + // step 5: free pStat taosMemoryFreeClear(pStat); } @@ -381,17 +397,70 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma)) : atomic_load_ptr(&SMA_RSMA_ENV(pSma)); if (!pEnv) { - char rname[TSDB_FILENAME_LEN] = {0}; - - if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) { + if (tdInitSmaEnv(pSma, smaType, &pEnv) < 0) { tdUnLockSma(pSma); return TSDB_CODE_FAILED; } - - (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv) - : atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv); } tdUnLockSma(pSma); return TSDB_CODE_SUCCESS; }; + +void *tdRSmaExecutorFunc(void *param) { + setThreadName("vnode-rsma"); + + tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW); + return NULL; +} + +static int32_t tdRsmaStartExecutor(const SSma *pSma) { + TdThreadAttr thAttr = {0}; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + TdThread *pthread = (TdThread *)&pStat->data; + + for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) { + if (taosThreadCreate(&pthread[i], &thAttr, tdRSmaExecutorFunc, (void *)pSma) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, failed to create pthread for rsma since %s", SMA_VID(pSma), terrstr()); + return -1; + } + smaDebug("vgId:%d, success to create pthread for rsma", SMA_VID(pSma)); + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +static int32_t tdRsmaStopExecutor(const SSma *pSma) { + if (pSma && VND_IS_RSMA(pSma->pVnode)) { + SSmaEnv *pEnv = NULL; + SSmaStat *pStat = NULL; + SRSmaStat *pRSmaStat = NULL; + TdThread *pthread = NULL; + + if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = SMA_ENV_STAT(pEnv))) { + return 0; + } + + pEnv->flag |= SMA_ENV_FLG_CLOSE; + pRSmaStat = (SRSmaStat *)pStat; + pthread = (TdThread *)&pStat->data; + + for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) { + tsem_post(&(pRSmaStat->notEmpty)); + } + + for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) { + if (taosCheckPthreadValid(pthread[i])) { + smaDebug("vgId:%d, start to join pthread for rsma:%" PRId64, SMA_VID(pSma), pthread[i]); + taosThreadJoin(pthread[i], NULL); + } + } + } + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index e2710b26e3..235fb1f941 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -146,20 +146,6 @@ int32_t smaClose(SSma *pSma) { return 0; } -int32_t smaPreClose(SSma *pSma) { - if (pSma && VND_IS_RSMA(pSma->pVnode)) { - SSmaEnv *pEnv = NULL; - SRSmaStat *pStat = NULL; - if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) { - return 0; - } - for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { - tsem_post(&(pStat->notEmpty)); - } - } - return 0; -} - /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 448b8ab508..426ab521fd 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -621,7 +621,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { */ int32_t smaDoRetention(SSma *pSma, int64_t now) { int32_t code = TSDB_CODE_SUCCESS; - if (VND_IS_RSMA(pSma->pVnode)) { + if (!VND_IS_RSMA(pSma->pVnode)) { return code; } @@ -734,10 +734,12 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - tsem_post(&(pRSmaStat->notEmpty)); - int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); + if (atomic_load_8(&pInfo->assigned) == 0) { + tsem_post(&(pRSmaStat->notEmpty)); + } + // smoothing consume int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; if (n > 1) { @@ -911,39 +913,6 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaExecCheck(SSma *pSma) { - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - - if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { - return TSDB_CODE_SUCCESS; - } - - SRSmaExecMsg fetchMsg; - int32_t contLen = sizeof(SMsgHead); - void *pBuf = rpcMallocCont(0 + contLen); - - ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); - ((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead); - - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_VND_EXEC_RSMA, - .pCont = pBuf, - .contLen = contLen, - }; - - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { - smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr()); - goto _err; - } - - smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma)); - - return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; -} - int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { @@ -974,10 +943,6 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { goto _err; } } - - if (tdRSmaExecCheck(pSma) < 0) { - goto _err; - } } } tdUidStoreDestory(&uidStore); @@ -1563,7 +1528,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ASSERT(qItem->level == pItem->level); ASSERT(qItem->fetchLevel == pItem->fetchLevel); #endif - tsem_post(&(pStat->notEmpty)); + if (atomic_load_8(&pRSmaInfo->assigned) == 0) { + tsem_post(&(pStat->notEmpty)); + } smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; @@ -1591,9 +1558,11 @@ _end: } static void tdFreeRSmaSubmitItems(SArray *pItems) { + ASSERT(taosArrayGetSize(pItems) > 0); for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { taosFreeQitem(*(void **)taosArrayGet(pItems, i)); } + taosArrayClear(pItems); } /** @@ -1703,6 +1672,7 @@ _err: * @param type * @return int32_t */ + int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); @@ -1722,41 +1692,53 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - bool isBusy = false; while (true) { - isBusy = false; // step 1: rsma exec - consume data in buffer queue for all suids if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { - void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(infoHash, pIter))) { SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - int64_t itemSize = 0; - if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || - RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); - if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - int32_t qallItemSize = taosQallItemSize(pInfo->qall); - if (qallItemSize > 0) { - tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || + RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + int32_t batchCnt = -1; + int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads; + bool occupied = (batchMax <= 1); + if (batchMax > 1) { + batchMax = 100 / batchMax; } + while (occupied || (++batchCnt < batchMax)) { // greedy mode + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->qall); + if (qallItemSize > 0) { + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); + } - if (type == RSMA_EXEC_OVERFLOW) { - tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); - } + if (type == RSMA_EXEC_OVERFLOW) { + tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + } - if (qallItemSize > 0) { - // subtract the item size after the task finished, commit should wait for all items be consumed - atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); - isBusy = true; + if (qallItemSize > 0) { + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + continue; + } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + continue; + } + + break; } - ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } - pIter = taosHashIterate(infoHash, pIter); } if (type == RSMA_EXEC_COMMIT) { - break; + if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + break; + } else { + // commit should wait for all items be consumed + continue; + } } } #if 0 @@ -1790,16 +1772,19 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { - if (pVnode->inClose) { + if (pEnv->flag & SMA_ENV_FLG_CLOSE) { break; } + tsem_wait(&pRSmaStat->notEmpty); - if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + + if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { + smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, atomic_load_64(&pRSmaStat->nBufItems)); break; } } + } // end of while(true) _end: @@ -1809,39 +1794,3 @@ _err: taosArrayDestroy(pSubmitArr); return TSDB_CODE_FAILED; } - -/** - * @brief exec rsma level 1data, fetch result of level 2/3 and submit - * - * @param pSma - * @param pMsg - * @return int32_t - */ -int32_t smaProcessExec(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - - if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { - terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - goto _err; - } - smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - - int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1); - - if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { - goto _err; - } - } else { - atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); - } - - smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - return TSDB_CODE_SUCCESS; -_err: - atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); - smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), - terrstr()); - return TSDB_CODE_FAILED; -} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index dcfbd33b90..a4fd984fb7 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -87,7 +87,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); pVnode->blocked = false; - pVnode->inClose = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); @@ -182,8 +181,6 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); - pVnode->inClose = true; - smaPreClose(pVnode->pSma); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a8d168f4f..495220b5de 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); - case TDMT_VND_EXEC_RSMA: - return smaProcessExec(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -380,14 +378,14 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t code = 0; SVTrimDbReq trimReq = {0}; - vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); - // decode if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } + vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); + // process code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); if (code) goto _exit; diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 21aeaba70b..75844ce76f 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -255,6 +255,13 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx if (node->opType == OP_TYPE_JSON_GET_VALUE) { return code; } + if ((node->pLeft != NULL && nodeType(node->pLeft) == QUERY_NODE_COLUMN) && + (node->pRight != NULL && nodeType(node->pRight) == QUERY_NODE_VALUE)) { + SColumnNode *cn = (SColumnNode *)(node->pLeft); + if (cn->node.resType.type == TSDB_DATA_TYPE_JSON) { + SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + } SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam)); if (NULL == paramList) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 7a44edb12c..5430acb972 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1401,7 +1401,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD pDecoder->pgno = 0; TDB_CELLDECODER_SET_FREE_NIL(pDecoder); - tdbDebug("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV); + // tdbTrace("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV); // 1. Decode header part if (!leaf) { diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index ab9b21dc3f..527ad200d4 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -145,7 +145,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) // 1. Search the hash table pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash]; while (pPage) { - if (memcmp(pPage->pgid.fileid, pPgid->fileid, TDB_FILE_ID_LEN) == 0 && pPage->pgid.pgno == pPgid->pgno) break; + if (pPage->pgid.pgno == pPgid->pgno && memcmp(pPage->pgid.fileid, pPgid->fileid, TDB_FILE_ID_LEN) == 0) break; pPage = pPage->pHashNext; } @@ -243,7 +243,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) { pCache->nRecyclable--; // printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); - tdbTrace("pin page %d", pPage->id); + tdbDebug("pin page %d", pPage->id); } } @@ -264,30 +264,33 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) { pCache->nRecyclable++; // printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); - tdbTrace("unpin page %d", pPage->id); + tdbDebug("unpin page %d", pPage->id); } static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { - SPage **ppPage; - uint32_t h; - - h = tdbPCachePageHash(&(pPage->pgid)); - for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) - ; + uint32_t h = tdbPCachePageHash(&(pPage->pgid)) % pCache->nHash; + SPage **ppPage = &(pCache->pgHash[h]); + if (*ppPage == pPage) { + pCache->pgHash[h] = pPage->pHashNext; + } else { + for (; (*ppPage) && (*ppPage)->pHashNext != pPage; ppPage = &((*ppPage)->pHashNext)) + ; + if (*ppPage) { + (*ppPage)->pHashNext = pPage->pHashNext; + } + } if (*ppPage) { - *ppPage = pPage->pHashNext; - pCache->nPage--; + pPage->pHashNext = NULL; + --pCache->nPage; // printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); } - tdbTrace("remove page %d to hash", pPage->id); + tdbDebug("remove page %p/%d from hash", pPage, pPage->id); } static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) { - int h; - - h = tdbPCachePageHash(&(pPage->pgid)) % pCache->nHash; + uint32_t h = tdbPCachePageHash(&(pPage->pgid)) % pCache->nHash; pPage->pHashNext = pCache->pgHash[h]; pCache->pgHash[h] = pPage; @@ -295,7 +298,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) { pCache->nPage++; // printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); - tdbTrace("add page %d to hash", pPage->id); + tdbDebug("add page %p/%d to hash", pPage, pPage->id); } static int tdbPCacheOpenImpl(SPCache *pCache) { diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 276b06b147..9e0cd76573 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -74,6 +74,7 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t) int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) { u8 *ptr; + tdbDebug("page/destroy: %p %p", pPage, xFree); ASSERT(xFree); for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) { @@ -87,6 +88,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) } void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) { + tdbDebug("page/zero: %p %" PRIu8 " %p", pPage, szAmHdr, xCellSize); pPage->pPageHdr = pPage->pData + szAmHdr; TDB_PAGE_NCELLS_SET(pPage, 0); TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr)); @@ -103,6 +105,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell } void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) { + tdbDebug("page/init: %p %" PRIu8 " %p", pPage, szAmHdr, xCellSize); pPage->pPageHdr = pPage->pData + szAmHdr; pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage); pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 935f536a90..98e0b8f9c9 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -23,6 +23,7 @@ #define HTTP_RECV_BUF_SIZE 1024 + typedef struct SHttpClient { uv_connect_t conn; uv_tcp_t tcp; @@ -143,9 +144,9 @@ static void clientAllocBuffCb(uv_handle_t *handle, size_t suggested_size, uv_buf static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t *buf) { SHttpClient* cli = handle->data; if (nread < 0) { - uError("http-report read error:%s", uv_err_name(nread)); + uError("http-report recv error:%s", uv_err_name(nread)); } else { - uTrace("http-report succ to read %d bytes, just ignore it", nread); + uTrace("http-report succ to recv %d bytes, just ignore it", nread); } uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 6dd9481b95..207b967923 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -276,14 +276,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (true == pBuf->invalid || false == uvHandleReq(conn)) { - tError("%s conn %p read invalid packet, dst: %s, srv: %s", transLabel(pTransInst), conn, conn->dst, conn->src); + tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn, + conn->dst, conn->src); destroyConn(conn, true); return; } } return; } else { - tError("%s conn %p read invalid packet, exceed limit", transLabel(pTransInst), conn); + tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:", transLabel(pTransInst), + conn, conn->dst, conn->src); destroyConn(conn, true); return; } @@ -649,7 +651,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; - tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + tTrace("new connection accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else {