Merge pull request #16385 from taosdata/feature/3.0_interval_hash_optimize
enh: assign specific threads for rsma
Commit: 03e8863f4d
@@ -698,122 +698,123 @@ charset 的有效值是 UTF-8。
 | 45 | numOfVnodeFetchThreads | 否 | 是 |
 | 46 | numOfVnodeWriteThreads | 否 | 是 |
 | 47 | numOfVnodeSyncThreads | 否 | 是 |
-| 48 | numOfQnodeQueryThreads | 否 | 是 |
-| 49 | numOfQnodeFetchThreads | 否 | 是 |
-| 50 | numOfSnodeSharedThreads | 否 | 是 |
-| 51 | numOfSnodeUniqueThreads | 否 | 是 |
-| 52 | rpcQueueMemoryAllowed | 否 | 是 |
-| 53 | logDir | 是 | 是 |
-| 54 | minimalLogDirGB | 是 | 是 |
-| 55 | numOfLogLines | 是 | 是 |
-| 56 | asyncLog | 是 | 是 |
-| 57 | logKeepDays | 是 | 是 |
-| 58 | debugFlag | 是 | 是 |
-| 59 | tmrDebugFlag | 是 | 是 |
-| 60 | uDebugFlag | 是 | 是 |
-| 61 | rpcDebugFlag | 是 | 是 |
-| 62 | jniDebugFlag | 是 | 是 |
-| 63 | qDebugFlag | 是 | 是 |
-| 64 | cDebugFlag | 是 | 是 |
-| 65 | dDebugFlag | 是 | 是 |
-| 66 | vDebugFlag | 是 | 是 |
-| 67 | mDebugFlag | 是 | 是 |
-| 68 | wDebugFlag | 是 | 是 |
-| 69 | sDebugFlag | 是 | 是 |
-| 70 | tsdbDebugFlag | 是 | 是 |
-| 71 | tqDebugFlag | 否 | 是 |
-| 72 | fsDebugFlag | 是 | 是 |
-| 73 | udfDebugFlag | 否 | 是 |
-| 74 | smaDebugFlag | 否 | 是 |
-| 75 | idxDebugFlag | 否 | 是 |
-| 76 | tdbDebugFlag | 否 | 是 |
-| 77 | metaDebugFlag | 否 | 是 |
-| 78 | timezone | 是 | 是 |
-| 79 | locale | 是 | 是 |
-| 80 | charset | 是 | 是 |
-| 81 | udf | 是 | 是 |
-| 82 | enableCoreFile | 是 | 是 |
-| 83 | arbitrator | 是 | 否 |
-| 84 | numOfThreadsPerCore | 是 | 否 |
-| 85 | numOfMnodes | 是 | 否 |
-| 86 | vnodeBak | 是 | 否 |
-| 87 | balance | 是 | 否 |
-| 88 | balanceInterval | 是 | 否 |
-| 89 | offlineThreshold | 是 | 否 |
-| 90 | role | 是 | 否 |
-| 91 | dnodeNopLoop | 是 | 否 |
-| 92 | keepTimeOffset | 是 | 否 |
-| 93 | rpcTimer | 是 | 否 |
-| 94 | rpcMaxTime | 是 | 否 |
-| 95 | rpcForceTcp | 是 | 否 |
-| 96 | tcpConnTimeout | 是 | 否 |
-| 97 | syncCheckInterval | 是 | 否 |
-| 98 | maxTmrCtrl | 是 | 否 |
-| 99 | monitorReplica | 是 | 否 |
-| 100 | smlTagNullName | 是 | 否 |
-| 101 | keepColumnName | 是 | 否 |
-| 102 | ratioOfQueryCores | 是 | 否 |
-| 103 | maxStreamCompDelay | 是 | 否 |
-| 104 | maxFirstStreamCompDelay | 是 | 否 |
-| 105 | retryStreamCompDelay | 是 | 否 |
-| 106 | streamCompDelayRatio | 是 | 否 |
-| 107 | maxVgroupsPerDb | 是 | 否 |
-| 108 | maxTablesPerVnode | 是 | 否 |
-| 109 | minTablesPerVnode | 是 | 否 |
-| 110 | tableIncStepPerVnode | 是 | 否 |
-| 111 | cache | 是 | 否 |
-| 112 | blocks | 是 | 否 |
-| 113 | days | 是 | 否 |
-| 114 | keep | 是 | 否 |
-| 115 | minRows | 是 | 否 |
-| 116 | maxRows | 是 | 否 |
-| 117 | quorum | 是 | 否 |
-| 118 | comp | 是 | 否 |
-| 119 | walLevel | 是 | 否 |
-| 120 | fsync | 是 | 否 |
-| 121 | replica | 是 | 否 |
-| 122 | partitions | 是 | 否 |
-| 123 | quorum | 是 | 否 |
-| 124 | update | 是 | 否 |
-| 125 | cachelast | 是 | 否 |
-| 126 | maxSQLLength | 是 | 否 |
-| 127 | maxWildCardsLength | 是 | 否 |
-| 128 | maxRegexStringLen | 是 | 否 |
-| 129 | maxNumOfOrderedRes | 是 | 否 |
-| 130 | maxConnections | 是 | 否 |
-| 131 | mnodeEqualVnodeNum | 是 | 否 |
-| 132 | http | 是 | 否 |
-| 133 | httpEnableRecordSql | 是 | 否 |
-| 134 | httpMaxThreads | 是 | 否 |
-| 135 | restfulRowLimit | 是 | 否 |
-| 136 | httpDbNameMandatory | 是 | 否 |
-| 137 | httpKeepAlive | 是 | 否 |
-| 138 | enableRecordSql | 是 | 否 |
-| 139 | maxBinaryDisplayWidth | 是 | 否 |
-| 140 | stream | 是 | 否 |
-| 141 | retrieveBlockingModel | 是 | 否 |
-| 142 | tsdbMetaCompactRatio | 是 | 否 |
-| 143 | defaultJSONStrType | 是 | 否 |
-| 144 | walFlushSize | 是 | 否 |
-| 145 | keepTimeOffset | 是 | 否 |
-| 146 | flowctrl | 是 | 否 |
-| 147 | slaveQuery | 是 | 否 |
-| 148 | adjustMaster | 是 | 否 |
-| 149 | topicBinaryLen | 是 | 否 |
-| 150 | telegrafUseFieldNum | 是 | 否 |
-| 151 | deadLockKillQuery | 是 | 否 |
-| 152 | clientMerge | 是 | 否 |
-| 153 | sdbDebugFlag | 是 | 否 |
-| 154 | odbcDebugFlag | 是 | 否 |
-| 155 | httpDebugFlag | 是 | 否 |
-| 156 | monDebugFlag | 是 | 否 |
-| 157 | cqDebugFlag | 是 | 否 |
-| 158 | shortcutFlag | 是 | 否 |
-| 159 | probeSeconds | 是 | 否 |
-| 160 | probeKillSeconds | 是 | 否 |
-| 161 | probeInterval | 是 | 否 |
-| 162 | lossyColumns | 是 | 否 |
-| 163 | fPrecision | 是 | 否 |
-| 164 | dPrecision | 是 | 否 |
-| 165 | maxRange | 是 | 否 |
-| 166 | range | 是 | 否 |
+| 48 | numOfVnodeRsmaThreads | 否 | 是 |
+| 49 | numOfQnodeQueryThreads | 否 | 是 |
+| 50 | numOfQnodeFetchThreads | 否 | 是 |
+| 51 | numOfSnodeSharedThreads | 否 | 是 |
+| 52 | numOfSnodeUniqueThreads | 否 | 是 |
+| 53 | rpcQueueMemoryAllowed | 否 | 是 |
+| 54 | logDir | 是 | 是 |
+| 55 | minimalLogDirGB | 是 | 是 |
+| 56 | numOfLogLines | 是 | 是 |
+| 57 | asyncLog | 是 | 是 |
+| 58 | logKeepDays | 是 | 是 |
+| 59 | debugFlag | 是 | 是 |
+| 60 | tmrDebugFlag | 是 | 是 |
+| 61 | uDebugFlag | 是 | 是 |
+| 62 | rpcDebugFlag | 是 | 是 |
+| 63 | jniDebugFlag | 是 | 是 |
+| 64 | qDebugFlag | 是 | 是 |
+| 65 | cDebugFlag | 是 | 是 |
+| 66 | dDebugFlag | 是 | 是 |
+| 67 | vDebugFlag | 是 | 是 |
+| 68 | mDebugFlag | 是 | 是 |
+| 69 | wDebugFlag | 是 | 是 |
+| 70 | sDebugFlag | 是 | 是 |
+| 71 | tsdbDebugFlag | 是 | 是 |
+| 72 | tqDebugFlag | 否 | 是 |
+| 73 | fsDebugFlag | 是 | 是 |
+| 74 | udfDebugFlag | 否 | 是 |
+| 75 | smaDebugFlag | 否 | 是 |
+| 76 | idxDebugFlag | 否 | 是 |
+| 77 | tdbDebugFlag | 否 | 是 |
+| 78 | metaDebugFlag | 否 | 是 |
+| 79 | timezone | 是 | 是 |
+| 80 | locale | 是 | 是 |
+| 81 | charset | 是 | 是 |
+| 82 | udf | 是 | 是 |
+| 83 | enableCoreFile | 是 | 是 |
+| 84 | arbitrator | 是 | 否 |
+| 85 | numOfThreadsPerCore | 是 | 否 |
+| 86 | numOfMnodes | 是 | 否 |
+| 87 | vnodeBak | 是 | 否 |
+| 88 | balance | 是 | 否 |
+| 89 | balanceInterval | 是 | 否 |
+| 90 | offlineThreshold | 是 | 否 |
+| 91 | role | 是 | 否 |
+| 92 | dnodeNopLoop | 是 | 否 |
+| 93 | keepTimeOffset | 是 | 否 |
+| 94 | rpcTimer | 是 | 否 |
+| 95 | rpcMaxTime | 是 | 否 |
+| 96 | rpcForceTcp | 是 | 否 |
+| 97 | tcpConnTimeout | 是 | 否 |
+| 98 | syncCheckInterval | 是 | 否 |
+| 99 | maxTmrCtrl | 是 | 否 |
+| 100 | monitorReplica | 是 | 否 |
+| 101 | smlTagNullName | 是 | 否 |
+| 102 | keepColumnName | 是 | 否 |
+| 103 | ratioOfQueryCores | 是 | 否 |
+| 104 | maxStreamCompDelay | 是 | 否 |
+| 105 | maxFirstStreamCompDelay | 是 | 否 |
+| 106 | retryStreamCompDelay | 是 | 否 |
+| 107 | streamCompDelayRatio | 是 | 否 |
+| 108 | maxVgroupsPerDb | 是 | 否 |
+| 109 | maxTablesPerVnode | 是 | 否 |
+| 110 | minTablesPerVnode | 是 | 否 |
+| 111 | tableIncStepPerVnode | 是 | 否 |
+| 112 | cache | 是 | 否 |
+| 113 | blocks | 是 | 否 |
+| 114 | days | 是 | 否 |
+| 115 | keep | 是 | 否 |
+| 116 | minRows | 是 | 否 |
+| 117 | maxRows | 是 | 否 |
+| 118 | quorum | 是 | 否 |
+| 119 | comp | 是 | 否 |
+| 120 | walLevel | 是 | 否 |
+| 121 | fsync | 是 | 否 |
+| 122 | replica | 是 | 否 |
+| 123 | partitions | 是 | 否 |
+| 124 | quorum | 是 | 否 |
+| 125 | update | 是 | 否 |
+| 126 | cachelast | 是 | 否 |
+| 127 | maxSQLLength | 是 | 否 |
+| 128 | maxWildCardsLength | 是 | 否 |
+| 129 | maxRegexStringLen | 是 | 否 |
+| 130 | maxNumOfOrderedRes | 是 | 否 |
+| 131 | maxConnections | 是 | 否 |
+| 132 | mnodeEqualVnodeNum | 是 | 否 |
+| 133 | http | 是 | 否 |
+| 134 | httpEnableRecordSql | 是 | 否 |
+| 135 | httpMaxThreads | 是 | 否 |
+| 136 | restfulRowLimit | 是 | 否 |
+| 137 | httpDbNameMandatory | 是 | 否 |
+| 138 | httpKeepAlive | 是 | 否 |
+| 139 | enableRecordSql | 是 | 否 |
+| 140 | maxBinaryDisplayWidth | 是 | 否 |
+| 141 | stream | 是 | 否 |
+| 142 | retrieveBlockingModel | 是 | 否 |
+| 143 | tsdbMetaCompactRatio | 是 | 否 |
+| 144 | defaultJSONStrType | 是 | 否 |
+| 145 | walFlushSize | 是 | 否 |
+| 146 | keepTimeOffset | 是 | 否 |
+| 147 | flowctrl | 是 | 否 |
+| 148 | slaveQuery | 是 | 否 |
+| 149 | adjustMaster | 是 | 否 |
+| 150 | topicBinaryLen | 是 | 否 |
+| 151 | telegrafUseFieldNum | 是 | 否 |
+| 152 | deadLockKillQuery | 是 | 否 |
+| 153 | clientMerge | 是 | 否 |
+| 154 | sdbDebugFlag | 是 | 否 |
+| 155 | odbcDebugFlag | 是 | 否 |
+| 156 | httpDebugFlag | 是 | 否 |
+| 157 | monDebugFlag | 是 | 否 |
+| 158 | cqDebugFlag | 是 | 否 |
+| 159 | shortcutFlag | 是 | 否 |
+| 160 | probeSeconds | 是 | 否 |
+| 161 | probeKillSeconds | 是 | 否 |
+| 162 | probeInterval | 是 | 否 |
+| 163 | lossyColumns | 是 | 否 |
+| 164 | fPrecision | 是 | 否 |
+| 165 | dPrecision | 是 | 否 |
+| 166 | maxRange | 是 | 否 |
+| 167 | range | 是 | 否 |
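
The numOfVnodeRsmaThreads entry added above sits beside the other per-dnode thread-pool parameters. Assuming it can be pinned the same way as its neighbours, through taos.cfg's plain `name value` lines (an assumption for illustration, not something shown in this commit), a dnode configuration stanza would look roughly like:

```
# taos.cfg (illustrative): size of the dedicated rsma executor pool on this dnode
numOfVnodeRsmaThreads  4
```
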
@@ -66,6 +66,7 @@ extern int32_t tsNumOfVnodeStreamThreads;
 extern int32_t tsNumOfVnodeFetchThreads;
 extern int32_t tsNumOfVnodeWriteThreads;
 extern int32_t tsNumOfVnodeSyncThreads;
+extern int32_t tsNumOfVnodeRsmaThreads;
 extern int32_t tsNumOfQnodeQueryThreads;
 extern int32_t tsNumOfQnodeFetchThreads;
 extern int32_t tsNumOfSnodeSharedThreads;

@@ -61,6 +61,7 @@ int32_t tsNumOfVnodeStreamThreads = 2;
 int32_t tsNumOfVnodeFetchThreads = 4;
 int32_t tsNumOfVnodeWriteThreads = 2;
 int32_t tsNumOfVnodeSyncThreads = 2;
+int32_t tsNumOfVnodeRsmaThreads = 2;
 int32_t tsNumOfQnodeQueryThreads = 4;
 int32_t tsNumOfQnodeFetchThreads = 4;
 int32_t tsNumOfSnodeSharedThreads = 2;

@@ -378,6 +379,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
   tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16);
   if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
 
+  tsNumOfVnodeRsmaThreads = tsNumOfCores;
+  tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
+  if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1;
+
   tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
   tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
   if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;

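
The registration above derives the default from the host's core count, clamps it to a floor of 4, and declares bounds of 1 to 1024 in the cfgAddInt32 call. A small stand-alone sketch of that clamping arithmetic (TMAX spelled out locally; the core counts are just examples):

```c
#include <stdio.h>

#define TMAX(a, b) (((a) > (b)) ? (a) : (b))

int main(void) {
  int cores[] = {2, 8, 32};
  for (int i = 0; i < 3; ++i) {
    int numOfVnodeRsmaThreads = cores[i];                     // default starts at the core count
    numOfVnodeRsmaThreads = TMAX(numOfVnodeRsmaThreads, 4);   // but never fewer than 4
    printf("cores=%d -> default numOfVnodeRsmaThreads=%d\n", cores[i], numOfVnodeRsmaThreads);
  }
  return 0;
}
```

So a 2-core host still gets 4 rsma executor threads, while larger hosts scale with the core count.
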
@@ -539,6 +544,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
   tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
   tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
+  tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
   tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
   tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
   tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;

@@ -783,6 +789,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
     tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
   } else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
     tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
+  } else if (strcasecmp("numOfVnodeRsmaThreads", name) == 0) {
+    tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
   } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
     tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
   } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {

@@ -33,7 +33,6 @@ extern "C" {
 // clang-format on
 
 #define RSMA_TASK_INFO_HASH_SLOT (8)
-#define RSMA_EXECUTOR_MAX (1)
 
 typedef struct SSmaEnv  SSmaEnv;
 typedef struct SSmaStat SSmaStat;

@@ -49,9 +48,12 @@ typedef struct SQTaskFWriter SQTaskFWriter;
 struct SSmaEnv {
   SRWLatch  lock;
   int8_t    type;
+  int8_t    flag;  // 0x01 inClose
   SSmaStat *pStat;
 };
 
+#define SMA_ENV_FLG_CLOSE ((int8_t)0x1)
+
 typedef struct {
   int8_t  inited;
   int32_t rsetId;

@@ -93,7 +95,6 @@ struct SRSmaStat {
   int64_t          refId;        // shared by fetch tasks
   volatile int64_t nBufItems;    // number of items in queue buffer
   SRWLatch         lock;         // r/w lock for rsma fs(e.g. qtaskinfo)
-  volatile int8_t  nExecutor;    // [1, max(half of query threads, 4)]
   int8_t           triggerStat;  // shared by fetch tasks
   int8_t           commitStat;   // 0 not in committing, 1 in committing
   SArray          *aTaskFile;    // qTaskFiles committed recently(for recovery/snapshot r/w)

@@ -107,6 +108,7 @@ struct SSmaStat {
     SRSmaStat rsmaStat;  // rollup sma
   };
   T_REF_DECLARE()
+  char data[];
 };
 
 #define SMA_STAT_TSMA(s) (&(s)->tsmaStat)

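
The `char data[]` member added above is a C99 flexible array member: the later hunks allocate `sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads` and then treat `&pStat->data` as the array of executor thread handles, so the handles live in the same allocation as the stat block. A self-contained sketch of that layout, using plain pthread types in place of the TDengine wrappers:

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  int  ref;      // stands in for T_REF_DECLARE()
  char data[];   // flexible array member, sized at allocation time
} Stat;

int main(void) {
  int   nThreads = 4;  // would come from tsNumOfVnodeRsmaThreads
  Stat *pStat = calloc(1, sizeof(Stat) + sizeof(pthread_t) * nThreads);
  if (!pStat) return 1;

  // the thread handles sit directly behind the fixed part of the struct
  pthread_t *threads = (pthread_t *)&pStat->data[0];
  (void)threads;
  printf("stat block + %d thread handles in one allocation (%zu bytes)\n", nThreads,
         sizeof(Stat) + sizeof(pthread_t) * nThreads);
  free(pStat);
  return 0;
}
```
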
@@ -189,7 +189,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
 int32_t smaInit();
 void    smaCleanUp();
 int32_t smaOpen(SVnode* pVnode);
-int32_t smaPreClose(SSma* pSma);
 int32_t smaClose(SSma* pSma);
 int32_t smaBegin(SSma* pSma);
 int32_t smaSyncPreCommit(SSma* pSma);

@@ -199,7 +198,6 @@ int32_t smaAsyncPreCommit(SSma* pSma);
 int32_t smaAsyncCommit(SSma* pSma);
 int32_t smaAsyncPostCommit(SSma* pSma);
 int32_t smaDoRetention(SSma* pSma, int64_t now);
-int32_t smaProcessExec(SSma* pSma, void* pMsg);
 
 int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
 int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);

@@ -323,7 +321,6 @@ struct SVnode {
   TdThreadMutex lock;
   bool          blocked;
   bool          restored;
-  bool          inClose;
   tsem_t        syncSem;
   SQHandle*     pQuery;
 };

@@ -23,9 +23,11 @@ extern SSmaMgmt smaMgmt;
 
 // declaration of static functions
 
+static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
+static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
 static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
-static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
-static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
+static int32_t tdRsmaStartExecutor(const SSma *pSma);
+static int32_t tdRsmaStopExecutor(const SSma *pSma);
 static void *tdFreeTSmaStat(STSmaStat *pStat);
 static void tdDestroyRSmaStat(void *pRSmaStat);

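
The implementation hunks that follow create tsNumOfVnodeRsmaThreads long-lived executor threads per rsma-enabled vnode instead of routing rsma work through the query queue: producers bump nBufItems and post the notEmpty semaphore only when no executor already owns the target rsma info (`assigned == 0`), and shutdown raises SMA_ENV_FLG_CLOSE and posts once per thread so every executor can wake up, drain what is left, and exit to be joined. A minimal, self-contained sketch of that wake-up/shutdown pattern using generic POSIX primitives (the names and counts below are illustrative, not the TDengine API):

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static sem_t       notEmpty;   // plays the role of SRSmaStat.notEmpty
static atomic_long nItems;     // plays the role of nBufItems
static atomic_bool closing;    // plays the role of SMA_ENV_FLG_CLOSE

static void *executor(void *arg) {
  (void)arg;
  for (;;) {
    while (atomic_load(&nItems) > 0) {           // drain whatever is buffered
      if (atomic_fetch_sub(&nItems, 1) <= 0) {
        atomic_fetch_add(&nItems, 1);            // lost the race, nothing taken
        break;
      }
      // a real executor would run one batch here (tdRSmaBatchExec)
    }
    if (atomic_load(&closing) && atomic_load(&nItems) <= 0) break;
    sem_wait(&notEmpty);                         // sleep until a producer or shutdown posts
  }
  return NULL;
}

int main(void) {
  enum { N_THREADS = 2, N_ITEMS = 8 };
  pthread_t th[N_THREADS];
  sem_init(&notEmpty, 0, 0);
  for (int i = 0; i < N_THREADS; ++i) pthread_create(&th[i], NULL, executor, NULL);

  for (int i = 0; i < N_ITEMS; ++i) {            // producer: enqueue, then wake one worker
    atomic_fetch_add(&nItems, 1);
    sem_post(&notEmpty);
  }

  atomic_store(&closing, true);                  // shutdown: set the flag, post once per thread, join
  for (int i = 0; i < N_THREADS; ++i) sem_post(&notEmpty);
  for (int i = 0; i < N_THREADS; ++i) pthread_join(th[i], NULL);
  sem_destroy(&notEmpty);
  printf("remaining items: %ld\n", atomic_load(&nItems));
  return 0;
}
```
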
@@ -97,35 +99,42 @@ void smaCleanUp() {
   }
 }
 
-static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
+static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
   SSmaEnv *pEnv = NULL;
 
   pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
+  *ppEnv = pEnv;
   if (!pEnv) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
-    return NULL;
+    return TSDB_CODE_FAILED;
   }
 
   SMA_ENV_TYPE(pEnv) = smaType;
 
   taosInitRWLatch(&(pEnv->lock));
 
+  (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv)
+                                        : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
+
   if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
     tdFreeSmaEnv(pEnv);
-    return NULL;
+    *ppEnv = NULL;
+    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
+                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
+    return TSDB_CODE_FAILED;
   }
 
-  return pEnv;
+  return TSDB_CODE_SUCCESS;
 }
 
-static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
-  if (!pEnv) {
+static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
+  if (!ppEnv) {
     terrno = TSDB_CODE_INVALID_PTR;
     return TSDB_CODE_FAILED;
   }
 
-  if (!(*pEnv)) {
-    if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) {
+  if (!(*ppEnv)) {
+    if (tdNewSmaEnv(pSma, smaType, ppEnv) != TSDB_CODE_SUCCESS) {
       return TSDB_CODE_FAILED;
     }
   }

@@ -199,7 +208,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
    * tdInitSmaStat invoked in other multithread environment later.
    */
   if (!(*pSmaStat)) {
-    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
+    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads);
     if (!(*pSmaStat)) {
       terrno = TSDB_CODE_OUT_OF_MEMORY;
       return TSDB_CODE_FAILED;

@@ -231,6 +240,10 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
     if (!RSMA_INFO_HASH(pRSmaStat)) {
       return TSDB_CODE_FAILED;
     }
+
+    if (tdRsmaStartExecutor(pSma) < 0) {
+      return TSDB_CODE_FAILED;
+    }
   } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
     // TODO
   } else {

@@ -291,6 +304,9 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
       }
     }
 
+    // step 4:
+    tdRsmaStopExecutor(pSma);
+
     // step 5: free pStat
     taosMemoryFreeClear(pStat);
   }

@@ -381,17 +397,70 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
   pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                                : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
   if (!pEnv) {
-    char rname[TSDB_FILENAME_LEN] = {0};
-
-    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
+    if (tdInitSmaEnv(pSma, smaType, &pEnv) < 0) {
       tdUnLockSma(pSma);
       return TSDB_CODE_FAILED;
     }
-
-    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv)
-                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
   }
   tdUnLockSma(pSma);
 
   return TSDB_CODE_SUCCESS;
 };
 
+void *tdRSmaExecutorFunc(void *param) {
+  setThreadName("vnode-rsma");
+
+  tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW);
+  return NULL;
+}
+
+static int32_t tdRsmaStartExecutor(const SSma *pSma) {
+  TdThreadAttr thAttr = {0};
+  taosThreadAttrInit(&thAttr);
+  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+  SSmaEnv  *pEnv = SMA_RSMA_ENV(pSma);
+  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
+  TdThread *pthread = (TdThread *)&pStat->data;
+
+  for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
+    if (taosThreadCreate(&pthread[i], &thAttr, tdRSmaExecutorFunc, (void *)pSma) != 0) {
+      terrno = TAOS_SYSTEM_ERROR(errno);
+      smaError("vgId:%d, failed to create pthread for rsma since %s", SMA_VID(pSma), terrstr());
+      return -1;
+    }
+    smaDebug("vgId:%d, success to create pthread for rsma", SMA_VID(pSma));
+  }
+
+  taosThreadAttrDestroy(&thAttr);
+  return 0;
+}
+
+static int32_t tdRsmaStopExecutor(const SSma *pSma) {
+  if (pSma && VND_IS_RSMA(pSma->pVnode)) {
+    SSmaEnv   *pEnv = NULL;
+    SSmaStat  *pStat = NULL;
+    SRSmaStat *pRSmaStat = NULL;
+    TdThread  *pthread = NULL;
+
+    if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = SMA_ENV_STAT(pEnv))) {
+      return 0;
+    }
+
+    pEnv->flag |= SMA_ENV_FLG_CLOSE;
+    pRSmaStat = (SRSmaStat *)pStat;
+    pthread = (TdThread *)&pStat->data;
+
+    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
+      tsem_post(&(pRSmaStat->notEmpty));
+    }
+
+    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
+      if (taosCheckPthreadValid(pthread[i])) {
+        smaDebug("vgId:%d, start to join pthread for rsma:%" PRId64, SMA_VID(pSma), pthread[i]);
+        taosThreadJoin(pthread[i], NULL);
+      }
+    }
+  }
+  return 0;
+}

@@ -146,20 +146,6 @@ int32_t smaClose(SSma *pSma) {
   return 0;
 }
 
-int32_t smaPreClose(SSma *pSma) {
-  if (pSma && VND_IS_RSMA(pSma->pVnode)) {
-    SSmaEnv   *pEnv = NULL;
-    SRSmaStat *pStat = NULL;
-    if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) {
-      return 0;
-    }
-    for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
-      tsem_post(&(pStat->notEmpty));
-    }
-  }
-  return 0;
-}
-
 /**
  * @brief rsma env restore
  *

@@ -621,7 +621,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
  */
 int32_t smaDoRetention(SSma *pSma, int64_t now) {
   int32_t code = TSDB_CODE_SUCCESS;
-  if (VND_IS_RSMA(pSma->pVnode)) {
+  if (!VND_IS_RSMA(pSma->pVnode)) {
     return code;
   }
 

@@ -734,10 +734,12 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
 
   SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
 
-  tsem_post(&(pRSmaStat->notEmpty));
-
   int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
 
+  if (atomic_load_8(&pInfo->assigned) == 0) {
+    tsem_post(&(pRSmaStat->notEmpty));
+  }
+
   // smoothing consume
   int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE;
   if (n > 1) {

@@ -911,39 +913,6 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t tdRSmaExecCheck(SSma *pSma) {
-  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
-
-  if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) {
-    return TSDB_CODE_SUCCESS;
-  }
-
-  SRSmaExecMsg fetchMsg;
-  int32_t      contLen = sizeof(SMsgHead);
-  void        *pBuf = rpcMallocCont(0 + contLen);
-
-  ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
-  ((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead);
-
-  SRpcMsg rpcMsg = {
-      .code = 0,
-      .msgType = TDMT_VND_EXEC_RSMA,
-      .pCont = pBuf,
-      .contLen = contLen,
-  };
-
-  if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
-    smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr());
-    goto _err;
-  }
-
-  smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma));
-
-  return TSDB_CODE_SUCCESS;
-_err:
-  return TSDB_CODE_FAILED;
-}
-
 int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
   SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
   if (!pEnv) {

@@ -974,10 +943,6 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
           goto _err;
         }
       }
-
-      if (tdRSmaExecCheck(pSma) < 0) {
-        goto _err;
-      }
     }
   }
   tdUidStoreDestory(&uidStore);

@@ -1563,7 +1528,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
       ASSERT(qItem->level == pItem->level);
       ASSERT(qItem->fetchLevel == pItem->fetchLevel);
 #endif
+      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
         tsem_post(&(pStat->notEmpty));
+      }
       smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
               pRSmaInfo->suid);
     } break;

@@ -1591,9 +1558,11 @@ _end:
 }
 
 static void tdFreeRSmaSubmitItems(SArray *pItems) {
+  ASSERT(taosArrayGetSize(pItems) > 0);
   for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
     taosFreeQitem(*(void **)taosArrayGet(pItems, i));
   }
+  taosArrayClear(pItems);
 }
 
 /**

@@ -1703,6 +1672,7 @@ _err:
  * @param type
  * @return int32_t
  */
+
 int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
   SVnode  *pVnode = pSma->pVnode;
   SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);

@@ -1722,23 +1692,27 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
     goto _err;
   }
 
-  bool isBusy = false;
   while (true) {
-    isBusy = false;
     // step 1: rsma exec - consume data in buffer queue for all suids
     if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) {
-      void *pIter = taosHashIterate(infoHash, NULL);  // infoHash has r/w lock
-      while (pIter) {
+      void *pIter = NULL;
+      while ((pIter = taosHashIterate(infoHash, pIter))) {
         SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
-        int64_t    itemSize = 0;
-        if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
-            RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
-          smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type);
         if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
+          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
+              RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
+            int32_t batchCnt = -1;
+            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
+            bool    occupied = (batchMax <= 1);
+            if (batchMax > 1) {
+              batchMax = 100 / batchMax;
+            }
+            while (occupied || (++batchCnt < batchMax)) {  // greedy mode
               taosReadAllQitems(pInfo->queue, pInfo->qall);  // queue has mutex lock
               int32_t qallItemSize = taosQallItemSize(pInfo->qall);
               if (qallItemSize > 0) {
                 tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
+                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
               }
 
               if (type == RSMA_EXEC_OVERFLOW) {

@@ -1746,17 +1720,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
               }
 
               if (qallItemSize > 0) {
-                // subtract the item size after the task finished, commit should wait for all items be consumed
                 atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
-                isBusy = true;
+                continue;
+              } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
+                continue;
               }
+
+              break;
+            }
           }
           ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
         }
       }
-        pIter = taosHashIterate(infoHash, pIter);
-      }
       if (type == RSMA_EXEC_COMMIT) {
+        if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
           break;
+        } else {
+          // commit should wait for all items be consumed
+          continue;
+        }
       }
     }
 #if 0

@@ -1790,16 +1772,19 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
     }
 
     if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
-      if (pVnode->inClose) {
+      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
         break;
       }
 
       tsem_wait(&pRSmaStat->notEmpty);
-      if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
-        smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose,
+      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
+        smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
                 atomic_load_64(&pRSmaStat->nBufItems));
         break;
       }
     }
 
   }  // end of while(true)
 
 _end:

@@ -1809,39 +1794,3 @@ _err:
   taosArrayDestroy(pSubmitArr);
   return TSDB_CODE_FAILED;
 }
-
-/**
- * @brief exec rsma level 1data, fetch result of level 2/3 and submit
- *
- * @param pSma
- * @param pMsg
- * @return int32_t
- */
-int32_t smaProcessExec(SSma *pSma, void *pMsg) {
-  SRpcMsg   *pRpcMsg = (SRpcMsg *)pMsg;
-  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
-
-  if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
-    terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
-    goto _err;
-  }
-  smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
-
-  int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1);
-
-  if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) {
-    if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
-      goto _err;
-    }
-  } else {
-    atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1);
-  }
-
-  smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
-  return TSDB_CODE_SUCCESS;
-_err:
-  atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1);
-  smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(),
-           terrstr());
-  return TSDB_CODE_FAILED;
-}

@@ -87,7 +87,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
   pVnode->msgCb = msgCb;
   taosThreadMutexInit(&pVnode->lock, NULL);
   pVnode->blocked = false;
-  pVnode->inClose = false;
 
   tsem_init(&pVnode->syncSem, 0, 0);
   tsem_init(&(pVnode->canCommit), 0, 1);

@@ -182,8 +181,6 @@ _err:
 void vnodePreClose(SVnode *pVnode) {
   if (pVnode) {
     syncLeaderTransfer(pVnode->sync);
-    pVnode->inClose = true;
-    smaPreClose(pVnode->pSma);
   }
 }
 

@@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
       return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
     case TDMT_SCH_QUERY_CONTINUE:
       return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
-    case TDMT_VND_EXEC_RSMA:
-      return smaProcessExec(pVnode->pSma, pMsg);
     default:
       vError("unknown msg type:%d in query queue", pMsg->msgType);
       return TSDB_CODE_VND_APP_ERROR;

@@ -380,14 +378,14 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
   int32_t     code = 0;
   SVTrimDbReq trimReq = {0};
 
-  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
-
   // decode
   if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
     code = TSDB_CODE_INVALID_MSG;
     goto _exit;
   }
 
+  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
+
   // process
   code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
   if (code) goto _exit;